// require the UUID module to generate unique identifiers
const { v4: uuidv4 } = require('uuid');
// require the failure-lambda wrapper to support Chaos Engineering
const failureLambda = require('failure-lambda');
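// Note (illustrative, not part of this file's contract): failure-lambda reads its
// fault-injection settings from an SSM parameter named by the FAILURE_INJECTION_PARAM
// environment variable, e.g. {"isEnabled": true, "failureMode": "latency", "rate": 1};
// see the failure-lambda documentation for the exact configuration schema.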
// require the AWS SDK to communicate with S3 and DynamoDB
const AWS = require('aws-sdk');
// require json2csv to parse the JSON files published to S3
const { parse } = require('json2csv');
// generate a unique identifier for this Lambda execution environment
const lambdaInstanceId = uuidv4();
// CSV columns to extract from the incoming JSON documents
const fields = ['objectName', 'submissionDate', 'author', 'formatVersion'];
const opts = { fields };
// AWS service clients
const s3 = new AWS.S3();
const ddb = new AWS.DynamoDB.DocumentClient();
const cloudwatch = new AWS.CloudWatch();
// name of the DynamoDB table that stores the processed symbol data
const chaosDataTable = process.env.CHAOS_DATA_TABLE;
exports.handler = failureLambda(async (event, context, callback) => {
    console.log('Lambda instance ID:', lambdaInstanceId, ': v1 ETL processor handling event', JSON.stringify(event));
    const s3Event = JSON.parse(event.Records[0].body);
    console.log('Extracted S3 event', JSON.stringify(s3Event));
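    // Illustrative shape only, based on the fields read below: each SQS record body
    // is an S3 event notification such as
    // { "Records": [{ "s3": { "bucket": { "name": "my-bucket" },
    //                         "object": { "key": "input/example.json" } } }] }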
    // retrieve key fields from the S3 event object
    const srcBucket = s3Event.Records[0].s3.bucket.name;
    // Object key may have spaces or unicode non-ASCII characters.
    const srcKey = decodeURIComponent(s3Event.Records[0].s3.object.key.replace(/\+/g, " "));
    const dstBucket = srcBucket;
    const dstKey = "output/" + srcKey.replace(/input\//g, "") + ".csv";
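    // e.g. a (hypothetical) key "input/data.json" maps to "output/data.json.csv"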
console.log("Reading JSON file", srcKey, "in bucket", srcBucket);
const data = await s3.getObject({ Bucket: srcBucket, Key: srcKey }).promise();
var jsonData = JSON.parse(data.Body.toString('utf-8'));
console.log("Retrieved JSON data:", jsonData);
    // Update the database with the latest summary of the symbol
    let params = {
        TableName: chaosDataTable,
        Key: {
            "symbol": jsonData.symbol,
            "entryType": "latest"
        },
        UpdateExpression: "ADD updateCount :i, symbolValue :v SET lastMessage = :mid",
        ExpressionAttributeValues: {
            ":mid": jsonData.messageId,
            ":v": jsonData.value,
            ":i": 1
        },
        ReturnValues: "UPDATED_NEW"
    };
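    // ADD increments updateCount by :i and adds the numeric :v to symbolValue
    // (creating either attribute if it is absent); SET overwrites lastMessage.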
    // Await the update so the write completes before the handler returns;
    // an un-awaited callback may never run once the async handler resolves.
    try {
        const updateResult = await ddb.update(params).promise();
        console.log("Updated DynamoDB for", jsonData.symbol, ":", JSON.stringify(updateResult, null, 2));
    }
    catch (err) {
        console.error("Unable to update", jsonData.symbol, "aggregate record in DynamoDB, Error JSON:", JSON.stringify(err, null, 2));
    }
    // record the individual message as its own item
    const dateStr = (new Date()).toISOString();
    params = {
        TableName: chaosDataTable,
        Key: {
            "symbol": jsonData.symbol,
            "entryType": dateStr + "#" + jsonData.messageId
        },
        UpdateExpression: "SET symbolValue = :v, processingTimestamp = :d, messageId = :mid",
        ExpressionAttributeValues: {
            ":mid": jsonData.messageId,
            ":v": jsonData.value,
            ":d": dateStr
        },
        ReturnValues: "UPDATED_NEW"
    };
    try {
        const recordResult = await ddb.update(params).promise();
        console.log("Recorded", jsonData.symbol, "message ID", jsonData.messageId, "in DynamoDB", JSON.stringify(recordResult, null, 2));
        // publish a custom CloudWatch metric counting successful symbol writes
        const cwParams = {
            MetricData: [{
                MetricName: 'SymbolWriteCount',
                Dimensions: [{
                    Name: 'DynamoDBTable',
                    Value: chaosDataTable
                }],
                StorageResolution: 1, // 1 = high-resolution (sub-minute) metric
                Timestamp: new Date(),
                Unit: 'Count',
                Value: 1,
            }],
            Namespace: 'ChaosTransformer'
        };
        try {
            const metricResult = await cloudwatch.putMetricData(cwParams).promise();
            console.log("Successfully logged custom metric update:", metricResult);
        }
        catch (err) {
            console.log("Error logging custom metrics:", err, err.stack);
        }
    }
    catch (err) {
        console.error("Unable to record message ID in DynamoDB, Error JSON:", JSON.stringify(err, null, 2));
    }
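    // To inspect the metric afterwards (illustrative CLI; table name and times are
    // placeholders):
    //   aws cloudwatch get-metric-statistics --namespace ChaosTransformer \
    //     --metric-name SymbolWriteCount --dimensions Name=DynamoDBTable,Value=<table> \
    //     --start-time <ISO8601> --end-time <ISO8601> --period 60 --statistics Sum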
    /**
     * Perform the ETL and write the converted data to S3
     */
    try {
        const csvData = parse(jsonData, opts);
        console.log("Parsed CSV data from JSON:", csvData);
        await s3.putObject({ Bucket: dstBucket, Key: dstKey, Body: csvData, ContentType: 'text/csv' }).promise();
    }
    catch (err) {
        console.error(err);
        // fail the invocation instead of falling through to the success response
        throw err;
    }
    /**
     * Return a completion response to the Lambda runtime
     */
    const response = {
        statusCode: 200,
        body: JSON.stringify('Input conversion complete')
    };
    console.log("ETL processor completed processing of", srcKey, "in bucket", srcBucket);
    return response;
});