
Commit

Merge branch 'bugfix/ZENKO-1175-oplogSkipFix' into tmp/octopus/w/8.1/bugfix/ZENKO-1175-oplogSkipFix
bert-e committed Nov 8, 2018
2 parents ebd9a74 + d620fef commit 7f54136
Showing 4 changed files with 292 additions and 134 deletions.
135 changes: 109 additions & 26 deletions lib/storage/metadata/mongoclient/ListRecordStream.js
@@ -1,33 +1,90 @@
 const stream = require('stream');
 
-class ListRecordStream extends stream.Transform {
-    constructor(logger, lastEndID) {
+/**
+ * @class ListRecordStream
+ * @classdesc Filter and stream records returned from a mongodb query
+ * cursor
+ */
+class ListRecordStream extends stream.Readable {
+    /**
+     * @constructor
+     * @param {mongodb.Cursor} mongoCursor - cursor returned by a
+     * mongodb query to the oplog (see
+     * http://mongodb.github.io/node-mongodb-native/2.0/api/Cursor.html)
+     * @param {werelogs.Logger} logger - logger object
+     * @param {string} lastSavedID - unique ID that has been persisted
+     * of the most recently processed entry in the oplog
+     * @param {string} latestOplogID - unique ID of the most recently
+     * added entry in the oplog
+     */
+    constructor(mongoCursor, logger, lastSavedID, latestOplogID) {
         super({ objectMode: true });
+        this._cursor = mongoCursor;
         this._logger = logger;
-        this._lastEndID = lastEndID;
-        this._lastTs = 0;
-        this._lastUniqID = null;
+        this._lastSavedID = lastSavedID;
+        this._latestOplogID = latestOplogID;
+        this._lastConsumedID = null;
         // this._unpublishedListing is true once we pass the oplog
-        // that has the start seq timestamp and uniqID 'h'
+        // record that has the same uniqID 'h' as last saved. If we
+        // don't find it (e.g. log rolled over before populator could
+        // process its oldest entries), we will restart from the
+        // latest record of the oplog.
         this._unpublishedListing = false;
+        // cf. this.getSkipCount()
+        this._skipCount = 0;
     }
 
-    _transform(itemObj, encoding, callback) {
+    _read() {
+        // MongoDB cursors provide a stream interface. We choose not
+        // to use it though, because errors may not be emitted by the
+        // stream when there is an issue with the connection to
+        // MongoDB (especially when pause()/resume() are used).
+        //
+        // Instead we use the async cursor.next() call directly to
+        // fetch records one at a time; errors are then forwarded in
+        // the callback.
+        this._cursor.next((err, item) => {
+            if (err) {
+                this._logger.error('mongodb cursor error', {
+                    method: 'mongoclient.ListRecordStream._read()',
+                    error: err.message,
+                });
+                this.emit('error', err);
+                return undefined;
+            }
+            if (this._processItem(item)) {
+                return process.nextTick(this._read.bind(this));
+            }
+            // wait until _read() gets called again
+            return undefined;
+        });
+    }
+
+    _processItem(itemObj) {
         // always update to most recent uniqID
-        this._lastUniqID = itemObj.h.toString();
-
-        if (this._lastTs === null || itemObj.ts.toNumber() > this._lastTs) {
-            this._lastTs = itemObj.ts.toNumber();
-        }
+        this._lastConsumedID = itemObj.h.toString();
 
-        // only push to stream unpublished objects
-        if (!this._unpublishedListing) {
+        if (!this._lastSavedID) {
+            // process from the first entry
+            this._unpublishedListing = true;
+        } else if (!this._unpublishedListing) {
             // When an oplog record with the unique ID stored in the
-            // log offset is found, all oplogs AFTER this is unpublished.
-            if (!this._lastEndID || this._lastEndID === itemObj.h.toString()) {
+            // log offset is found, all oplog records AFTER it are
+            // unpublished.
+            if (this._lastSavedID === this._lastConsumedID) {
                 this._unpublishedListing = true;
+            } else if (this._latestOplogID === this._lastConsumedID) {
+                this._logger.warn(
+                    'did not encounter the last saved offset in oplog, ' +
+                    'resuming processing right after the latest record ' +
+                    'to date; some entries may have been skipped', {
+                        lastSavedID: this._lastSavedID,
+                        latestRecordID: this._latestOplogID,
+                    });
+                this._unpublishedListing = true;
             }
-            return callback();
+            ++this._skipCount;
+            return true; // read next record
         }
 
         const dbName = itemObj.ns.split('.');
@@ -62,27 +119,53 @@ class ListRecordStream extends stream.Transform {
         } else {
             // skip other entry types as we don't need them for now
             // ('c', ...?)
-            return callback();
+            ++this._skipCount;
+            return true; // read next record
         }
         const streamObject = {
             timestamp: new Date((itemObj.ts ?
                 itemObj.ts.toNumber() * 1000 : 0)),
             db: dbName[1],
             entries: [entry],
         };
-        return callback(null, streamObject);
+        // push object to the stream, then return false to wait until
+        // _read() is called again (because we are in an asynchronous
+        // context already)
+        this.push(streamObject);
+        return false;
     }
 
-    _flush(callback) {
-        this.emit('info', {
-            // store both the timestamp and unique oplog id in an
-            // opaque JSON string returned to the reader
-            end: JSON.stringify({
-                ts: this._lastTs,
-                uniqID: this._lastUniqID,
-            }),
-        });
-        callback();
-    }
+    /**
+     * Get an opaque JSON blob containing the latest consumed offset
+     * from MongoDB oplog.
+     *
+     * @return {string} opaque JSON blob
+     */
+    getOffset() {
+        return JSON.stringify({
+            uniqID: this._lastConsumedID,
+        });
+    }
+
+    /**
+     * Get the number of entries that have been read and skipped from
+     * MongoDB oplog since the ListRecordStream instance was created.
+     *
+     * @return {integer} number of skipped entries
+     */
+    getSkipCount() {
+        return this._skipCount;
+    }
+
+    /**
+     * Get whether the stream reached yet-unpublished records
+     * (i.e. after we reached either the saved unique ID, or the tip
+     * of the oplog)
+     *
+     * @return {boolean} true if we are now returning unpublished records
+     */
+    reachedUnpublishedListing() {
+        return this._unpublishedListing;
+    }
 }
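As a usage illustration (not part of the commit): a minimal sketch that drives the reworked ListRecordStream with a stubbed cursor. The stub mimics the next() callback API of a MongoDB tailable cursor; the oplog item shapes (h, ts, ns, op, o) follow the insert handling in the collapsed middle of this diff, and the require path and logger stub are assumptions.

const ListRecordStream =
    require('./lib/storage/metadata/mongoclient/ListRecordStream');

const logger = {
    debug: () => {}, info: () => {},
    warn: console.warn, error: console.error,
};

// A fake three-record oplog: a no-op record, the already-published
// insert whose unique ID 'h' was saved as the offset, then a fresh
// insert that should be the first record published.
const oplog = [
    { h: 1111, ts: { toNumber: () => 1 }, op: 'n',
        ns: 'metadata.bucket1', o: {} },
    { h: 2222, ts: { toNumber: () => 2 }, op: 'i',
        ns: 'metadata.bucket1', o: { _id: 'key1', value: { v: 1 } } },
    { h: 3333, ts: { toNumber: () => 3 }, op: 'i',
        ns: 'metadata.bucket1', o: { _id: 'key2', value: { v: 2 } } },
];
const stubCursor = {
    next: cb => {
        if (oplog.length > 0) {
            return process.nextTick(cb, null, oplog.shift());
        }
        // like a tailable cursor, wait silently for new data
        return undefined;
    },
};

// resume from saved offset '2222', with '3333' as the current oplog tip
const recordStream =
    new ListRecordStream(stubCursor, logger, '2222', '3333');
recordStream.on('data', data => {
    console.log(data.db, data.entries);       // bucket1, one 'put' of key2
    console.log(recordStream.getOffset());    // {"uniqID":"3333"}
    console.log(recordStream.getSkipCount()); // 2 (records 1111 and 2222)
});

Had the saved ID already rolled out of the capped oplog collection, the stream would instead skip forward until it reaches the tip ID '3333', emit the warning added above, and resume publishing from there.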
84 changes: 51 additions & 33 deletions lib/storage/metadata/mongoclient/LogConsumer.js
@@ -2,7 +2,6 @@

 const MongoClient = require('mongodb').MongoClient;
 const ListRecordStream = require('./ListRecordStream');
-const { Timestamp } = require('bson');
 
 /**
  * @class
@@ -18,12 +17,11 @@ class LogConsumer {
      */
     constructor(mongoConfig, logger) {
         const { replicaSetHosts, database } = mongoConfig;
-        // 'local' is the database where MongoDB has oplogs.rs capped collection
-        this.database = 'local';
-        this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
-        this.logger = logger;
-        this.metadataDatabase = database;
-        this.oplogNsRegExp = new RegExp(`^${database}\\.`);
+        this._mongoUrl = `mongodb://${replicaSetHosts}/local`;
+        this._logger = logger;
+        this._oplogNsRegExp = new RegExp(`^${database}\\.`);
+        // oplog collection
+        this._coll = null;
     }
 
     /**
@@ -35,67 +33,87 @@ class LogConsumer {
      * @return {undefined}
      */
     connectMongo(done) {
-        MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
+        MongoClient.connect(this._mongoUrl, {
+            replicaSet: 'rs0',
+            useNewUrlParser: true,
+        },
         (err, client) => {
             if (err) {
-                this.logger.error('Unable to connect to MongoDB',
+                this._logger.error('Unable to connect to MongoDB',
                     { error: err });
                 return done(err);
             }
-            this.logger.info('connected to mongodb');
-            this.client = client;
-            this.db = client.db(this.database, {
+            this._logger.info('connected to mongodb');
+            // 'local' is the database where MongoDB has oplog.rs
+            // capped collection
+            const db = client.db('local', {
                 ignoreUndefined: true,
             });
+            this._coll = db.collection('oplog.rs');
             return done();
         });
     }
 
     /**
-     * Read a series of log records from mongo
+     * Open a tailable cursor to mongo oplog and retrieve a stream of
+     * records to read
      *
      * @param {Object} [params] - params object
     * @param {String} [params.startSeq] - fetch starting from this
      *   opaque offset returned previously by mongo ListRecordStream
      *   in an 'info' event
-     * @param {Number} [params.limit] - maximum number of log records
-     *   to return
      * @param {function} cb - callback function, called with an error
      *   object or null and an object as 2nd parameter
      *
      * @return {undefined}
      */
     readRecords(params, cb) {
-        const limit = params.limit || 10000;
-        let startSeq = { ts: 0 };
+        let startSeq = {};
         if (params.startSeq) {
             try {
                 // parse the opaque JSON string passed through from a
                 // previous 'info' event
                 startSeq = JSON.parse(params.startSeq);
             } catch (err) {
-                this.logger.error('malformed startSeq', {
+                this._logger.error('malformed startSeq', {
                     startSeq: params.startSeq,
                 });
                 // start over if malformed
             }
         }
-        const recordStream = new ListRecordStream(this.logger, startSeq.uniqID);
-
-        this.coll = this.db.collection('oplog.rs');
-        return this.coll.find({
-            ns: this.oplogNsRegExp,
-            ts: { $gte: Timestamp.fromNumber(startSeq.ts) },
-        }, {
-            limit,
-            tailable: false,
-            awaitData: false,
-            noCursorTimeout: true,
-            oplogReplay: true,
-            numberOfRetries: Number.MAX_VALUE,
-        }, (err, res) => {
-            res.stream().pipe(recordStream);
-            recordStream.removeAllListeners('error');
-            return cb(null, { log: recordStream });
+        this._readLatestOplogID((err, latestOplogID) => {
+            if (err) {
+                return cb(err);
+            }
+            return this._coll.find({
+                ns: this._oplogNsRegExp,
+            }, {
+                tailable: true,
+                awaitData: true,
+                noCursorTimeout: true,
+                numberOfRetries: Number.MAX_VALUE,
+            }, (err, cursor) => {
+                const recordStream = new ListRecordStream(
+                    cursor, this._logger, startSeq.uniqID, latestOplogID);
+                return cb(null, { log: recordStream, tailable: true });
+            });
         });
     }
+
+    _readLatestOplogID(cb) {
+        this._coll.find({
+            ns: this._oplogNsRegExp,
+        }, {
+            ts: 1,
+        }).sort({
+            $natural: -1,
+        }).limit(1).toArray((err, data) => {
+            if (err) {
+                return cb(err);
+            }
+            const latestOplogID = data[0].h.toString();
+            this._logger.debug('latest oplog ID read', { latestOplogID });
+            return cb(null, latestOplogID);
+        });
+    }
 }
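For the consumer side, a sketch (likewise not from the commit) of wiring the new tailable API end to end, the way a log reader such as the populator mentioned above might; the replica-set hosts, the logger, and the handleRecord/persistOffset helpers are placeholders, and the startSeq string is an offset previously returned by getOffset().

const LogConsumer =
    require('./lib/storage/metadata/mongoclient/LogConsumer');

const logger = {
    debug: () => {}, info: () => {},
    warn: console.warn, error: console.error,
};
const handleRecord = record => console.log('record', record);  // placeholder
const persistOffset = offset => console.log('offset', offset); // placeholder

const consumer = new LogConsumer({
    replicaSetHosts: 'localhost:27017,localhost:27018,localhost:27019',
    database: 'metadata',
}, logger);

consumer.connectMongo(err => {
    if (err) {
        throw err;
    }
    // resume from a previously persisted opaque offset, if any
    consumer.readRecords({ startSeq: '{"uniqID":"2222"}' }, (err, res) => {
        if (err) {
            return logger.error('readRecords failed', { error: err });
        }
        // res.log is a tailable ListRecordStream: it never ends, so
        // the reader persists the offset as records are consumed
        res.log.on('error', streamErr =>
            logger.error('oplog stream error', { error: streamErr.message }));
        return res.log.on('data', record => {
            handleRecord(record);
            persistOffset(res.log.getOffset());
        });
    });
});

Note that the new code no longer filters the oplog query on a ts timestamp (hence the dropped bson Timestamp import in the hunk above); resumption is driven entirely by the unique 'h' ID of the last saved record, backstopped by the recorded oplog tip.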
2 changes: 1 addition & 1 deletion package.json
@@ -3,7 +3,7 @@
"engines": {
"node": ">=8"
},
"version": "8.0.4",
"version": "8.0.5",
"description": "Common utilities for the S3 project components",
"main": "index.js",
"repository": {