-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
105 lines (88 loc) · 3.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
const assert = require('assert');
const { SQS } = require('@aws-sdk/client-sqs');
const { NodeHttpHandler } = require('@aws-sdk/node-http-handler');
// Shared SQS client, created once when the module loads and used for every send.
const sqs = new SQS({
// Region is taken from AWS_REGION when set; otherwise it falls back to us-east-1.
region: process.env.AWS_REGION || 'us-east-1',
apiVersion: '2012-11-05',
requestHandler: new NodeHttpHandler({
// NOTE(review): presumably milliseconds (i.e. 5 s) — confirm against the NodeHttpHandler docs.
connectionTimeout: 5000,
}),
});
// Event names a queue config may list; also the value used when a config lists none.
const DEFAULT_DYNAMO_EVENT_NAMES = ['INSERT', 'REMOVE', 'MODIFY'];
// Default body handler: the record itself becomes the message.
const RAW_BODY_HANDLER = record => record;
// Default message filter: accepts every message.
const NO_FILTER = () => true;
/**
 * Forwards DynamoDB stream records to one or more SQS queues.
 *
 * After construction, `handler` is an async `(event, context)` function that
 * passes every entry of `event.Records` to `sendToSqs`.
 */
class DynamoStreamHandler {
  /**
   * @param {object} [options]
   * @param {object[]} options.sqsConfigs - Non-empty array of queue configs.
   *   Each config's `endpoint` is used as the queue URL and its optional
   *   `eventNames` restricts which event names are forwarded. The configs are
   *   normalized in place by `setEventNames`.
   * @param {object} [options.logger] - Object exposing `info` and `error`;
   *   defaults to a `ConsoleLogger`.
   * @param {Function} [options.customBodyHandler] - Maps a record to the
   *   message that is serialized and sent; defaults to the record itself.
   * @param {Function} [options.messageFilter] - Called with the message fields
   *   plus `sqsConfig`; a falsy result skips that queue. Defaults to accepting
   *   everything.
   * @param {Function} [options.logPayloadTransformer] - Passed through to
   *   `sendToSqs` to shape the record before it is logged.
   * @throws {AssertionError} When `sqsConfigs`, `customBodyHandler` or
   *   `messageFilter` is invalid.
   */
  constructor({ sqsConfigs, logger, customBodyHandler, messageFilter, logPayloadTransformer } = {}) {
    assert(
      sqsConfigs && Array.isArray(sqsConfigs) && sqsConfigs.length > 0,
      'sqsConfigs must be an array with at least one element',
    );
    this.sqsConfigs = sqsConfigs;
    this.logger = logger ? logger : new ConsoleLogger();

    this.sqsConfigs.forEach(setEventNames);

    this.logger.info(`Creating dynamo-to-sqs`);
    this.sqsConfigs.forEach(sqsConfig =>
      this.logger.info(`SQS Endpoint ${sqsConfig.endpoint} | Event Names: ${sqsConfig.eventNames}`),
    );

    // The tag check (rather than typeof) is kept on purpose: it also rejects
    // async functions, whose promise result could not be serialized by the
    // synchronous JSON.stringify in sendToSqs.
    assert(
      !customBodyHandler || {}.toString.call(customBodyHandler) === '[object Function]',
      'customBodyHandler must be a function',
    );
    // Fail at construction time instead of on the first record that reaches the filter.
    assert(!messageFilter || typeof messageFilter === 'function', 'messageFilter must be a function');

    this.bodyHandler = customBodyHandler ? customBodyHandler : RAW_BODY_HANDLER;
    this.messageFilter = messageFilter ? messageFilter : NO_FILTER;

    // Arrow function so `this` stays bound to the instance when the handler is
    // detached and invoked on its own.
    this.handler = async (event, context) => {
      try {
        const sends = event.Records.map(record => sendToSqs({ record, params: this, logPayloadTransformer }));
        await Promise.all(sends);
        return `Successfully processed ${event.Records.length} records.`;
      } catch (err) {
        this.logger.error({ err }, 'Failed processing records');
        if (context && typeof context.fail === 'function') {
          context.fail(err);
          return undefined;
        }
        // No `fail` on the context: reject with the original error rather than
        // masking it behind a "context.fail is not a function" TypeError.
        throw err;
      }
    };
  }
}
/**
 * Normalizes `sqsConfig.eventNames` in place.
 *
 * Provided names are upper-cased; when none are provided the config receives
 * its own copy of the default list, so later changes to one config's list
 * cannot leak into the module default or into other configs.
 *
 * @param {object} sqsConfig - Queue config; its `eventNames` property is overwritten.
 * @throws {AssertionError} When a name is not one of the default event names.
 */
function setEventNames(sqsConfig) {
  const { eventNames } = sqsConfig;
  sqsConfig.eventNames = eventNames
    ? eventNames.map(name => name.toUpperCase())
    : [...DEFAULT_DYNAMO_EVENT_NAMES];
  assert(
    sqsConfig.eventNames.every(x => DEFAULT_DYNAMO_EVENT_NAMES.includes(x)),
    `Event Names must be in ${DEFAULT_DYNAMO_EVENT_NAMES}`,
  );
}
/**
 * Sends one DynamoDB stream record to every configured queue that accepts it.
 *
 * The record is turned into a message by `params.bodyHandler` and serialized
 * once. A queue is skipped when its `eventNames` does not contain the record's
 * event name, or when `params.messageFilter` returns a falsy value.
 *
 * @param {object} args
 * @param {object} args.record - Stream record; `eventName` must be a string.
 * @param {object} args.params - Supplies `bodyHandler`, `messageFilter`,
 *   `sqsConfigs` and `logger` (with `info` and `error`).
 * @param {Function} [args.logPayloadTransformer] - Shapes the record before it
 *   is logged; defaults to logging the record as-is.
 * @returns {Promise<Array>} One entry per queue config: the `sendMessage`
 *   result, or `undefined` when that queue was skipped.
 */
async function sendToSqs({ record, params, logPayloadTransformer = (record) => record }) {
  const message = params.bodyHandler(record);
  const MessageBody = JSON.stringify(message);

  // Logging is best-effort: a failing transformer must not block delivery, so
  // fall back to logging the untransformed record.
  try {
    params.logger.info('DynamoDB Record: %j', logPayloadTransformer(record));
  } catch (err) {
    params.logger.error({ err }, 'Error logging DynamoDB record');
    params.logger.info('DynamoDB Record: %j', record);
  }

  const promises = params.sqsConfigs.map(sqsConfig => {
    if (!sqsConfig.eventNames.includes(record.eventName.toUpperCase())) {
      params.logger.info(`Event not forwarded to SQS ${sqsConfig.endpoint}: Event Name ${record.eventName}`);
      return undefined;
    }

    if (!params.messageFilter({ ...message, sqsConfig })) {
      // Log the serialized body and the endpoint; interpolating the objects
      // themselves would only print "[object Object]".
      params.logger.info(`DynamoDB message (${MessageBody}) filtered from SQS (${sqsConfig.endpoint})`);
      return undefined;
    }

    return sqs.sendMessage({
      MessageBody,
      QueueUrl: sqsConfig.endpoint,
    });
  });

  return Promise.all(promises);
}
/**
 * Minimal fallback logger used when the caller supplies none.
 *
 * It provides the methods this module calls on a logger (`info` and `error`)
 * plus `debug`, all backed by the global `console`.
 */
class ConsoleLogger {
  /**
   * Writes to stdout. All arguments are forwarded so printf-style calls such
   * as `info('DynamoDB Record: %j', record)` actually include the record.
   *
   * @param {...*} args - Message followed by optional format arguments.
   */
  info(...args) {
    console.log(...args);
  }

  /**
   * Writes to stderr. Accepts the `({ err }, message)` call shape used by the
   * error paths in this module.
   *
   * @param {...*} args - Context object and/or message.
   */
  error(...args) {
    console.error(...args);
  }

  /**
   * Writes a context object and a message to stderr.
   *
   * @param {*} obj - Context object.
   * @param {string} msg - Message.
   */
  debug(obj, msg) {
    console.error(obj, msg);
  }
}
// CommonJS export: the module's value is the DynamoStreamHandler class itself.
module.exports = DynamoStreamHandler;