-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ft/mongo oplog #433
Ft/mongo oplog #433
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,187 @@ | ||||
'use strict'; // eslint-disable-line | ||||
|
||||
const stream = require('stream'); | ||||
const MongoClient = require('mongodb').MongoClient; | ||||
const { Timestamp } = require('bson'); | ||||
|
||||
// Unique ID (oplog 'h' field, as a string) of the last entry covered by the
// previous read. It is assigned by LogConsumer.readRecords() from the resume
// marker and compared against each incoming entry by ListRecordStream.
// NOTE(review): this is module-level state, so every stream created in this
// process shares it - confirm that reads never run concurrently.
let lastEndID = undefined;

// Maps an oplog operation code to the entry type written to the stream.
const ops = {
    i: 'put',
    u: 'put',
    d: 'delete',
};

/**
 * @class
 * @classdesc Object-mode transform stream that receives raw oplog entries
 * and outputs log records for the entries located after the one whose
 * unique ID equals lastEndID. Emits 'info' events carrying the range
 * ({ start, end, uniqID }) seen so far.
 */
class ListRecordStream extends stream.Transform {
    /**
     * @constructor
     *
     * @param {object} logger - logger
     */
    constructor(logger) {
        super({ objectMode: true });
        this.logger = logger;
        this.hasStarted = false;
        this.start = undefined;
        this.end = undefined;
        this.lastUniqID = undefined;
        // this.unpublishedListing is true once we pass the oplog that has the
        // start seq timestamp and uniqID 'h'
        this.unpublishedListing = undefined;
    }

    /**
     * Build the payload shared by every 'info' event
     *
     * @return {object} - { start, end, uniqID } as currently tracked
     */
    _getInfo() {
        return {
            start: this.start,
            end: this.end,
            uniqID: this.lastUniqID,
        };
    }

    /**
     * Process one oplog entry
     *
     * @param {object} itemObj - oplog entry; a falsy value is treated as an
     * end-of-listing marker
     * @param {string} encoding - unused (object mode)
     * @param {function} callback - called once the entry has been handled,
     * with the log record as 2nd parameter when one is published
     * @return {undefined}
     */
    _transform(itemObj, encoding, callback) {
        // end-of-listing marker: close the readable side and report the
        // final range
        if (!itemObj) {
            this.push(null);
            this.emit('info', this._getInfo());
            return callback();
        }

        // always update to most recent uniqID
        this.lastUniqID = itemObj.h.toString();

        const ts = itemObj.ts.toNumber();
        if (this.end === undefined || ts > this.end) {
            this.end = ts;
        }

        // Entries up to and including the one matching lastEndID were
        // already handed out by a previous read: skip them. The matching
        // entry itself is skipped too, publishing starts with the next one.
        if (!this.unpublishedListing) {
            if (lastEndID === this.lastUniqID) {
                this.unpublishedListing = true;
            }
            return callback();
        }

        if (!this.hasStarted) {
            this.hasStarted = true;
            this.start = ts;
            // This event used to carry the unique ID under the misspelt key
            // 'uniqId' only, unlike the other 'info' events. It now carries
            // 'uniqID' like them; the old key is kept as an alias so that
            // existing listeners keep working.
            this.emit('info', Object.assign(this._getInfo(), {
                uniqId: this.lastUniqID,
            }));
        }

        const nsParts = itemObj.ns.split('.');
        const streamObject = {
            // the high 32 bits of the timestamp are multiplied by 1000 to
            // build the Date
            timestamp: new Date(itemObj.ts.high_ * 1000),
            // second dot-separated component of the namespace
            db: nsParts[1],
            entries: [
                {
                    type: ops[itemObj.op],
                    key: itemObj.o._id,
                    value: JSON.stringify(itemObj.o.value),
                },
            ],
        };
        return callback(null, streamObject);
    }

    /**
     * Called when the writable side is ended: report the final range and
     * close the readable side
     *
     * @param {function} callback - called once flushing is done
     * @return {undefined}
     */
    _flush(callback) {
        this.emit('info', this._getInfo());
        this.push(null);
        callback();
    }
}
|
||||
// Namespace filter for the oplog query: matches namespaces that contain
// 'metadata.' followed by at least one word character, and rejects those
// where '__' appears anywhere after 'metadata'.
// NOTE(review): presumably meant to skip internal '__'-named collections of
// the metadata database - confirm.
const METADATA_NS_FILTER = /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/;

/**
 * @class
 * @classdesc Class to consume mongo oplog
 */
class LogConsumer {

    /**
     * @constructor
     *
     * @param {object} mongoConfig - object with the mongo configuration
     * @param {string} mongoConfig.replicaSetHosts - host list placed in the
     * connection URL
     * @param {string} [mongoConfig.replicaSet='rs0'] - replica set name
     * @param {object} logger - logger
     */
    constructor(mongoConfig, logger) {
        const { replicaSetHosts, replicaSet } = mongoConfig;
        // 'local' is the database where MongoDB has oplogs.rs capped collection
        this.database = 'local';
        this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
        this.replicaSet = replicaSet || 'rs0';
        this.logger = logger;
    }

    /**
     * Connect to MongoClient using Mongo node module to access database and
     * database oplogs (operation logs)
     *
     * @param {function} done - callback function, called with an error object
     * when the connection failed, without argument otherwise
     * @return {undefined}
     */
    connectMongo(done) {
        MongoClient.connect(this.mongoUrl, { replicaSet: this.replicaSet },
            (err, client) => {
                if (err) {
                    this.logger.error('Unable to connect to MongoDB',
                        { error: err });
                    return done(err);
                }
                this.logger.info('connected to mongodb');
                this.client = client;
                this.db = client.db(this.database, {
                    ignoreUndefined: true,
                });
                return done();
            });
    }

    /**
     * Read a series of log records from mongo
     *
     * @param {Object} [params] - params object
     * @param {String|Number} [params.startSeq] - resume marker of the form
     * '<timestamp>|<uniqID>': fetch starting from this timestamp (0 when
     * missing) and publish what follows the entry with this unique ID
     * @param {Number} [params.limit] - maximum number of log records
     * to return (default 10000)
     * @param {function} cb - callback function, called with an error
     * object or null and an object { info, log } as 2nd parameter
     *
     * @return {undefined}
     */
    readRecords(params, cb) {
        const { startSeq: rawStartSeq, limit: rawLimit } = params || {};
        const limit = rawLimit || 10000;
        // From review: the marker is documented as a string but logs written
        // with a different backend may carry a plain number, hence the
        // explicit conversion. A missing marker means "start from 0".
        const marker = rawStartSeq === undefined || rawStartSeq === null ?
            '' : String(rawStartSeq);
        const [seqPart, idPart] = marker.split('|');
        const startSeq = parseInt(seqPart, 10) || 0;
        lastEndID = idPart;

        this.coll = this.db.collection('oplog.rs');
        return this.coll.find({
            ns: METADATA_NS_FILTER,
            ts: { $gte: Timestamp.fromNumber(startSeq) },
        }, {
            limit,
            tailable: false,
            awaitData: false,
            noCursorTimeout: true,
            // was spelt 'OplogReplay'
            // NOTE(review): 'oplogReplay' is the spelling I recall from the
            // driver docs - confirm against the driver version in use
            oplogReplay: true,
            // NOTE(review): reviewers asked why the retry count is this
            // high - left unchanged, to be confirmed by the author
            numberOfRetries: Number.MAX_VALUE,
        }, (err, res) => {
            if (err) {
                this.logger.error('Unable to read records from MongoDB oplog',
                    { error: err });
                return cb(err);
            }
            const recordStream = new ListRecordStream(this.logger);
            const cursorStream = res.stream();
            // guards against calling cb both on 'info' and on a later error
            let cbCalled = false;

            cursorStream.on('data', data => {
                recordStream.write(data);
            });
            cursorStream.on('end', () => {
                // From review: writing null to signal the end throws, and
                // writing undefined relied on a sentinel; ending the
                // writable side triggers the stream's flush instead.
                recordStream.end();
            });
            cursorStream.on('error', streamErr => {
                this.logger.error('Error while reading MongoDB oplog',
                    { error: streamErr });
                if (!cbCalled) {
                    cbCalled = true;
                    return cb(streamErr);
                }
                // the caller already owns the record stream: surface the
                // failure there
                return recordStream.destroy(streamErr);
            });
            recordStream.once('info', info => {
                // NOTE(review): reviewers questioned dropping the 'error'
                // listeners here; kept as is pending confirmation of why
                // the original log consumer does it.
                recordStream.removeAllListeners('error');
                if (cbCalled) {
                    return undefined;
                }
                cbCalled = true;
                return cb(null, { info, log: recordStream });
            });
            return undefined;
        });
    }
}
|
||||
module.exports = LogConsumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be `get`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The `i` and `u` ops in mongo oplogs are `insert` and `update`, both of which I mapped to `put`.
I'm not sure if mongodb oplogs have any logs corresponding to `get` actions - these logs are records of the replication that happens in the mongodb replica set itself, so the main actions are `insert`, `update`, and `delete`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, Lauren and I never found super official documentation (that was clear enough) for an explanation on oplogs - we ended up using this relatively out-dated blog as our primary source: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/