|
// v1.1.2 |
|
// |
|
// this lambda is the one automatically created by AWS |
|
// when creating a CWL to ES stream using the AWS Console. |
|
// I just added the `endpoint` variable handling. |
|
// |
|
var https = require('https'); |
|
var zlib = require('zlib'); |
|
var crypto = require('crypto'); |
|
|
|
const ENV = process.env; |
|
|
|
var endpoint = ENV.es_endpoint; |
|
|
|
exports.handler = function(input, context) { |
|
// decode input from base64 |
|
var zippedInput = new Buffer(input.awslogs.data, 'base64'); |
|
|
|
// decompress the input |
|
zlib.gunzip(zippedInput, function(error, buffer) { |
|
if (error) { context.fail(error); return; } |
|
|
|
// parse the input from JSON |
|
var awslogsData = JSON.parse(buffer.toString('utf8')); |
|
|
|
// transform the input to Elasticsearch documents |
|
var elasticsearchBulkData = transform(awslogsData); |
|
|
|
// skip control messages |
|
if (!elasticsearchBulkData) { |
|
console.log('Received a control message'); |
|
context.succeed('Control message handled successfully'); |
|
return; |
|
} |
|
|
|
// post documents to the Amazon Elasticsearch Service |
|
post(elasticsearchBulkData, function(error, success, statusCode, failedItems) { |
|
console.log('Response: ' + JSON.stringify({ |
|
"statusCode": statusCode |
|
})); |
|
|
|
if (error) { |
|
console.log('Error: ' + JSON.stringify(error, null, 2)); |
|
|
|
if (failedItems && failedItems.length > 0) { |
|
console.log("Failed Items: " + |
|
JSON.stringify(failedItems, null, 2)); |
|
} |
|
|
|
context.fail(JSON.stringify(error)); |
|
} else { |
|
console.log('Success: ' + JSON.stringify(success)); |
|
context.succeed('Success'); |
|
} |
|
}); |
|
}); |
|
}; |
|
|
|
function transform(payload) { |
|
if (payload.messageType === 'CONTROL_MESSAGE') { |
|
return null; |
|
} |
|
|
|
var bulkRequestBody = ''; |
|
|
|
payload.logEvents.forEach(function(logEvent) { |
|
var timestamp = new Date(1 * logEvent.timestamp); |
|
|
|
// index name format: cwl-YYYY.MM.DD |
|
var indexName = [ |
|
'cwl-' + timestamp.getUTCFullYear(), // year |
|
('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month |
|
('0' + timestamp.getUTCDate()).slice(-2) // day |
|
].join('.'); |
|
|
|
var source = buildSource(logEvent.message, logEvent.extractedFields); |
|
source['@id'] = logEvent.id; |
|
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString(); |
|
source['@message'] = logEvent.message; |
|
source['@owner'] = payload.owner; |
|
source['@log_group'] = payload.logGroup; |
|
source['@log_stream'] = payload.logStream; |
|
|
|
var action = { "index": {} }; |
|
action.index._index = indexName; |
|
action.index._type = payload.logGroup; |
|
action.index._id = logEvent.id; |
|
|
|
bulkRequestBody += [ |
|
JSON.stringify(action), |
|
JSON.stringify(source), |
|
].join('\n') + '\n'; |
|
}); |
|
return bulkRequestBody; |
|
} |
|
|
|
function buildSource(message, extractedFields) { |
|
if (extractedFields) { |
|
var source = {}; |
|
|
|
for (var key in extractedFields) { |
|
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) { |
|
var value = extractedFields[key]; |
|
|
|
if (isNumeric(value)) { |
|
source[key] = 1 * value; |
|
continue; |
|
} |
|
|
|
jsonSubString = extractJson(value); |
|
if (jsonSubString !== null) { |
|
source['$' + key] = JSON.parse(jsonSubString); |
|
} |
|
|
|
source[key] = value; |
|
} |
|
} |
|
return source; |
|
} |
|
|
|
jsonSubString = extractJson(message); |
|
if (jsonSubString !== null) { |
|
return JSON.parse(jsonSubString); |
|
} |
|
|
|
return {}; |
|
} |
|
|
|
function extractJson(message) { |
|
var jsonStart = message.indexOf('{'); |
|
if (jsonStart < 0) return null; |
|
var jsonSubString = message.substring(jsonStart); |
|
return isValidJson(jsonSubString) ? jsonSubString : null; |
|
} |
|
|
|
function isValidJson(message) { |
|
try { |
|
JSON.parse(message); |
|
} catch (e) { return false; } |
|
return true; |
|
} |
|
|
|
function isNumeric(n) { |
|
return !isNaN(parseFloat(n)) && isFinite(n); |
|
} |
|
|
|
function post(body, callback) { |
|
var requestParams = buildRequest(endpoint, body); |
|
|
|
var request = https.request(requestParams, function(response) { |
|
var responseBody = ''; |
|
response.on('data', function(chunk) { |
|
responseBody += chunk; |
|
}); |
|
response.on('end', function() { |
|
var info = JSON.parse(responseBody); |
|
var failedItems; |
|
var success; |
|
|
|
if (response.statusCode >= 200 && response.statusCode < 299) { |
|
failedItems = info.items.filter(function(x) { |
|
return x.index.status >= 300; |
|
}); |
|
|
|
success = { |
|
"attemptedItems": info.items.length, |
|
"successfulItems": info.items.length - failedItems.length, |
|
"failedItems": failedItems.length |
|
}; |
|
} |
|
|
|
var error = response.statusCode !== 200 || info.errors === true ? { |
|
"statusCode": response.statusCode, |
|
"responseBody": responseBody |
|
} : null; |
|
|
|
callback(error, success, response.statusCode, failedItems); |
|
}); |
|
}).on('error', function(e) { |
|
callback(e); |
|
}); |
|
request.end(requestParams.body); |
|
} |
|
|
|
function buildRequest(endpoint, body) { |
|
var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/); |
|
var region = endpointParts[2]; |
|
var service = endpointParts[3]; |
|
var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, ''); |
|
var date = datetime.substr(0, 8); |
|
var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date); |
|
var kRegion = hmac(kDate, region); |
|
var kService = hmac(kRegion, service); |
|
var kSigning = hmac(kService, 'aws4_request'); |
|
|
|
var request = { |
|
host: endpoint, |
|
method: 'POST', |
|
path: '/_bulk', |
|
body: body, |
|
headers: { |
|
'Content-Type': 'application/json', |
|
'Host': endpoint, |
|
'Content-Length': Buffer.byteLength(body), |
|
'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN, |
|
'X-Amz-Date': datetime |
|
} |
|
}; |
|
|
|
var canonicalHeaders = Object.keys(request.headers) |
|
.sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : |
|
1; }) |
|
.map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; }) |
|
.join('\n'); |
|
|
|
var signedHeaders = Object.keys(request.headers) |
|
.map(function(k) { return k.toLowerCase(); }) |
|
.sort() |
|
.join(';'); |
|
|
|
var canonicalString = [ |
|
request.method, |
|
request.path, '', |
|
canonicalHeaders, '', |
|
signedHeaders, |
|
hash(request.body, 'hex'), |
|
].join('\n'); |
|
|
|
var credentialString = [ date, region, service, 'aws4_request' ].join('/'); |
|
|
|
var stringToSign = [ |
|
'AWS4-HMAC-SHA256', |
|
datetime, |
|
credentialString, |
|
hash(canonicalString, 'hex') |
|
] .join('\n'); |
|
|
|
request.headers.Authorization = [ |
|
'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString, |
|
'SignedHeaders=' + signedHeaders, |
|
'Signature=' + hmac(kSigning, stringToSign, 'hex') |
|
].join(', '); |
|
|
|
return request; |
|
} |
|
|
|
function hmac(key, str, encoding) { |
|
return crypto.createHmac('sha256', key).update(str, |
|
'utf8').digest(encoding); |
|
} |
|
|
|
function hash(str, encoding) { |
|
return crypto.createHash('sha256').update(str, 'utf8').digest(encoding); |
|
} |