Skip to content

Commit

Permalink
merge #433
Browse files Browse the repository at this point in the history
  • Loading branch information
ironman-machine authored and Cloud User committed Mar 8, 2018
2 parents da1da43 + a9a6b24 commit 189194a
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 5 deletions.
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ module.exports = {
MongoClientInterface:
require('./lib/storage/metadata/mongoclient/' +
'MongoClientInterface'),
LogConsumer:
require('./lib/storage/metadata/mongoclient/LogConsumer'),
},
},
data: {
Expand Down
187 changes: 187 additions & 0 deletions lib/storage/metadata/mongoclient/LogConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
'use strict'; // eslint-disable-line

const stream = require('stream');
const MongoClient = require('mongodb').MongoClient;
const { Timestamp } = require('bson');

let lastEndID = undefined;

const ops = {
i: 'put',
u: 'put',
d: 'delete',
};

class ListRecordStream extends stream.Transform {
constructor(logger) {
super({ objectMode: true });
this.logger = logger;
this.hasStarted = false;
this.start = undefined;
this.end = undefined;
this.lastUniqID = undefined;
// this.unpublishedListing is true once we pass the oplog that has the
// start seq timestamp and uniqID 'h'
this.unpublishedListing = undefined;
}

_transform(itemObj, encoding, callback) {
if (!itemObj) {
this.push(null);
this.emit('info', {
start: this.start,
end: this.end,
uniqID: this.lastUniqID,
});
return callback();
}

// always update to most recent uniqID
this.lastUniqID = itemObj.h.toString();

if (this.end === undefined || itemObj.ts.toNumber() > this.end) {
this.end = itemObj.ts.toNumber();
}

// only push to stream unpublished objects
if (!this.unpublishedListing) {
if (lastEndID === itemObj.h.toString()) {
this.unpublishedListing = true;
}
return callback();
}

if (!this.hasStarted) {
this.hasStarted = true;
this.start = itemObj.ts.toNumber();
this.emit('info', {
start: this.start,
end: this.end,
uniqId: this.lastUniqID,
});
}

// don't push oplogs that have already been sent
if (!this.unpublishedListing) {
return callback();
}

const dbName = itemObj.ns.split('.');
const streamObject = {
timestamp: new Date(itemObj.ts.high_ * 1000),
db: dbName[1],
entries: [
{
type: ops[itemObj.op],
key: itemObj.o._id,
value: JSON.stringify(itemObj.o.value),
},
],
};
return callback(null, streamObject);
}

_flush(callback) {
this.emit('info', {
start: this.start,
end: this.end,
uniqID: this.lastUniqID,
});
this.push(null);
callback();
}
}

/**
* @class
* @classdesc Class to consume mongo oplog
*/
class LogConsumer {

/**
* @constructor
*
* @param {object} mongoConfig - object with the mongo configuration
* @param {string} logger - logger
*/
constructor(mongoConfig, logger) {
const { replicaSetHosts } = mongoConfig;
// 'local' is the database where MongoDB has oplogs.rs capped collection
this.database = 'local';
this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
this.logger = logger;
}

/**
* Connect to MongoClient using Mongo node module to access database and
* database oplogs (operation logs)
*
* @param {function} done - callback function, called with an error object
* or null and an object as 2nd parameter
* @return {undefined}
*/
connectMongo(done) {
MongoClient.connect(this.mongoUrl, { replicaSet: 'rs0' },
(err, client) => {
if (err) {
this.logger.error('Unable to connect to MongoDB',
{ error: err });
return done(err);
}
this.logger.info('connected to mongodb');
this.client = client;
this.db = client.db(this.database, {
ignoreUndefined: true,
});
return done();
});
}
/**
* Read a series of log records from mongo
*
* @param {Object} [params] - params object
* @param {String} [params.startSeq] - fetch starting from this
* sequence number
* @param {Number} [params.limit] - maximum number of log records
* to return
* @param {function} cb - callback function, called with an error
* object or null and an object as 2nd parameter
*
* @return {undefined}
*/
readRecords(params, cb) {
const recordStream = new ListRecordStream(this.logger);
const limit = params.limit || 10000;
const startIDandSeq = params.startSeq.toString().split('|');
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
lastEndID = startIDandSeq[1];

this.coll = this.db.collection('oplog.rs');
return this.coll.find({
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
ts: { $gte: Timestamp.fromNumber(startSeq) },
}, {
limit,
tailable: false,
awaitData: false,
noCursorTimeout: true,
OplogReplay: true,
numberOfRetries: Number.MAX_VALUE,
}, (err, res) => {
const stream = res.stream();
stream.on('data', data => {
recordStream.write(data);
});
stream.on('end', () => {
recordStream.write(undefined);
});
recordStream.once('info', info => {
recordStream.removeAllListeners('error');
cb(null, { info, log: recordStream });
});
return undefined;
});
}
}

module.exports = LogConsumer;
7 changes: 2 additions & 5 deletions lib/storage/metadata/mongoclient/MongoClientInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
const async = require('async');

const format = require('util').format;
const constants = require('../../../constants');

const errors = require('../../../errors');
Expand Down Expand Up @@ -82,10 +81,8 @@ class MongoClientInterface {
constructor(params) {
const { replicaSetHosts, path, database, logger,
replicationGroupId, replicaSet } = params;
this.mongoUrl = format(
'mongodb://%s/?replicaSet=%s',
replicaSetHosts,
replicaSet);
this.mongoUrl = `mongodb://${replicaSetHosts}/?replicaSet=` +
`${replicaSet}`;
this.logger = logger;
this.client = null;
this.db = null;
Expand Down

0 comments on commit 189194a

Please sign in to comment.