-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
88 lines (78 loc) · 2.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
'use strict';
/**
 * Error describing an SQS message that could not be validated or parsed.
 *
 * The `name` property is taken from the concrete constructor, so subclasses
 * report their own class name.
 */
class MessageValidationError extends Error {
  /**
   * @param {string} message - Human-readable description of the failure.
   * @param {*} details - Arbitrary context stored as-is on `this.details`.
   */
  constructor(message, details) {
    super(message);

    const ctor = this.constructor;
    this.name = ctor.name;
    this.details = details;

    // Engines without Error.captureStackTrace get a stack borrowed from a
    // freshly created Error instead.
    if (typeof Error.captureStackTrace !== 'function') {
      this.stack = new Error(message).stack;
      return;
    }

    // Passing the constructor keeps its own frame out of the captured trace.
    Error.captureStackTrace(this, ctor);
  }
}
/**
 * Summarises a list of per-message outcomes.
 *
 * @param {Array<{isError: boolean, result: *}>} results - One entry per message.
 * @returns {Promise<{total: number, succeeded: number, failed: number, results: Array}>}
 *   Promise resolved with the counts and the same `results` array that was passed in.
 */
function processResults(results) {
  const total = results.length;
  // An entry counts as succeeded whenever its isError flag is falsy.
  const succeeded = results.filter(entry => !entry.isError).length;
  const summary = {
    total,
    succeeded,
    failed: total - succeeded,
    results
  };
  return Promise.resolve(summary);
}
/**
 * Validates, parses and consumes a single raw SQS message, then deletes it
 * from the queue when the consumer succeeds.
 *
 * This function never rejects: every failure is reported as an
 * `{ isError: true, result: <error> }` entry.
 *
 * @param {Object} sqs - Client exposing `deleteMessage(params).promise()`.
 * @param {Object} sqsParams - Receive parameters; `QueueUrl` is reused for the delete call.
 * @param {Object} rawMessage - Message object as found in `data.Messages`.
 * @param {Function} consumeMessage - Called with the parsed JSON body.
 * @returns {Promise<{isError: boolean, result: *}>} Outcome for this message.
 */
function processRawMessage(sqs, sqsParams, rawMessage, consumeMessage) {
  if (!rawMessage.ReceiptHandle || !rawMessage.Body) {
    return Promise.resolve({
      isError: true,
      result: new MessageValidationError('Failed to validate SQS message. Raw delivery setting should be enabled.', rawMessage)
    });
  }

  let message;
  try {
    message = JSON.parse(rawMessage.Body);
  } catch (error) {
    return Promise.resolve({
      isError: true,
      result: new MessageValidationError('Failed to parse SQS message. All messages should be JSON', {
        error,
        rawMessage
      })
    });
  }

  // Start the consumer inside a promise chain so that a synchronous throw or
  // a plain (non-promise) return value is handled like any other outcome
  // instead of aborting the whole batch.
  return Promise.resolve()
    .then(() => consumeMessage(message))
    .then(result => {
      // Only delete the message once it has been consumed successfully.
      return sqs.deleteMessage({
        QueueUrl: sqsParams.QueueUrl,
        ReceiptHandle: rawMessage.ReceiptHandle
      }).promise().then(() => ({
        isError: false,
        result
      }));
    })
    .catch(err => ({
      isError: true,
      result: err
    }));
}

/**
 * Receives a batch of messages from SQS, hands each JSON-decoded body to
 * `consumeMessage`, deletes the messages that were consumed successfully and
 * resolves with a summary produced by `processResults`.
 *
 * Per-message failures (validation, JSON parsing, consumer errors, delete
 * errors) do not reject the returned promise; they appear as
 * `{ isError: true, result: <error> }` entries in the summary. A failure of
 * the receive call itself does reject.
 *
 * @param {Object} sqs - Client exposing `receiveMessage(params).promise()` and
 *   `deleteMessage(params).promise()`.
 * @param {Object} sqsParams - Passed to `receiveMessage`; its `QueueUrl` is
 *   also used when deleting messages.
 * @param {Function} consumeMessage - Invoked once per message with the parsed
 *   body. May return a promise or a plain value, and may throw.
 * @returns {Promise<Object>} Summary from `processResults`.
 */
function processJsonMessages(sqs, sqsParams, consumeMessage) {
  return sqs.receiveMessage(sqsParams).promise()
    .then(data => {
      // No Messages property in the response: report an empty batch.
      if (!data.Messages) {
        return processResults([]);
      }

      const messagePromises = data.Messages.map(rawMessage =>
        processRawMessage(sqs, sqsParams, rawMessage, consumeMessage));

      return Promise.all(messagePromises).then(processResults);
    });
}
module.exports = {
processJsonMessages,
errors: {
MessageValidationError
}
};