From a2696196981d75b4f883cdde6b1e3123bc3bebe1 Mon Sep 17 00:00:00 2001
From: JianqinWang
Date: Wed, 28 Feb 2018 10:44:02 -0800
Subject: [PATCH 1/3] ZENKO-15 ft: oplog tailer for MongoDB

---
Review notes (text in this section is ignored by git am):
- LogConsumer.js was amended during review, so the blob ids of that file
  are no longer listed on "index" lines anywhere in this series.
- The line breaks of this series had been collapsed; the context lines of
  index.js and MongoClientInterface.js were re-indented by hand and may
  need `git am --ignore-whitespace` -- TODO confirm against the tree.

 index.js                                      |   2 +
 .../metadata/mongoclient/LogConsumer.js       | 237 ++++++++++++++++++
 2 files changed, 239 insertions(+)
 create mode 100644 lib/storage/metadata/mongoclient/LogConsumer.js

diff --git a/index.js b/index.js
index 643f03e99..6672a8967 100644
--- a/index.js
+++ b/index.js
@@ -99,6 +99,8 @@ module.exports = {
                 MongoClientInterface:
                     require('./lib/storage/metadata/mongoclient/' +
                         'MongoClientInterface'),
+                LogConsumer:
+                    require('./lib/storage/metadata/mongoclient/LogConsumer'),
             },
         },
         data: {
diff --git a/lib/storage/metadata/mongoclient/LogConsumer.js b/lib/storage/metadata/mongoclient/LogConsumer.js
new file mode 100644
--- /dev/null
+++ b/lib/storage/metadata/mongoclient/LogConsumer.js
@@ -0,0 +1,237 @@
+'use strict'; // eslint-disable-line
+
+const stream = require('stream');
+const MongoClient = require('mongodb').MongoClient;
+const format = require('util').format;
+const { Timestamp } = require('bson');
+
+// maps MongoDB oplog operation codes to the published entry types
+const ops = {
+    i: 'put',
+    u: 'put',
+    d: 'delete',
+};
+
+/**
+ * @class
+ * @classdesc Transform stream turning MongoDB oplog documents into log
+ * records, skipping the documents that were already published
+ */
+class ListRecordStream extends stream.Transform {
+
+    /**
+     * @constructor
+     *
+     * @param {object} logger - logger
+     * @param {string} [lastEndID] - unique ID (oplog field 'h') of the last
+     * document already published: documents are skipped until it has been
+     * seen. When not set, every document read is published.
+     */
+    constructor(logger, lastEndID) {
+        super({ objectMode: true });
+        this.logger = logger;
+        this.lastEndID = lastEndID;
+        this.hasStarted = false;
+        this.start = undefined;
+        this.end = undefined;
+        this.lastUniqID = undefined;
+        // this.unpublishedListing is true once we pass the oplog that has the
+        // start seq timestamp and uniqID 'h'; without a uniqID to look for
+        // there is nothing to skip
+        this.unpublishedListing = !lastEndID;
+    }
+
+    /**
+     * Convert one oplog document into a log record
+     *
+     * @param {object} itemObj - oplog document (fields h, ts, ns, op and o
+     * are read)
+     * @param {string} encoding - unused, the stream is in object mode
+     * @param {function} callback - called without argument when the
+     * document is skipped, with null and the log record otherwise
+     * @return {undefined}
+     */
+    _transform(itemObj, encoding, callback) {
+        // always update to most recent uniqID
+        const uniqID = itemObj.h.toString();
+        this.lastUniqID = uniqID;
+
+        const seq = itemObj.ts.toNumber();
+        if (this.end === undefined || seq > this.end) {
+            this.end = seq;
+        }
+
+        // only push to stream unpublished objects: the document holding
+        // lastEndID was already published, so it is skipped as well
+        if (!this.unpublishedListing) {
+            if (this.lastEndID === uniqID) {
+                this.unpublishedListing = true;
+            }
+            return callback();
+        }
+
+        if (!this.hasStarted) {
+            this.hasStarted = true;
+            this.start = seq;
+            this.emit('info', {
+                start: this.start,
+                end: this.end,
+                uniqID: this.lastUniqID,
+            });
+        }
+
+        const dbName = itemObj.ns.split('.');
+        const streamObject = {
+            timestamp: new Date(itemObj.ts.high_ * 1000),
+            db: dbName[1],
+            entries: [
+                {
+                    type: ops[itemObj.op],
+                    key: itemObj.o._id,
+                    value: JSON.stringify(itemObj.o.value),
+                },
+            ],
+        };
+        return callback(null, streamObject);
+    }
+
+    /**
+     * Emit a last 'info' event once all oplog documents were processed;
+     * stream.Transform ends the readable side after the callback
+     *
+     * @param {function} callback - called once the event is emitted
+     * @return {undefined}
+     */
+    _flush(callback) {
+        this.emit('info', {
+            start: this.start,
+            end: this.end,
+            uniqID: this.lastUniqID,
+        });
+        callback();
+    }
+}
+
+/**
+ * @class
+ * @classdesc Class to consume mongo oplog
+ */
+class LogConsumer {
+
+    /**
+     * @constructor
+     *
+     * @param {object} mongoConfig - object with the mongo configuration
+     * @param {string} mongoConfig.host - hosts for the mongodb URL
+     * @param {object} logger - logger
+     */
+    constructor(mongoConfig, logger) {
+        const { host } = mongoConfig;
+        // 'local' is the database where MongoDB has oplog.rs capped collection
+        this.database = 'local';
+        this.mongoUrl = format(
+            'mongodb://%s/local',
+            host);
+        this.logger = logger;
+    }
+
+    /**
+     * Connect to MongoClient using Mongo node module to access database and
+     * database oplogs (operation logs)
+     *
+     * @param {function} done - callback function, called with an error object
+     * when the connection fails, without argument otherwise
+     * @return {undefined}
+     */
+    connectMongo(done) {
+        MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
+        (err, client) => {
+            if (err) {
+                this.logger.error('Unable to connect to MongoDB',
+                { error: err });
+                return done(err);
+            }
+            this.logger.info('connected to mongodb');
+            this.client = client;
+            this.db = client.db(this.database, {
+                ignoreUndefined: true,
+            });
+            return done();
+        });
+    }
+
+    /**
+     * Read a series of log records from mongo
+     *
+     * @param {Object} [params] - params object
+     * @param {String} [params.startSeq] - fetch starting from this
+     * sequence number, optionally followed by '|' and the unique ID of
+     * the last log record already published
+     * @param {Number} [params.limit] - maximum number of log records
+     * to return (10000 when not set)
+     * @param {function} cb - callback function, called with an error
+     * object or null and an object as 2nd parameter, made of the first
+     * 'info' event (info) and of the stream of log records (log)
+     *
+     * @return {undefined}
+     */
+    readRecords(params, cb) {
+        const listParams = params || {};
+        const limit = listParams.limit || 10000;
+        // startSeq may be omitted or lack the '|uniqID' part
+        const startIDandSeq = String(listParams.startSeq || '').split('|');
+        const startSeq = parseInt(startIDandSeq[0], 10) || 0;
+        // the unique ID is handed to this call's own stream, so that
+        // concurrent calls do not overwrite each other's value
+        const recordStream = new ListRecordStream(this.logger,
+            startIDandSeq[1]);
+
+        this.coll = this.db.collection('oplog.rs');
+        return this.coll.find({
+            ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
+            ts: { $gte: Timestamp.fromNumber(startSeq) },
+        }, {
+            limit,
+            tailable: false,
+            awaitData: false,
+            noCursorTimeout: true,
+            oplogReplay: true,
+            numberOfRetries: Number.MAX_VALUE,
+        }, (err, res) => {
+            if (err) {
+                this.logger.error('Unable to read MongoDB oplog',
+                { error: err });
+                return cb(err);
+            }
+            let cbCalled = false;
+            const cursorStream = res.stream();
+            cursorStream.on('error', streamErr => {
+                this.logger.error('Error while reading MongoDB oplog',
+                { error: streamErr });
+                if (cbCalled) {
+                    // the caller already holds the stream: end it so that
+                    // its reader is not left waiting for more records
+                    return recordStream.end();
+                }
+                cbCalled = true;
+                return cb(streamErr);
+            });
+            recordStream.once('info', info => {
+                if (cbCalled) {
+                    return undefined;
+                }
+                cbCalled = true;
+                // NOTE(review): kept from the original code; presumably
+                // leaves error handling to the caller -- confirm
+                recordStream.removeAllListeners('error');
+                return cb(null, { info, log: recordStream });
+            });
+            // pipe() applies backpressure and ends recordStream, which
+            // triggers _flush(), once the cursor is exhausted
+            cursorStream.pipe(recordStream);
+            return undefined;
+        });
+    }
+}
+
+module.exports = LogConsumer;

From fa19fc8859472e7e8dbaec27edda2ca9e5b0a0e2 Mon Sep 17 00:00:00 2001
From: JianqinWang
Date: Mon, 5 Mar 2018 17:53:58 -0800
Subject: [PATCH 2/3] rf: name change for replica set hosts

---
 lib/storage/metadata/mongoclient/LogConsumer.js | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/storage/metadata/mongoclient/LogConsumer.js b/lib/storage/metadata/mongoclient/LogConsumer.js
--- a/lib/storage/metadata/mongoclient/LogConsumer.js
+++ b/lib/storage/metadata/mongoclient/LogConsumer.js
@@ -122,16 +122,16 @@ class LogConsumer {
      * @constructor
      *
      * @param {object} mongoConfig - object with the mongo configuration
-     * @param {string} mongoConfig.host - hosts for the mongodb URL
+     * @param {string} mongoConfig.replicaSetHosts - hosts for the mongodb URL
      * @param {object} logger - logger
      */
     constructor(mongoConfig, logger) {
-        const { host } = mongoConfig;
+        const { replicaSetHosts } = mongoConfig;
         // 'local' is the database where MongoDB has oplog.rs capped collection
         this.database = 'local';
         this.mongoUrl = format(
             'mongodb://%s/local',
-            host);
+            replicaSetHosts);
         this.logger = logger;
     }
 

From a9a6b2433d2f2d2eeba91747e64745e6e2a1efdb Mon Sep 17 00:00:00 2001
From: JianqinWang
Date: Wed, 7 Mar 2018 19:34:57 -0800
Subject: [PATCH 3/3] rf: remove use of util.format

---
 lib/storage/metadata/mongoclient/LogConsumer.js          | 5 +----
 lib/storage/metadata/mongoclient/MongoClientInterface.js | 7 ++-----
 2 files changed, 3 insertions(+), 9 deletions(-)

diff --git a/lib/storage/metadata/mongoclient/LogConsumer.js b/lib/storage/metadata/mongoclient/LogConsumer.js
--- a/lib/storage/metadata/mongoclient/LogConsumer.js
+++ b/lib/storage/metadata/mongoclient/LogConsumer.js
@@ -2,7 +2,6 @@
 
 const stream = require('stream');
 const MongoClient = require('mongodb').MongoClient;
-const format = require('util').format;
 const { Timestamp } = require('bson');
 
 // maps MongoDB oplog operation codes to the published entry types
@@ -129,9 +128,7 @@ class LogConsumer {
         const { replicaSetHosts } = mongoConfig;
         // 'local' is the database where MongoDB has oplog.rs capped collection
         this.database = 'local';
-        this.mongoUrl = format(
-            'mongodb://%s/local',
-            replicaSetHosts);
+        this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
         this.logger = logger;
     }
 
diff --git a/lib/storage/metadata/mongoclient/MongoClientInterface.js b/lib/storage/metadata/mongoclient/MongoClientInterface.js
index 2f97f4500..5d40fcd09 100644
--- a/lib/storage/metadata/mongoclient/MongoClientInterface.js
+++ b/lib/storage/metadata/mongoclient/MongoClientInterface.js
@@ -11,7 +11,6 @@
  */
 
 const async = require('async');
-const format = require('util').format;
 
 const constants = require('../../../constants');
 const errors = require('../../../errors');
@@ -82,10 +81,8 @@ class MongoClientInterface {
     constructor(params) {
         const { replicaSetHosts, path, database, logger,
             replicationGroupId, replicaSet } = params;
-        this.mongoUrl = format(
-            'mongodb://%s/?replicaSet=%s',
-            replicaSetHosts,
-            replicaSet);
+        this.mongoUrl = `mongodb://${replicaSetHosts}/?replicaSet=` +
+            `${replicaSet}`;
         this.logger = logger;
         this.client = null;
         this.db = null;