Skip to content

Commit

Permalink
Merge pull request #459 from scality/fix/stuck-replication
Browse files Browse the repository at this point in the history
Fix/stuck replication
  • Loading branch information
rahulreddy authored Apr 2, 2018
2 parents ad42baa + 91ccccf commit 82b4055
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions lib/storage/metadata/mongoclient/LogConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ const stream = require('stream');
const MongoClient = require('mongodb').MongoClient;
const { Timestamp } = require('bson');

let lastEndID = null;

const ops = {
i: 'put',
u: 'put',
d: 'delete',
};

class ListRecordStream extends stream.Transform {
constructor(logger) {
constructor(logger, lastEndID) {
super({ objectMode: true });
this.logger = logger;
this.lastEndID = lastEndID;
this.hasStarted = false;
this.start = null;
this.end = null;
Expand Down Expand Up @@ -45,10 +44,9 @@ class ListRecordStream extends stream.Transform {

// only push to stream unpublished objects
if (!this.unpublishedListing) {
if (lastEndID === itemObj.h.toString()) {
if (this.lastEndID === itemObj.h.toString()) {
this.unpublishedListing = true;
}
return callback();
}

if (!this.hasStarted) {
Expand Down Expand Up @@ -105,11 +103,12 @@ class LogConsumer {
* @param {string} logger - logger
*/
constructor(mongoConfig, logger) {
const { replicaSetHosts } = mongoConfig;
const { replicaSetHosts, database } = mongoConfig;
// 'local' is the database where MongoDB has oplogs.rs capped collection
this.database = 'local';
this.mongoUrl = `mongodb://${replicaSetHosts}/local`;
this.logger = logger;
this.metadataDatabase = database;
}

/**
Expand Down Expand Up @@ -150,15 +149,18 @@ class LogConsumer {
* @return {undefined}
*/
readRecords(params, cb) {
const recordStream = new ListRecordStream(this.logger);
const limit = params.limit || 10000;
const startIDandSeq = params.startSeq.toString().split('|');
const startSeq = parseInt(startIDandSeq[0], 10) || 0;
lastEndID = startIDandSeq[1];
const lastEndID = startIDandSeq[1];
const recordStream = new ListRecordStream(this.logger, lastEndID);

const db = this.metadataDatabase;
const ns = new RegExp(`^(?!.*${db}.*(?:__)).*${db}\\.\\w+.*`);

this.coll = this.db.collection('oplog.rs');
return this.coll.find({
ns: /^(?!.*metadata.*(?:__)).*metadata\.\w+.*/,
ns,
ts: { $gte: Timestamp.fromNumber(startSeq) },
}, {
limit,
Expand Down

0 comments on commit 82b4055

Please sign in to comment.