
Below is the sample code referenced in the blog post, "Get Complete Hybrid Visibility in Splunk by Querying AWS CloudWatch Logs Insights."
The following scripts are provided under the Splunk Software License Agreement. The following components are provided by their authors and subject to the Apache 2.0 license: requests and Amazon Version 4 signing example code. The Amazon Version 4 signing example was modified for purposes of this integration.
Below is the content of loginsights_connector.py:
import sys, datetime, hashlib, hmac
import requests
import json
import time
import ingest
####################################################################################################
# Account-Specific Data to be Configured by End User
####################################################################################################
# Authentication Info #
access_key = "<your access key>"
# BUG FIX: the original placeholder said "<your access key>" here too,
# inviting users to paste the wrong credential.
secret_key = "<your secret key>"
algorithm = 'AWS4-HMAC-SHA256'
# Valid Query Names #
YOUR_CUSTOM_QUERY_NAME = "yourCustomQueryName"
# example query names:
# VPC_METRIC_SRC_QUERY = "vpcMetricSrc"
# VPC_METRIC_DST_BIN_QUERY = "vpcMetricDstBin"
# VPC_METRIC_BIN_QUERY = "vpcMetricBin"
# Valid Queries
VALID_QUERIES = {
    YOUR_CUSTOM_QUERY_NAME: "<your custom query goes here>",
}
# example queries:
# VALID_QUERIES = {
#     VPC_METRIC_SRC_QUERY: "stats avg(bytes), min(bytes), max(bytes) by srcAddr",
#     VPC_METRIC_DST_BIN_QUERY: "stats avg(bytes), min(bytes), max(bytes) by dstAddr, bin(5m)",
#     VPC_METRIC_BIN_QUERY: "stats avg(bytes), min(bytes), max(bytes) by bin(5m)",
# }
# valid loggroups (must have an entry for every query name above)
VALID_LOG_GROUP_NAMES = {
    YOUR_CUSTOM_QUERY_NAME: "<your log group name goes here>",
}
# example loggroups:
# VALID_LOG_GROUP_NAMES = {
#     VPC_METRIC_SRC_QUERY: "flowlogsGroup",
#     VPC_METRIC_DST_BIN_QUERY: "flowlogsGroup",
#     VPC_METRIC_BIN_QUERY: "flowlogsGroup",
# }
####################################################################################################
# Default Settings - change these only if you KNOW you need to
####################################################################################################
# Valid Actions (X-Amz-Target values for the CloudWatch Logs API)
SCHEDULE_QUERY_TARGET = 'Logs_20140328.StartQuery'
GET_QUERY_RESULTS_TARGET = 'Logs_20140328.GetQueryResults'
# Service Metadata
SERVICE = 'logs'
HOST = 'logs.us-west-2.amazonaws.com'
REGION = 'us-west-2'
ENDPOINT = 'https://logs.us-west-2.amazonaws.com'
CONTENT_TYPE = 'application/x-amz-json-1.1'
HEADER_NAMES = 'content-type;date;host;user-agent;x-amz-date;x-amz-target;x-amzn-requestid'
####################################################################################################
# Sample Commands
####################################################################################################
#python loginsights_connector.py "<search window start datetime YYYY-MM-DD HH:MM:SS>" "<search window end datetime YYYY-MM-DD HH:MM:SS>" <query name> <splunk host> <splunk hec token>
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricBin "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricSrc "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricSrcBin "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
#python loginsights_connector.py "2018-10-08 07:00:00" "2018-11-08 09:46:08" vpcMetricDstBin "my.splunk.host.com" 35b5a4b4-0726-4dbe-984b-359b79051377
# time must ALWAYS be formatted like so: <YYYY-MM-DD HH:MM:SS> (24-hour clock)
# Example: 2018-12-31 23:59:59
def sign(key, msg):
    """Return the HMAC-SHA256 digest of *msg* (UTF-8 encoded) keyed with *key* (bytes)."""
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the AWS Signature V4 signing key via the chained HMAC steps.

    Each stage keys an HMAC over the next scope component, starting from
    'AWS4' + secret key and ending with the literal 'aws4_request'.
    """
    signing_key = ('AWS4' + key).encode('utf-8')
    for scope_part in (dateStamp, regionName, serviceName, 'aws4_request'):
        signing_key = sign(signing_key, scope_part)
    return signing_key
def create_canonical_request(content_type, host, amzdate, amz_target, header_names, request_parameters):
    """Build the AWS SigV4 canonical request string for a POST to '/'.

    The canonical headers list is fixed to match HEADER_NAMES (date and
    x-amzn-requestid are sent empty; user-agent is pinned to the requests
    version string the signature was written against).
    """
    hashed_payload = hashlib.sha256(request_parameters.encode('utf-8')).hexdigest()
    canonical_headers = (
        'content-type:%s\n' % content_type +
        'date:\n' +
        'host:%s\n' % host +
        'user-agent:python-requests/2.18.4\n' +
        'x-amz-date:%s\n' % amzdate +
        'x-amz-target:%s\n' % amz_target +
        'x-amzn-requestid:\n'
    )
    # method, URI, query string, headers, signed-header names, payload hash
    return '\n'.join(['POST', '/', '', canonical_headers, header_names, hashed_payload])
def create_and_sign_request_headers(region, service, host, content_type, header_names, query_target, request_parameters):
    """Produce the HTTP headers, including the SigV4 Authorization header, for one Logs API call.

    Uses the current UTC time for both the x-amz-date header and the
    credential scope; signs with the module-level access_key/secret_key.
    """
    now = datetime.datetime.utcnow()
    amz_date = now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = now.strftime('%Y%m%d')
    # Step 1: canonical request
    canonical_request = create_canonical_request(
        content_type, host, amz_date, query_target, header_names, request_parameters)
    # Step 2: string to sign
    scope = '/'.join([date_stamp, region, service, 'aws4_request'])
    hashed_canonical = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = '\n'.join([algorithm, amz_date, scope, hashed_canonical])
    # Step 3: signature
    derived_key = getSignatureKey(secret_key, date_stamp, region, service)
    signature = hmac.new(derived_key, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
    # Step 4: Authorization header
    authorization = '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s' % (
        algorithm, access_key, scope, header_names, signature)
    return {
        'Content-Type': content_type,
        'X-Amz-Date': amz_date,
        'X-Amz-Target': query_target,
        'Authorization': authorization
    }
def send_request(endpoint, request_parameters, headers):
    """POST the signed request body to *endpoint* and return the raw response text."""
    response = requests.post(endpoint, data=request_parameters, headers=headers)
    return response.text
def schedule_query(start_time, end_time, query_name, query_target):
    """Start a CloudWatch Logs Insights query and return its queryId.

    *start_time* / *end_time* are epoch seconds; *query_name* selects the
    query string and log group from the module-level tables.
    """
    body = json.dumps({
        "limit": 100,
        "logGroupName": VALID_LOG_GROUP_NAMES[query_name],
        "startTime": start_time,
        "endTime": end_time,
        "queryString": VALID_QUERIES[query_name]
    })
    signed_headers = create_and_sign_request_headers(
        REGION, SERVICE, HOST, CONTENT_TYPE, HEADER_NAMES, query_target, body)
    reply = send_request(ENDPOINT, body, signed_headers)
    return json.loads(reply)['queryId']
def retrieve_query(query_id, query_target):
    """Fetch the current results of a scheduled Insights query.

    Returns the parsed JSON response as a dict. If the response body cannot
    be parsed, logs the error and returns {"results": []} so callers can
    keep indexing response['results'].
    """
    print("query_id is: %s" % query_id)
    retrieve_query_request_parameters = json.dumps({
        "queryId": query_id
    })
    headers = create_and_sign_request_headers(REGION, SERVICE, HOST, CONTENT_TYPE, HEADER_NAMES,
                                              query_target, retrieve_query_request_parameters)
    response_text = send_request(ENDPOINT, retrieve_query_request_parameters, headers)
    try:
        return json.loads(response_text)
    except ValueError as e:  # json.loads raises ValueError (JSONDecodeError)
        print("%s" % e)
        # BUG FIX: the original returned the JSON *string* '{"results": []}';
        # the caller indexes response_json['results'], which fails on a str.
        return {"results": []}
def retrieve_command_line_args():
    """Parse CLI arguments and return (start_epoch, end_epoch, query_name, hec_host, hec_token).

    argv[1]/argv[2] are datetimes formatted "YYYY-MM-DD HH:MM:SS" and are
    converted to whole seconds since the Unix epoch (treated as naive time).
    Exits the process if credentials are unset or too few arguments exist.
    """
    if access_key is None or secret_key is None:
        print('Exiting because no access key is available.')
        sys.exit()
    # BUG FIX: the script reads sys.argv[1..5], so 6 entries are required;
    # the original checked `< 3` and then crashed with IndexError.
    if len(sys.argv) < 6:
        print('\nNot enough info to continue: please specify startTime, endTime, query, HEC host, and HEC token.\n'
              'Run the command like this: python loginsights_connector.py <start_time> <end_time> <query_name> <hec_host> <hec_token>\n')
        sys.exit()
    # time must ALWAYS be formatted like so: <YYYY-MM-DD HH:MM:SS>
    start_time_obj = datetime.datetime.strptime(sys.argv[1], "%Y-%m-%d %H:%M:%S")
    start_secs_since_epoch = int((start_time_obj - datetime.datetime(1970, 1, 1)).total_seconds())
    end_time_obj = datetime.datetime.strptime(sys.argv[2], "%Y-%m-%d %H:%M:%S")
    end_secs_since_epoch = int((end_time_obj - datetime.datetime(1970, 1, 1)).total_seconds())
    query_name = sys.argv[3]
    hec_host = sys.argv[4]
    hec_token = sys.argv[5]
    return start_secs_since_epoch, end_secs_since_epoch, query_name, hec_host, hec_token
start_secs, end_secs, query_name, hec_host, hec_token = retrieve_command_line_args()
# Schedule the query, then poll for results based on the query name.
if query_name in VALID_QUERIES:
    query_id = schedule_query(start_secs, end_secs, query_name, SCHEDULE_QUERY_TARGET)
    time.sleep(1)
    retry_count = 0
    previous_response_length = 0
    response_json = retrieve_query(query_id, GET_QUERY_RESULTS_TARGET)
    # Keep polling while the result count is still changing (or no results
    # have arrived yet), up to 6 retries at 5-second intervals.
    while (previous_response_length != len(response_json['results'])) or len(response_json['results']) == 0:
        time.sleep(5)
        previous_response_length = len(response_json['results'])
        print("Re-fetching results because results are still being computed: call %s, elapsed time: %ss" % (retry_count, retry_count*5))
        response_json = retrieve_query(query_id, GET_QUERY_RESULTS_TARGET)
        print(" Just fetched %s results" % len(response_json['results']))
        retry_count = retry_count + 1
        if retry_count >= 6:
            print("Hit max retries of %s, returning Amazon's response now" % retry_count)
            break
    # Ingest the results to Splunk using ingest.py.
    ingest.persist_to_metric_store(response_json, hec_host, hec_token)
    # BUG FIX: context manager guarantees the handle is closed even if the
    # write fails; also avoids shadowing the `file` builtin.
    with open("output.json", 'w') as output_file:
        output_file.write(json.dumps(response_json))
else:
    print("unknown query '%s', please specify one of these: %s" % (query_name, VALID_QUERIES.keys()))
Below is the contents of ingest.py:
import requests
import json
import copy
import time
def get_splunk_auth_header(token):
    """Return the Authorization header dict for a Splunk HEC token."""
    return {'Authorization': 'Splunk ' + token}
def get_splunk_hec_url(hec_host):
    """Return the HEC event-collector URL for *hec_host* (fixed port 8088)."""
    return 'https://' + hec_host + ':8088/services/collector/event'
def report_to_splunk(data, hec_host, hec_token):
    """POST one event dict (or a list of them, batched) to the Splunk HEC endpoint.

    A list is serialized as concatenated JSON objects in a single request
    body, which is the HEC batch format.
    """
    if isinstance(data, list):
        batch_data = ''.join(json.dumps(d) for d in data)
    else:
        batch_data = json.dumps(data)
    resp = requests.post(
        get_splunk_hec_url(hec_host),
        headers=get_splunk_auth_header(hec_token),
        data=batch_data,
        # SECURITY: verify=False disables TLS certificate validation; kept
        # for behavior compatibility but should be enabled in production.
        verify=False
    )
    # BUG FIX: Python 3 print function -- the original used the Python 2
    # print statement, which is a SyntaxError under Python 3.
    print(resp.json())
def parse_to_hec_format(event, hec_host):
    """Convert one Logs Insights result row into Splunk HEC metric event dicts.

    *event* is a list of {'field': ..., 'value': ...} pairs. Fields whose
    names contain '(' are metric values (e.g. "avg(bytes)"); a field whose
    name starts with 'bin' supplies the event timestamp; everything else is
    carried along as metadata dimensions. Returns one HEC-shaped dict per
    metric field.
    """
    hec_event = {  # renamed from `json`, which shadowed the json module
        'host': hec_host,
        'source': 'loginsights_connector.py',
        'fields': {}
    }
    metadata = {}
    time_format = '%Y-%m-%d %H:%M:%S.000'
    for pair in event:
        field_name = pair['field']
        if not ('(' in field_name or field_name.startswith('bin')):
            metadata[field_name] = pair['value']
        elif field_name.startswith('bin'):
            # BUG FIX: the original line was missing a closing parenthesis.
            # NOTE(review): time.mktime interprets the timestamp in the
            # local timezone -- confirm whether UTC was intended.
            hec_event['time'] = int(time.mktime(time.strptime(pair['value'], time_format)))
    # Merge dimensions after scanning the whole row so every metric carries them.
    hec_event['fields'].update(metadata)
    result = []
    for pair in event:
        if '(' in pair['field']:
            hec_event['fields']['metric_name'] = pair['field']
            hec_event['fields']['_value'] = pair['value']
            # Deep copy so each appended event keeps its own metric fields.
            result.append(copy.deepcopy(hec_event))
    return result
def persist_to_metric_store(data, hec_host, hec_token):
    """Parse every Insights result row and forward the whole batch to Splunk HEC."""
    # NOTE(review): parse_to_hec_format returns a *list* per row, so this
    # builds a list of lists -- verify report_to_splunk expects that shape.
    batch_metric = [parse_to_hec_format(row, hec_host) for row in data['results']]
    report_to_splunk(batch_metric, hec_host, hec_token)
----------------------------------------------------
Thanks!
Andi Mann