diff --git a/lib/storage/metadata/mongoclient/LogConsumer.js b/lib/storage/metadata/mongoclient/LogConsumer.js index 076bb63e7..a87108161 100644 --- a/lib/storage/metadata/mongoclient/LogConsumer.js +++ b/lib/storage/metadata/mongoclient/LogConsumer.js @@ -4,8 +4,6 @@ const stream = require('stream'); const MongoClient = require('mongodb').MongoClient; const { Timestamp } = require('bson'); -let lastEndID = null; - const ops = { i: 'put', u: 'put', @@ -13,9 +11,10 @@ const ops = { }; class ListRecordStream extends stream.Transform { - constructor(logger) { + constructor(logger, lastEndID) { super({ objectMode: true }); this.logger = logger; + this.lastEndID = lastEndID; this.hasStarted = false; this.start = null; this.end = null; @@ -45,10 +44,9 @@ class ListRecordStream extends stream.Transform { // only push to stream unpublished objects if (!this.unpublishedListing) { - if (lastEndID === itemObj.h.toString()) { + if (this.lastEndID === itemObj.h.toString()) { this.unpublishedListing = true; } - return callback(); } if (!this.hasStarted) { @@ -105,11 +103,12 @@ class LogConsumer { * @param {string} logger - logger */ constructor(mongoConfig, logger) { - const { replicaSetHosts } = mongoConfig; + const { replicaSetHosts, database } = mongoConfig; // 'local' is the database where MongoDB has oplogs.rs capped collection this.database = 'local'; this.mongoUrl = `mongodb://${replicaSetHosts}/local`; this.logger = logger; + this.metadataDatabase = database; } /** @@ -150,15 +149,18 @@ class LogConsumer { * @return {undefined} */ readRecords(params, cb) { - const recordStream = new ListRecordStream(this.logger); const limit = params.limit || 10000; const startIDandSeq = params.startSeq.toString().split('|'); const startSeq = parseInt(startIDandSeq[0], 10) || 0; - lastEndID = startIDandSeq[1]; + const lastEndID = startIDandSeq[1]; + const recordStream = new ListRecordStream(this.logger, lastEndID); + + const db = this.metadataDatabase; + const ns = new RegExp(`^(?!.*${db}.*(?:__)).*${db}\\.\\w+.*`); this.coll = this.db.collection('oplog.rs'); return this.coll.find({ - ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/, + ns, ts: { $gte: Timestamp.fromNumber(startSeq) }, }, { limit,