
INTEGRATION [PR#594 > development/8.1] ft: ZENKO-1175 tailable cursor to consume mongo oplog #600

Merged (3 commits, Nov 8, 2018)
135 changes: 109 additions & 26 deletions lib/storage/metadata/mongoclient/ListRecordStream.js
@@ -1,33 +1,90 @@
const stream = require('stream');

/**
* @class ListRecordStream
* @classdesc Filter and stream records returned from a mongodb query
* cursor
*/
class ListRecordStream extends stream.Readable {
/**
* @constructor
* @param {mongodb.Cursor} mongoCursor - cursor returned by a
* mongodb query to the oplog (see
* http://mongodb.github.io/node-mongodb-native/2.0/api/Cursor.html)
* @param {werelogs.Logger} logger - logger object
     * @param {string} lastSavedID - persisted unique ID of the most
     * recently processed entry in the oplog
* @param {string} latestOplogID - unique ID of the most recently
* added entry in the oplog
*/
constructor(mongoCursor, logger, lastSavedID, latestOplogID) {
super({ objectMode: true });
this._cursor = mongoCursor;
this._logger = logger;
this._lastSavedID = lastSavedID;
this._latestOplogID = latestOplogID;
this._lastConsumedID = null;
        // this._unpublishedListing is true once we pass the oplog
        // record that has the same uniqID 'h' as the last saved one. If
        // we don't find it (e.g. log rolled over before the populator
        // could process its oldest entries), we will restart from the
        // latest record of the oplog.
this._unpublishedListing = false;
// cf. this.getSkipCount()
this._skipCount = 0;
}

_read() {
// MongoDB cursors provide a stream interface. We choose not
// to use it though because errors may not be emitted by the
// stream when there is an issue with the connection to
// MongoDB (especially when pause()/resume() are used).
//
        // Instead, we use the async cursor.next() call directly to
        // fetch records one at a time; errors are then forwarded
        // through the callback.
this._cursor.next((err, item) => {
if (err) {
this._logger.error('mongodb cursor error', {
method: 'mongoclient.ListRecordStream._read()',
error: err.message,
});
this.emit('error', err);
return undefined;
}
if (this._processItem(item)) {
return process.nextTick(this._read.bind(this));
}
// wait until _read() gets called again
return undefined;
});
}

_processItem(itemObj) {
// always update to most recent uniqID
this._lastConsumedID = itemObj.h.toString();

        if (!this._lastSavedID) {
            // process from the first entry
            this._unpublishedListing = true;
        } else if (!this._unpublishedListing) {
            // When the oplog record whose unique ID matches the one
            // stored in the log offset is found, all records AFTER it
            // are unpublished.
            if (this._lastSavedID === this._lastConsumedID) {
                this._unpublishedListing = true;
            } else if (this._latestOplogID === this._lastConsumedID) {
this._logger.warn(
'did not encounter the last saved offset in oplog, ' +
'resuming processing right after the latest record ' +
'to date; some entries may have been skipped', {
lastSavedID: this._lastSavedID,
latestRecordID: this._latestOplogID,
});
this._unpublishedListing = true;
}
++this._skipCount;
return true; // read next record
}

const dbName = itemObj.ns.split('.');
@@ -62,27 +119,53 @@ class ListRecordStream extends stream.Transform {
} else {
// skip other entry types as we don't need them for now
// ('c', ...?)
++this._skipCount;
return true; // read next record
}
const streamObject = {
timestamp: new Date((itemObj.ts ?
itemObj.ts.toNumber() * 1000 : 0)),
db: dbName[1],
entries: [entry],
};
// push object to the stream, then return false to wait until
// _read() is called again (because we are in an asynchronous
// context already)
this.push(streamObject);
return false;
}

/**
* Get an opaque JSON blob containing the latest consumed offset
     * from the MongoDB oplog.
*
* @return {string} opaque JSON blob
*/
getOffset() {
return JSON.stringify({
uniqID: this._lastConsumedID,
});
}

/**
* Get the number of entries that have been read and skipped from
     * the MongoDB oplog since the ListRecordStream instance was created.
*
* @return {integer} number of skipped entries
*/
getSkipCount() {
return this._skipCount;
}

/**
* Get whether the stream reached yet-unpublished records
* (i.e. after we reached either the saved unique ID, or the tip
* of the oplog)
*
* @return {boolean} true if we are now returning unpublished records
*/
reachedUnpublishedListing() {
return this._unpublishedListing;
}
}

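To make the new contract concrete, here is a minimal consumer sketch (not part of this diff); `cursor`, `logger`, `lastSavedID`, `latestOplogID` and the `processRecord()`/`saveOffset()`/`handleError()` helpers are illustrative assumptions:

const ListRecordStream =
    require('./lib/storage/metadata/mongoclient/ListRecordStream');

// `cursor` is assumed to be a tailable mongodb.Cursor over the
// 'local.oplog.rs' collection, `logger` a werelogs logger, and
// `lastSavedID`/`latestOplogID` the unique IDs described above
const recordStream = new ListRecordStream(
    cursor, logger, lastSavedID, latestOplogID);

recordStream.on('data', record => {
    // record layout: { timestamp, db, entries: [entry, ...] }
    processRecord(record);
    // persist the opaque offset so a restart resumes from here
    saveOffset(recordStream.getOffset());
});
recordStream.on('error', err => handleError(err));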
84 changes: 51 additions & 33 deletions lib/storage/metadata/mongoclient/LogConsumer.js
Expand Up @@ -2,7 +2,6 @@

const MongoClient = require('mongodb').MongoClient;
const ListRecordStream = require('./ListRecordStream');

/**
* @class
@@ -18,12 +17,11 @@ class LogConsumer {
*/
constructor(mongoConfig, logger) {
const { replicaSetHosts, database } = mongoConfig;
this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
this._logger = logger;
this._oplogNsRegExp = new RegExp(`^${database}\\.`);
// oplog collection
this._coll = null;
}

/**
@@ -35,67 +33,87 @@
* @return {undefined}
*/
connectMongo(done) {
MongoClient.connect(this._mongoUrl, {
replicaSet: 'rs0',
useNewUrlParser: true,
},
(err, client) => {
if (err) {
this._logger.error('Unable to connect to MongoDB',
{ error: err });
return done(err);
}
this._logger.info('connected to mongodb');
// 'local' is the database where MongoDB has oplog.rs
// capped collection
const db = client.db('local', {
ignoreUndefined: true,
});
this._coll = db.collection('oplog.rs');
return done();
});
}

/**
* Open a tailable cursor to mongo oplog and retrieve a stream of
* records to read
*
* @param {Object} [params] - params object
     * @param {String} [params.startSeq] - fetch starting from this
     * opaque offset previously returned by the ListRecordStream
     * getOffset() method
* @param {function} cb - callback function, called with an error
* object or null and an object as 2nd parameter
*
* @return {undefined}
*/
readRecords(params, cb) {
let startSeq = {};
if (params.startSeq) {
try {
                // parse the opaque JSON string returned by a
                // previous call to ListRecordStream.getOffset()
startSeq = JSON.parse(params.startSeq);
} catch (err) {
this._logger.error('malformed startSeq', {
startSeq: params.startSeq,
});
// start over if malformed
}
}
const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
this._readLatestOplogID((err, latestOplogID) => {
if (err) {
return cb(err);
}
return this._coll.find({
ns: this._oplogNsRegExp,
}, {
tailable: true,
awaitData: true,
noCursorTimeout: true,
numberOfRetries: Number.MAX_VALUE,
}, (err, cursor) => {
const recordStream = new ListRecordStream(
cursor, this._logger, startSeq.uniqID, latestOplogID);
return cb(null, { log: recordStream, tailable: true });
});
});
}

    _readLatestOplogID(cb) {
        this._coll.find({
            ns: this._oplogNsRegExp,
        }, {
ts: 1,
}).sort({
$natural: -1,
}).limit(1).toArray((err, data) => {
if (err) {
return cb(err);
}
const latestOplogID = data[0].h.toString();
this._logger.debug('latest oplog ID read', { latestOplogID });
return cb(null, latestOplogID);
});
}
}
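Similarly, a hypothetical reader could drive LogConsumer end to end as below; the replica-set hosts, database name, `logger`, `savedOffset` and the `processRecord()`/`handleError()` helpers are placeholders, not values mandated by this change:

const LogConsumer =
    require('./lib/storage/metadata/mongoclient/LogConsumer');

const consumer = new LogConsumer({
    replicaSetHosts: 'localhost:27017,localhost:27018,localhost:27019',
    database: 'metadata',
}, logger);

consumer.connectMongo(err => {
    if (err) {
        return handleError(err);
    }
    // `savedOffset` is the opaque JSON blob previously returned by
    // ListRecordStream.getOffset(), or undefined on a first run
    return consumer.readRecords({ startSeq: savedOffset }, (err, res) => {
        if (err) {
            return handleError(err);
        }
        // res.tailable is true: the stream stays open and keeps
        // emitting records as new oplog entries are written
        res.log.on('data', record => processRecord(record));
        return undefined;
    });
});

Internally, readRecords() first calls _readLatestOplogID(), which sorts the oplog by { $natural: -1 } and takes limit(1) to fetch the unique ID of the newest entry; ListRecordStream uses that ID to detect when the saved offset has rolled out of the capped collection and to resume from the tip instead.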
2 changes: 1 addition & 1 deletion package.json
@@ -3,7 +3,7 @@
"engines": {
"node": ">=8"
},
"version": "8.0.4",
"version": "8.0.5",
"description": "Common utilities for the S3 project components",
"main": "index.js",
"repository": {