From 8ac3cf554890808c88ccfc1f66c48acf57309e67 Mon Sep 17 00:00:00 2001
From: Jonathan Gramain
Date: Wed, 17 Oct 2018 16:26:46 -0700
Subject: [PATCH 1/2] ft: ZENKO-1175 tailable cursor to consume mongo oplog

Use a tailable cursor to keep ordering guarantees for the records we read. This also means we have to read from the beginning when we reconnect (at startup), and start processing once we encounter the unique ID previously stored in zookeeper.

Also removed dispatcher mode with MongoLogReader (it was only used for the short-lived Federation deployment of Zenko).
---
 .../metadata/mongoclient/ListRecordStream.js | 126 ++++++++++---
 .../metadata/mongoclient/LogConsumer.js | 84 +++++----
 package.json | 2 +-
 .../mongoclient/ListRecordStream.spec.js | 178 ++++++++++--------
 4 files changed, 259 insertions(+), 131 deletions(-)

diff --git a/lib/storage/metadata/mongoclient/ListRecordStream.js b/lib/storage/metadata/mongoclient/ListRecordStream.js
index 78d9c2f29..c09773d55 100644
--- a/lib/storage/metadata/mongoclient/ListRecordStream.js
+++ b/lib/storage/metadata/mongoclient/ListRecordStream.js
@@ -1,33 +1,87 @@ const stream = require('stream');
-class ListRecordStream extends stream.Transform { - constructor(logger, lastEndID) {
+/** + * @class ListRecordStream + * @classdesc Filter and stream records returned from a mongodb query + * cursor + */
+class ListRecordStream extends stream.Readable {
+ /** + * @constructor
+ * @param {mongodb.Cursor} mongoCursor - cursor returned by a + * mongodb query to the oplog (see + * http://mongodb.github.io/node-mongodb-native/2.0/api/Cursor.html)
+ * @param {werelogs.Logger} logger - logger object
+ * @param {string} lastSavedID - unique ID that has been persisted + * of the most recently processed entry in the oplog
+ * @param {string} latestOplogID - unique ID of the most recently + * added entry in the oplog
+ */
+ constructor(mongoCursor, logger, lastSavedID, latestOplogID) { super({ objectMode: true });
+ this._cursor = mongoCursor; this._logger = logger;
- this._lastEndID = lastEndID; - this._lastTs = 0;
+ this._lastSavedID = lastSavedID; + this._latestOplogID = latestOplogID; this._lastUniqID = null;
// this._unpublishedListing is true once we pass the oplog
- // that has the start seq timestamp and uniqID 'h'
+ // record that has the same uniqID 'h' than last saved. If we + // don't find it (e.g. log rolled over before populator could + // process its oldest entries), we will restart from the + // latest record of the oplog.
this._unpublishedListing = false;
+ // cf. this.getSkipCount() + this._skipCount = 0; }
- _transform(itemObj, encoding, callback) { + _read() {
+ // MongoDB cursors provide a stream interface. We choose not + // to use it though because errors may not be emitted by the + // stream when there is an issue with the connection to + // MongoDB (especially when pause()/resume() are used).
+ //
+ // Instead we use the async cursor.next() call directly to + // fetch records one at a time, errors are then forwarded in + // the callback. 
+ this._cursor.next((err, item) => { + if (err) { + this._logger.error('mongodb cursor error', { + method: 'mongoclient.ListRecordStream._read()', + error: err.message, + }); + this.emit('error', err); + return undefined; + } + if (this._processItem(item)) { + return process.nextTick(this._read.bind(this)); + } + // wait until _read() gets called again + return undefined; + }); + } + + _processItem(itemObj) { // always update to most recent uniqID this._lastUniqID = itemObj.h.toString(); - if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) { - this._lastTs = itemObj.ts.toNumber(); - } - // only push to stream unpublished objects if (!this._unpublishedListing) { // When an oplog with a unique ID that is stored in the // log offset is found, all oplogs AFTER this is unpublished. - if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) { + if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) { + this._unpublishedListing = true; + } else if (this._latestOplogID === this._lastUniqID) { + this._logger.warn( + 'did not encounter the last saved offset in oplog, ' + + 'resuming processing right after the latest record ' + + 'to date; some entries may have been skipped', { + lastSavedID: this._lastSavedID, + latestRecordID: this._latestOplogID, + }); this._unpublishedListing = true; } - return callback(); + ++this._skipCount; + return true; // read next record } const dbName = itemObj.ns.split('.'); @@ -62,7 +116,8 @@ class ListRecordStream extends stream.Transform { } else { // skip other entry types as we don't need them for now // ('c', ...?) - return callback(); + ++this._skipCount; + return true; // read next record } const streamObject = { timestamp: new Date((itemObj.ts ? @@ -70,19 +125,44 @@ class ListRecordStream extends stream.Transform { db: dbName[1], entries: [entry], }; - return callback(null, streamObject); + // push object to the stream, then return false to wait until + // _read() is called again (because we are in an asynchronous + // context already) + this.push(streamObject); + return false; } - _flush(callback) { - this.emit('info', { - // store both the timestamp and unique oplog id in an - // opaque JSON string returned to the reader - end: JSON.stringify({ - ts: this._lastTs, - uniqID: this._lastUniqID, - }), + /** + * Get an opaque JSON blob containing the latest consumed offset + * from MongoDB oplog. + * + * @return {string} opaque JSON blob + */ + getOffset() { + return JSON.stringify({ + uniqID: this._lastUniqID, }); - callback(); + } + + /** + * Get the number of entries that have been read and skipped from + * MongoDB oplog since the ListRecordStream instance was created. + * + * @return {integer} number of skipped entries + */ + getSkipCount() { + return this._skipCount; + } + + /** + * Get whether the stream reached yet-unpublished records + * (i.e. 
after we reached either the saved unique ID, or the tip + * of the oplog) + * + * @return {boolean} true if we are now returning unpublished records + */ + reachedUnpublishedListing() { + return this._unpublishedListing; } } diff --git a/lib/storage/metadata/mongoclient/LogConsumer.js b/lib/storage/metadata/mongoclient/LogConsumer.js index ca8fcb8cf..56e6f7903 100644 --- a/lib/storage/metadata/mongoclient/LogConsumer.js +++ b/lib/storage/metadata/mongoclient/LogConsumer.js @@ -2,7 +2,6 @@ const MongoClient = require('mongodb').MongoClient; const ListRecordStream = require('./ListRecordStream'); -const { Timestamp } = require('bson'); /** * @class @@ -18,12 +17,11 @@ class LogConsumer { */ constructor(mongoConfig, logger) { const { replicaSetHosts, database } = mongoConfig; - // 'local' is the database where MongoDB has oplogs.rs capped collection - this.database = 'local'; - this.mongoUrl = `mongodb://${replicaSetHosts}/local`; - this.logger = logger; - this.metadataDatabase = database; - this.oplogNsRegExp = new RegExp(`^${database}\\.`); + this._mongoUrl = `mongodb://${replicaSetHosts}/local`; + this._logger = logger; + this._oplogNsRegExp = new RegExp(`^${database}\\.`); + // oplog collection + this._coll = null; } /** @@ -35,67 +33,87 @@ class LogConsumer { * @return {undefined} */ connectMongo(done) { - MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' }, + MongoClient.connect(this._mongoUrl, { + replicaSet: 'rs0', + useNewUrlParser: true, + }, (err, client) => { if (err) { - this.logger.error('Unable to connect to MongoDB', + this._logger.error('Unable to connect to MongoDB', { error: err }); return done(err); } - this.logger.info('connected to mongodb'); - this.client = client; - this.db = client.db(this.database, { + this._logger.info('connected to mongodb'); + // 'local' is the database where MongoDB has oplog.rs + // capped collection + const db = client.db('local', { ignoreUndefined: true, }); + this._coll = db.collection('oplog.rs'); return done(); }); } + /** - * Read a series of log records from mongo + * Open a tailable cursor to mongo oplog and retrieve a stream of + * records to read * * @param {Object} [params] - params object * @param {String} [params.startSeq] - fetch starting from this * opaque offset returned previously by mongo ListRecordStream * in an 'info' event - * @param {Number} [params.limit] - maximum number of log records - * to return * @param {function} cb - callback function, called with an error * object or null and an object as 2nd parameter * * @return {undefined} */ readRecords(params, cb) { - const limit = params.limit || 10000; - let startSeq = { ts: 0 }; + let startSeq = {}; if (params.startSeq) { try { // parse the opaque JSON string passed through from a // previous 'info' event startSeq = JSON.parse(params.startSeq); } catch (err) { - this.logger.error('malformed startSeq', { + this._logger.error('malformed startSeq', { startSeq: params.startSeq, }); // start over if malformed } } - const recordStream = new ListRecordStream(this.logger, startSeq.uniqID); + this._readLatestOplogID((err, latestOplogID) => { + if (err) { + return cb(err); + } + return this._coll.find({ + ns: this._oplogNsRegExp, + }, { + tailable: true, + awaitData: true, + noCursorTimeout: true, + numberOfRetries: Number.MAX_VALUE, + }, (err, cursor) => { + const recordStream = new ListRecordStream( + cursor, this._logger, startSeq.uniqID, latestOplogID); + return cb(null, { log: recordStream, tailable: true }); + }); + }); + } - this.coll = this.db.collection('oplog.rs'); - 
return this.coll.find({ - ns: this.oplogNsRegExp, - ts: { $gte: Timestamp.fromNumber(startSeq.ts) }, + _readLatestOplogID(cb) { + this._coll.find({ + ns: this._oplogNsRegExp, }, { - limit, - tailable: false, - awaitData: false, - noCursorTimeout: true, - oplogReplay: true, - numberOfRetries: Number.MAX_VALUE, - }, (err, res) => { - res.stream().pipe(recordStream); - recordStream.removeAllListeners('error'); - return cb(null, { log: recordStream }); + ts: 1, + }).sort({ + $natural: -1, + }).limit(1).toArray((err, data) => { + if (err) { + return cb(err); + } + const latestOplogID = data[0].h.toString(); + this._logger.debug('latest oplog ID read', { latestOplogID }); + return cb(null, latestOplogID); }); } } diff --git a/package.json b/package.json index 6eb90cfda..9b80d8c87 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "engines": { "node": ">=8" }, - "version": "8.0.4", + "version": "8.0.5", "description": "Common utilities for the S3 project components", "main": "index.js", "repository": { diff --git a/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js b/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js index b0f0faf8f..9354970cf 100644 --- a/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js +++ b/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js @@ -161,6 +161,32 @@ const expectedStreamEntries = { }, }; +class MongoCursorMock { + constructor(itemsToYield, errorAtPos) { + this.itemsToYield = itemsToYield; + this.pos = 0; + this.errorAtPos = errorAtPos; + } + + next(cb) { + // if there's no more item, just hang out there waiting for + // items that will never come (this is how the real mongo + // tailable cursor would behave) + if (this.pos === this.errorAtPos) { + return process.nextTick(() => cb(new Error('boo'))); + } + if (!this.hasSentAllItems()) { + const pos = this.pos; + this.pos += 1; + return process.nextTick(() => cb(null, this.itemsToYield[pos])); + } + return undefined; + } + hasSentAllItems() { + return this.pos === this.itemsToYield.length; + } +} + describe('mongoclient.ListRecordStream', () => { const lastEndIDEntry = { h: -43, @@ -168,107 +194,111 @@ describe('mongoclient.ListRecordStream', () => { }; Object.keys(mongoProcessedLogEntries).forEach(entryType => { it(`should transform ${entryType}`, done => { - const lrs = new ListRecordStream(logger, + // first write will be ignored by ListRecordStream because + // of the last end ID (-42), it's needed though to bootstrap it + const cursor = new MongoCursorMock([ + lastEndIDEntry, + mongoProcessedLogEntries[entryType], + ]); + const lrs = new ListRecordStream(cursor, logger, lastEndIDEntry.h.toString()); - let dataReceived = false; - lrs.on('info', info => { - assert(dataReceived); - const parsedInfo = info; - parsedInfo.end = JSON.parse(parsedInfo.end); - assert.deepStrictEqual(parsedInfo, { - end: { ts: 42, uniqID: '-42' }, - }); - return done(); - }); + let hasReceivedData = false; lrs.on('data', entry => { + assert.strictEqual(hasReceivedData, false); + hasReceivedData = true; assert.deepStrictEqual(entry, expectedStreamEntries[entryType]); - dataReceived = true; + if (cursor.hasSentAllItems()) { + assert.strictEqual(hasReceivedData, true); + assert.deepStrictEqual(JSON.parse(lrs.getOffset()), + { uniqID: '-42' }); + done(); + } }); - // first write will be ignored by ListRecordStream because - // of the last end ID (-42), it's needed though to bootstrap it - lrs.write(lastEndIDEntry); - lrs.write(mongoProcessedLogEntries[entryType]); - lrs.end(); 
}); }); it('should ignore other entry types', done => { - const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString()); - let infoEmitted = false; - lrs.on('info', info => { - const parsedInfo = info; - parsedInfo.end = JSON.parse(parsedInfo.end); - assert.deepStrictEqual(parsedInfo, { - end: { ts: 42, uniqID: '-42' }, - }); - infoEmitted = true; - }); - lrs.on('data', entry => { - assert(false, `ListRecordStream did not ignore entry ${entry}`); - }); - lrs.on('end', () => { - assert(infoEmitted); - done(); - }); // first write will be ignored by ListRecordStream because // of the last end ID (-43), it's needed though to bootstrap it - lrs.write(lastEndIDEntry); + const logEntries = [lastEndIDEntry]; Object.keys(mongoIgnoredLogEntries).forEach(entryType => { - lrs.write(mongoIgnoredLogEntries[entryType]); + logEntries.push(mongoIgnoredLogEntries[entryType]); }); - lrs.end(); - }); - it('should emit info even if no entry consumed', done => { - const lrs = new ListRecordStream(logger, lastEndIDEntry.h.toString()); - let infoEmitted = false; - lrs.on('info', info => { - const parsedInfo = info; - parsedInfo.end = JSON.parse(parsedInfo.end); - assert.deepStrictEqual(parsedInfo, { - end: { ts: 0, uniqID: null }, - }); - infoEmitted = true; - }); - lrs.on('data', () => { - assert(false, 'did not expect data from ListRecordStream'); + const cursor = new MongoCursorMock(logEntries); + const lrs = new ListRecordStream(cursor, logger, + lastEndIDEntry.h.toString()); + lrs.on('data', entry => { + assert(false, `ListRecordStream did not ignore entry ${entry}`); }); - lrs.on('end', () => { - assert(infoEmitted); + setTimeout(() => { + assert.strictEqual(cursor.hasSentAllItems(), true); + assert.deepStrictEqual(JSON.parse(lrs.getOffset()), + { uniqID: '-42' }); done(); - }); - lrs.end(); + }, 200); }); it('should skip entries until uniqID is encountered', done => { const logEntries = [ Object.assign({}, mongoProcessedLogEntries.insert, - { h: 1234 }), + { h: 1234, ts: Timestamp.fromNumber(45) }), Object.assign({}, mongoProcessedLogEntries.insert, - { h: 5678 }), + { h: 5678, ts: Timestamp.fromNumber(44) }), Object.assign({}, mongoProcessedLogEntries.insert, - { h: -1234 }), + { h: -1234, ts: Timestamp.fromNumber(42) }), Object.assign({}, mongoProcessedLogEntries.insert, - { h: 2345 }), + { h: 2345, ts: Timestamp.fromNumber(42) }), ]; - const lrs = new ListRecordStream(logger, '5678'); + const cursor = new MongoCursorMock(logEntries); + const lrs = new ListRecordStream(cursor, logger, '5678'); + assert.strictEqual(lrs.reachedUnpublishedListing(), false); let nbReceivedEntries = 0; - let infoEmitted = false; - lrs.on('info', info => { - infoEmitted = true; - const parsedInfo = info; - parsedInfo.end = JSON.parse(parsedInfo.end); - assert.deepStrictEqual(parsedInfo, { - end: { ts: 42, uniqID: '2345' }, - }); + lrs.on('data', entry => { + assert.deepStrictEqual(entry, expectedStreamEntries.insert); + assert.strictEqual(lrs.reachedUnpublishedListing(), true); + ++nbReceivedEntries; + if (cursor.hasSentAllItems()) { + assert.strictEqual(nbReceivedEntries, 2); + assert.deepStrictEqual(JSON.parse(lrs.getOffset()), + { uniqID: '2345' }); + assert.strictEqual(lrs.getSkipCount(), 2); + assert.strictEqual(lrs.reachedUnpublishedListing(), true); + done(); + } }); + }); + + it('should start after latest entry if uniqID is not encountered', done => { + const logEntries = [ + Object.assign({}, mongoProcessedLogEntries.insert, + { h: 1234, ts: Timestamp.fromNumber(45) }), + Object.assign({}, 
mongoProcessedLogEntries.insert, + { h: 5678, ts: Timestamp.fromNumber(44) }), + Object.assign({}, mongoProcessedLogEntries.insert, + { h: -1234, ts: Timestamp.fromNumber(42) }), + Object.assign({}, mongoProcessedLogEntries.insert, + { h: 2345, ts: Timestamp.fromNumber(42) }), + ]; + const cursor = new MongoCursorMock(logEntries); + const lrs = new ListRecordStream(cursor, logger, '4242', '-1234'); + let nbReceivedEntries = 0; lrs.on('data', entry => { assert.deepStrictEqual(entry, expectedStreamEntries.insert); ++nbReceivedEntries; + if (cursor.hasSentAllItems()) { + assert.strictEqual(nbReceivedEntries, 1); + assert.deepStrictEqual(JSON.parse(lrs.getOffset()), + { uniqID: '2345' }); + assert.strictEqual(lrs.getSkipCount(), 3); + assert.strictEqual(lrs.reachedUnpublishedListing(), true); + done(); + } }); - lrs.on('end', () => { - assert.strictEqual(nbReceivedEntries, 2); - assert(infoEmitted); - done(); + }); + it('should emit an error event when cursor returns an error', done => { + const cursor = new MongoCursorMock([], 0); + const lrs = new ListRecordStream(cursor, logger, '4242', '-1234'); + lrs.on('data', () => { + assert(false, 'did not expect data'); }); - logEntries.forEach(entry => lrs.write(entry)); - lrs.end(); + lrs.on('error', () => done()); }); }); From d620fef517886b179f2b693679b1dcecc53fb012 Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Thu, 8 Nov 2018 12:06:29 -0800 Subject: [PATCH 2/2] bf: ZENKO-1175 fix when no saved ID exists In case where there is no saved ID yet (initial deployment), do process the very first entry in the log instead of skipping it. In practice it should not have an impact because the very first entry in the log is normally not due to be processed for CRR, but it ensures correctness. --- .../metadata/mongoclient/ListRecordStream.js | 15 ++++++----- .../mongoclient/ListRecordStream.spec.js | 27 +++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/lib/storage/metadata/mongoclient/ListRecordStream.js b/lib/storage/metadata/mongoclient/ListRecordStream.js index c09773d55..c9b6bd480 100644 --- a/lib/storage/metadata/mongoclient/ListRecordStream.js +++ b/lib/storage/metadata/mongoclient/ListRecordStream.js @@ -23,7 +23,7 @@ class ListRecordStream extends stream.Readable { this._logger = logger; this._lastSavedID = lastSavedID; this._latestOplogID = latestOplogID; - this._lastUniqID = null; + this._lastConsumedID = null; // this._unpublishedListing is true once we pass the oplog // record that has the same uniqID 'h' than last saved. If we // don't find it (e.g. log rolled over before populator could @@ -62,15 +62,18 @@ class ListRecordStream extends stream.Readable { _processItem(itemObj) { // always update to most recent uniqID - this._lastUniqID = itemObj.h.toString(); + this._lastConsumedID = itemObj.h.toString(); // only push to stream unpublished objects - if (!this._unpublishedListing) { + if (!this._lastSavedID) { + // process from the first entry + this._unpublishedListing = true; + } else if (!this._unpublishedListing) { // When an oplog with a unique ID that is stored in the // log offset is found, all oplogs AFTER this is unpublished. 
- if (!this._lastSavedID || this._lastSavedID === this._lastUniqID) { + if (this._lastSavedID === this._lastConsumedID) { this._unpublishedListing = true; - } else if (this._latestOplogID === this._lastUniqID) { + } else if (this._latestOplogID === this._lastConsumedID) { this._logger.warn( 'did not encounter the last saved offset in oplog, ' + 'resuming processing right after the latest record ' + @@ -140,7 +143,7 @@ class ListRecordStream extends stream.Readable { */ getOffset() { return JSON.stringify({ - uniqID: this._lastUniqID, + uniqID: this._lastConsumedID, }); } diff --git a/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js b/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js index 9354970cf..9ad3f447c 100644 --- a/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js +++ b/tests/unit/storage/metadata/mongoclient/ListRecordStream.spec.js @@ -293,6 +293,33 @@ describe('mongoclient.ListRecordStream', () => { } }); }); + it('should consume from the first entry if there is no saved ID', done => { + const logEntries = [ + Object.assign({}, mongoProcessedLogEntries.insert, + { h: 1234, ts: Timestamp.fromNumber(42) }), + Object.assign({}, mongoProcessedLogEntries.insert, + { h: 5678, ts: Timestamp.fromNumber(42) }), + Object.assign({}, mongoProcessedLogEntries.insert, + { h: -1234, ts: Timestamp.fromNumber(42) }), + Object.assign({}, mongoProcessedLogEntries.insert, + { h: 2345, ts: Timestamp.fromNumber(42) }), + ]; + const cursor = new MongoCursorMock(logEntries); + const lrs = new ListRecordStream(cursor, logger, undefined, '-1234'); + let nbReceivedEntries = 0; + lrs.on('data', entry => { + assert.deepStrictEqual(entry, expectedStreamEntries.insert); + ++nbReceivedEntries; + if (cursor.hasSentAllItems()) { + assert.strictEqual(nbReceivedEntries, 4); + assert.deepStrictEqual(JSON.parse(lrs.getOffset()), + { uniqID: '2345' }); + assert.strictEqual(lrs.getSkipCount(), 0); + assert.strictEqual(lrs.reachedUnpublishedListing(), true); + done(); + } + }); + }); it('should emit an error event when cursor returns an error', done => { const cursor = new MongoCursorMock([], 0); const lrs = new ListRecordStream(cursor, logger, '4242', '-1234');
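
For illustration only, and not part of either patch: a minimal sketch of how a caller might consume the tailable log stream introduced in PATCH 1/2. It relies on the LogConsumer and ListRecordStream APIs added above (connectMongo(), readRecords(), getOffset()); the logger object, the replicaSetHosts/database values and the loadSavedOffset(), saveOffset() and processRecord() helpers are hypothetical placeholders for the caller's own wiring (e.g. the zookeeper offset storage mentioned in the commit message).

const LogConsumer =
    require('./lib/storage/metadata/mongoclient/LogConsumer');

// stand-in for a werelogs logger, for this sketch only
const logger = {
    info: (msg, ctx) => console.log(msg, ctx),
    warn: (msg, ctx) => console.warn(msg, ctx),
    error: (msg, ctx) => console.error(msg, ctx),
    debug: () => {},
};

// hypothetical helpers: a real caller would load/store the opaque
// offset in zookeeper and do actual work on each record
const loadSavedOffset = () => undefined; // no offset on a fresh deployment
const saveOffset = offset => console.log('checkpoint', offset);
const processRecord = record =>
    console.log('record', record.db, record.entries.length);

// example connection values only
const consumer = new LogConsumer({
    replicaSetHosts: 'localhost:27018',
    database: 'metadata',
}, logger);

consumer.connectMongo(err => {
    if (err) {
        throw err;
    }
    // startSeq is the opaque JSON blob previously returned by
    // ListRecordStream.getOffset(), or undefined on first run
    consumer.readRecords({ startSeq: loadSavedOffset() }, (readErr, res) => {
        if (readErr) {
            throw readErr;
        }
        // res.log is a tailable ListRecordStream: it does not emit
        // 'end', so the caller checkpoints as records arrive
        res.log.on('data', record => {
            processRecord(record);
            saveOffset(res.log.getOffset());
        });
        res.log.on('error', streamErr =>
            logger.error('oplog stream error', { error: streamErr.message }));
    });
});

Because the cursor is tailable, the stream stays open indefinitely; after a restart, readRecords() scans the oplog from the beginning and the stream skips everything up to the saved unique ID before publishing new records, which is the resume behavior described in the commit message.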
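
Also for reference, a hedged sketch of the shape of the oplog.rs documents that ListRecordStream._processItem() receives. The field meanings come from the general MongoDB 3.x replica-set oplog format, not from the patch itself, and the exact layout of the 'o' payload depends on the metadata schema, which the visible hunks do not show.

const { Timestamp, Long } = require('bson');

// approximate shape of one oplog.rs document (MongoDB 3.x era);
// values are made up for illustration
const sampleOplogEntry = {
    ts: Timestamp.fromNumber(42),  // operation timestamp
    h: Long.fromNumber(-42),       // unique operation id; h.toString() is
                                   // what getOffset() persists as uniqID
    op: 'i',                       // 'i' insert, 'u' update, 'd' delete;
                                   // 'c' commands and others are skipped
    ns: 'metadata.example-bucket', // '<database>.<collection>'; the database
                                   // part becomes the 'db' field of the
                                   // records pushed to the stream
    o: {},                         // inserted document or applied update
                                   // (schema not shown in these hunks)
};

console.log(sampleOplogEntry.h.toString()); // '-42', like the tests' uniqID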