forked from montumodi/mongodb-change-streams
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongo-to-elasticsearch.js
40 lines (36 loc) · 1.45 KB
/
mongo-to-elasticsearch.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const {getUpsertChangeStream, getDeleteChangeStream} = require("./change-identifier");
const {saveResumeTaken} = require("./token-provider");
const client = require("./elastic-client");
(async () => {
const upsertChangeStream = await getUpsertChangeStream();
upsertChangeStream.on("change", async change => {
console.log("Pushing data to elasticsearch with id", change.fullDocument._id);
change.fullDocument.id = change.fullDocument._id;
Reflect.deleteProperty(change.fullDocument, "_id");
const response = await client.index({
"id": change.fullDocument.id,
"index": "users",
"body": change.fullDocument,
"type": "doc"
});
console.log("document upserted successsfully with status code", response.statusCode);
await saveResumeTaken(change._id, "SOME_UPSERT_TOKEN_ID");
});
upsertChangeStream.on("error", error => {
console.error(error);
});
const deleteChangeStream = await getDeleteChangeStream();
deleteChangeStream.on("change", async change => {
console.log("Deleting data from elasticsearch with id", change.documentKey._id);
const response = await client.delete({
"id": change.documentKey._id,
"index": "users",
"type": "doc"
});
console.log("document deleted successsfully with status code", response.statusCode);
await saveResumeTaken(change._id, "SOME_DELETE_TOKEN_ID");
});
deleteChangeStream.on("error", error => {
console.error(error);
});
})();