CLOUD

Scripts for Splunk Integration with Amazon CloudWatch Logs Insights

Below is sample code referenced in the blog post, "Get Complete Hybrid Visibility in Splunk by Querying AWS CloudWatch Logs Insights."

The following scripts are provided under the Splunk Software License Agreement. The following components are provided by their authors and subject to the Apache 2.0 license: requests and Amazon Version 4 signing example code. The Amazon Version 4 signing example was modified for purposes of this integration.

Below is the content of loginsights_connector.py:

import sys, datetime, hashlib, hmac 
import requests
import json
import time
import ingest


####################################################################################################
# Account-Specific Data to be Configured by End User
####################################################################################################


# Authentication Info #
# NOTE(review): hard-coding long-term credentials in source is risky; prefer
# environment variables or an AWS credentials provider in production.
access_key = "<your access key>"
secret_key = "<your secret key>"  # BUG FIX: placeholder previously said "access key"
algorithm = 'AWS4-HMAC-SHA256'


# Valid Query Names #
# Each name is both the CLI <query_name> argument and the key used to look up
# the query string (VALID_QUERIES) and log group (VALID_LOG_GROUP_NAMES) below.
YOUR_CUSTOM_QUERY_NAME = "yourCustomQueryName"


# example query names:
# VPC_METRIC_SRC_QUERY = "vpcMetricSrc"
# VPC_METRIC_DST_BIN_QUERY = "vpcMetricDstBin"
# VPC_METRIC_BIN_QUERY = "vpcMetricBin"


# Valid Queries
# Maps query name -> CloudWatch Logs Insights query string to execute.
VALID_QUERIES = {
 YOUR_CUSTOM_QUERY_NAME: "<your custom query goes here>",
}


# example queries:
# VALID_QUERIES = {
# VPC_METRIC_SRC_QUERY: "stats avg(bytes), min(bytes), max(bytes) by srcAddr",
# VPC_METRIC_DST_BIN_QUERY: "stats avg(bytes), min(bytes), max(bytes) by dstAddr, bin(5m)",
# VPC_METRIC_BIN_QUERY: "stats avg(bytes), min(bytes), max(bytes) by bin(5m)",
# }


# valid loggroups
# Maps query name -> the CloudWatch log group the query runs against.
VALID_LOG_GROUP_NAMES = {
 YOUR_CUSTOM_QUERY_NAME: "<your log group name goes here>",
}


# example loggroups:
# VALID_LOG_GROUP_NAMES = {
# VPC_METRIC_SRC_QUERY: "flowlogsGroup",
# VPC_METRIC_DST_BIN_QUERY: "flowlogsGroup",
# VPC_METRIC_BIN_QUERY: "flowlogsGroup",
# }


####################################################################################################
# Default Settings - change these only if you KNOW you need to
####################################################################################################


# Valid Actions
# X-Amz-Target values for the CloudWatch Logs API (2014-03-28 API version).
SCHEDULE_QUERY_TARGET = 'Logs_20140328.StartQuery'
GET_QUERY_RESULTS_TARGET = 'Logs_20140328.GetQueryResults'


# Service Metadata
# NOTE(review): the region is hard-coded to us-west-2; HOST, REGION, and
# ENDPOINT must all be changed together if your log groups live elsewhere.
SERVICE = 'logs'
HOST = 'logs.us-west-2.amazonaws.com'
REGION = 'us-west-2'
ENDPOINT = 'https://logs.us-west-2.amazonaws.com'
CONTENT_TYPE = 'application/x-amz-json-1.1'
HEADER_NAMES = 'content-type;date;host;user-agent;x-amz-date;x-amz-target;x-amzn-requestid'


####################################################################################################
# Sample Commands
####################################################################################################


#python loginsights_connector.py "<search window start datetime YYYY-MM-DD HH:MM:SS>" "<search window end datetime YYYY-MM-DD HH:MM:SS>" <query name> <splunk host> <splunk hec token>
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricBin "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricSrc "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricSrcBin "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricDstBin "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377


# time must ALWAYS be formatted like so: <YYYY-MM-DD HH:MM:SS>
# Example: 2018-12-31 23:59:59


####################################################################################################
# Executable Code
####################################################################################################


def sign(key, msg):
    """Return the HMAC-SHA256 digest of *msg* (UTF-8 encoded) keyed by *key*."""
    mac = hmac.new(key, msg.encode('utf-8'), hashlib.sha256)
    return mac.digest()



def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the AWS Signature V4 signing key via the chained HMAC steps
    (date -> region -> service -> 'aws4_request')."""
    def _hmac_sha256(k, data):
        # one HMAC round; data is a text component of the key derivation
        return hmac.new(k, data.encode('utf-8'), hashlib.sha256).digest()

    signing_key = ('AWS4' + key).encode('utf-8')
    for component in (dateStamp, regionName, serviceName, 'aws4_request'):
        signing_key = _hmac_sha256(signing_key, component)
    return signing_key


def create_canonical_request(content_type, host, amzdate, amz_target, header_names, request_parameters):
    """Build the SigV4 canonical request string for a POST to '/'.

    The date and x-amzn-requestid headers are deliberately left empty, and
    the user-agent is pinned, to match the headers requests will send.
    """
    header_lines = [
        'content-type:%s' % content_type,
        'date:',
        'host:%s' % host,
        'user-agent:python-requests/2.18.4',
        'x-amz-date:%s' % amzdate,
        'x-amz-target:%s' % amz_target,
        'x-amzn-requestid:',
        '',  # trailing newline after the header block
    ]
    canonical_headers = '\n'.join(header_lines)
    payload_digest = hashlib.sha256(request_parameters.encode('utf-8')).hexdigest()
    # method, uri, (empty) querystring, headers, signed header names, payload hash
    return '\n'.join(['POST', '/', '', canonical_headers, header_names, payload_digest])


def create_and_sign_request_headers(region, service, host, content_type, header_names, query_target, request_parameters):
    """Assemble the signed HTTP headers (AWS SigV4) for one Logs API request.

    Reads the module-level access_key / secret_key / algorithm settings.
    Returns a dict of headers ready to pass to requests.post.
    """
    now = datetime.datetime.utcnow()
    amzdate = now.strftime('%Y%m%dT%H%M%SZ')
    datestamp = now.strftime('%Y%m%d')

    # canonical request -> string to sign, per the SigV4 procedure
    canonical_request = create_canonical_request(
        content_type, host, amzdate, query_target, header_names, request_parameters)
    credential_scope = '/'.join([datestamp, region, service, 'aws4_request'])
    request_digest = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = '\n'.join([algorithm, amzdate, credential_scope, request_digest])

    # derive the signing key from the secret key and sign the string
    signing_key = getSignatureKey(secret_key, datestamp, region, service)
    signature = hmac.new(signing_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()

    authorization = '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s' % (
        algorithm, access_key, credential_scope, header_names, signature)

    return {
        'Content-Type': content_type,
        'X-Amz-Date': amzdate,
        'X-Amz-Target': query_target,
        'Authorization': authorization
    }

def send_request(endpoint, request_parameters, headers):
    """POST the signed request body to *endpoint* and return the raw response text."""
    response = requests.post(endpoint, data=request_parameters, headers=headers)
    return response.text


def schedule_query(start_time, end_time, query_name, query_target):
    """Start a CloudWatch Logs Insights query and return its queryId.

    *query_name* must be a key of VALID_QUERIES / VALID_LOG_GROUP_NAMES;
    *start_time* / *end_time* are seconds since the epoch.
    """
    request_body = json.dumps({
        "limit": 100,
        "logGroupName": VALID_LOG_GROUP_NAMES[query_name],
        "startTime": start_time,
        "endTime": end_time,
        "queryString": VALID_QUERIES[query_name]
    })

    signed_headers = create_and_sign_request_headers(
        REGION, SERVICE, HOST, CONTENT_TYPE, HEADER_NAMES, query_target, request_body)
    response_text = send_request(ENDPOINT, request_body, signed_headers)
    return json.loads(response_text)['queryId']


def retrieve_query(query_id, query_target):
    """Fetch the results of a previously scheduled Logs Insights query.

    Returns the parsed response as a dict. On a malformed (non-JSON)
    response, returns {"results": []} so callers can always do
    response['results'].
    """
    print("query_id is: %s" % query_id)


    retrieve_query_request_parameters = json.dumps({
        "queryId": query_id
    })
    headers = create_and_sign_request_headers(REGION, SERVICE, HOST, CONTENT_TYPE, HEADER_NAMES, 
        query_target, retrieve_query_request_parameters)
    response_text = send_request(ENDPOINT, retrieve_query_request_parameters, headers)

    try:
        return json.loads(response_text)
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        print("%s" % e)
        # BUG FIX: previously returned the JSON *string* '{"results": []}',
        # while the success path returns a dict; callers index
        # result['results'], which fails on a string. Return a dict.
        return {"results": []}


def retrieve_command_line_args():
    """Parse and validate the five required CLI arguments.

    Returns (start_secs, end_secs, query_name, hec_host, hec_token), with
    the two timestamps converted from 'YYYY-MM-DD HH:MM:SS' to seconds
    since the Unix epoch. Exits the process if credentials are missing or
    too few arguments were supplied.
    """
    if access_key is None or secret_key is None:
        print('Exiting because no access key is available.')
        sys.exit()


    # BUG FIX: the script reads sys.argv[1]..sys.argv[5], so six entries
    # (including the script name) are required -- the old check of '< 3'
    # let the code crash later with an IndexError. The usage string also
    # had typos and listed host/token in the wrong order.
    if len(sys.argv) < 6:
        print('\nNot enough info to continue: please specify startTime, endTime, query, HEC host, and HEC token.\n'
            'Run the command like this: python loginsights_connector.py <start_time> <end_time> <query_name> <hec_host> <hec_token>\n')
        sys.exit()


    # times must be formatted like so: 2018-12-31 23:59:59, and are
    # converted to seconds since the epoch (naive, treated as UTC)
    epoch = datetime.datetime(1970, 1, 1)

    start_time_obj = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
    start_secs_since_epoch = int((start_time_obj - epoch).total_seconds())


    end_time_obj = datetime.datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
    end_secs_since_epoch = int((end_time_obj - epoch).total_seconds())


    query_name = sys.argv[3]
    hec_host = sys.argv[4]
    hec_token = sys.argv[5]


    return start_secs_since_epoch, end_secs_since_epoch, query_name, hec_host, hec_token


# Entry point: parse CLI args, schedule the query, poll for results, then
# ship the results to Splunk via HEC.
start_secs, end_secs, query_name, hec_host, hec_token = retrieve_command_line_args()


#send the query and retrieve the id, based on the query name
if query_name in VALID_QUERIES.keys():
    query_id = schedule_query(start_secs, end_secs, query_name, SCHEDULE_QUERY_TARGET)
    # brief pause before the first fetch so AWS has a chance to start the query
    time.sleep(1)

    retry_count = 0
    previous_response_length = 0
    response_json = retrieve_query(query_id, GET_QUERY_RESULTS_TARGET)

    # Poll every 5s until the result count stops growing (and is non-empty),
    # i.e. the query has presumably finished, or until max retries is hit.
    while((previous_response_length != len(response_json['results'])) or len(response_json['results']) == 0):
        time.sleep(5)
        previous_response_length = len(response_json['results'])

        print("Re-fetching results because results are still being computed: call %s, elapsed time: %ss" % (retry_count, retry_count*5))
        response_json = retrieve_query(query_id, GET_QUERY_RESULTS_TARGET)
        print("  Just fetched %s results" % len(response_json['results']))
        retry_count = retry_count + 1


        # give up after 6 polls (~30s) and use whatever AWS returned so far
        if retry_count >= 6:
            print("Hit max retries of %s, returning Amazon's response now" % retry_count)
            break


    #ingest the results to splunk using ingest.py
    ingest.persist_to_metric_store(response_json, hec_host, hec_token)
    # also persist the raw results locally for debugging/auditing
    file = open("output.json", 'w')
    file.write(json.dumps(response_json))
    file.close()
else:
    print("unknown query '%s', please specify one of these: %s" % (query_name, VALID_QUERIES.keys()))

 

Below is the contents of ingest.py:

import requests
import json
import copy
import time

def get_splunk_auth_header(token):
    """Build the HTTP Authorization header dict for a Splunk HEC token."""
    return {'Authorization': 'Splunk ' + token}


def get_splunk_hec_url(hec_host):
    """Return the HEC event-collector endpoint URL for *hec_host* (port 8088)."""
    return 'https://' + hec_host + ':8088/services/collector/event'


def report_to_splunk(data, hec_host, hec_token):
    """POST one event (dict) or a batch (list of dicts) to Splunk HEC.

    A list is serialized as concatenated JSON objects, which the HEC
    event endpoint accepts as a batch.
    """
    if isinstance(data, list):
        batch_data = ''.join(json.dumps(d) for d in data)
    else:
        batch_data = json.dumps(data)


    # NOTE(review): verify=False disables TLS certificate checking; fine for
    # a lab setup, but supply a CA bundle in production.
    resp = requests.post(
        get_splunk_hec_url(hec_host),
        headers=get_splunk_auth_header(hec_token),
        data=batch_data,
        verify=False
    )
    # BUG FIX: 'print resp.json()' was Python 2 statement syntax, which is a
    # SyntaxError on Python 3 (the rest of this code uses print()).
    print(resp.json())


def parse_to_hec_format(event, hec_host):
    """Convert one Logs Insights result row into HEC metric events.

    *event* is a list of {'field': ..., 'value': ...} dicts. Plain fields
    become dimensions; a 'bin(...)' field sets the event timestamp (local
    time via time.mktime); each aggregate field (name containing '(')
    yields one HEC event carrying 'metric_name' and '_value'.

    Returns a list of HEC-ready dicts, one per aggregate field.
    """
    # BUG FIX: the original shadowed the imported 'json' module with a local
    # variable, mis-indented the 'elif' branch, and dropped a closing
    # parenthesis on the time.mktime(...) line (a SyntaxError).
    hec_event = {
        'host': hec_host,
        'source': 'loginsights_connector.py',
        'fields': {}
    }
    time_format = '%Y-%m-%d %H:%M:%S.000'

    # first pass: collect dimensions and the timestamp
    dimensions = {}
    for item in event:
        field = item['field']
        if not ('(' in field or field.startswith('bin')):
            dimensions[field] = item['value']
        elif field.startswith('bin'):
            # bin(...) values look like '2018-10-08 07:00:00.000'
            hec_event['time'] = int(time.mktime(time.strptime(item['value'], time_format)))
    hec_event['fields'].update(dimensions)

    # second pass: emit one event per aggregate field
    result = []
    for item in event:
        if '(' in item['field']:
            hec_event['fields']['metric_name'] = item['field']
            hec_event['fields']['_value'] = item['value']
            # deepcopy so later iterations don't mutate earlier events
            result.append(copy.deepcopy(hec_event))


    return result


def persist_to_metric_store(data, hec_host, hec_token):
    """Convert every result row to HEC events and send them to Splunk.

    *data* is the parsed GetQueryResults response ({'results': [...]}).
    """
    batch_metric = []
    for event in data['results']:
        # BUG FIX: parse_to_hec_format returns a LIST of events; the original
        # append()ed that list, producing nested lists that report_to_splunk
        # would serialize as JSON arrays, which HEC rejects. Flatten instead.
        batch_metric.extend(parse_to_hec_format(event, hec_host))


    report_to_splunk(batch_metric, hec_host, hec_token)
Andi Mann
Posted by

Andi Mann

Andi Mann, Chief Technology Advocate at Splunk, is an accomplished digital business executive with extensive global expertise as a strategist, technologist, innovator, marketer, and communicator. For over 30 years across five continents, Andi has built success with startups, enterprises, vendors, governments, and as a leading research analyst. Andi has been named to multiple ‘Top …’ lists and is the co-author of two popular books, 'Visible Ops – Private Cloud' and 'The Innovative CIO'.

Join the Discussion