Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/rel/7.2' into fwdport_7.2_to_master
Browse files Browse the repository at this point in the history
  • Loading branch information
ThibaultRiviere committed Jan 29, 2018
2 parents 8310e0e + 7793682 commit 30a2613
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 9 deletions.
15 changes: 9 additions & 6 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class LogReader {
this._extensions.forEach(ext => ext.setBatch(entryBatch));
}

_endEntryBatch() {
this._extensions.forEach(ext => ext.endBatch());
_unsetEntryBatch() {
this._extensions.forEach(ext => ext.unsetBatch());
}

setLogConsumer(logConsumer) {
Expand Down Expand Up @@ -288,14 +288,18 @@ class LogReader {
return done(null);
}

this._setEntryBatch(entriesToPublish);

logRes.log.on('data', record => {
this.log.debug('received log data',
{ nbEntries: record.entries.length,
info: logRes.info });
logStats.nbLogRecordsRead += 1;

this._setEntryBatch(entriesToPublish);
record.entries.forEach(entry => {
logStats.nbLogEntriesRead += 1;
this._processLogEntry(batchState, record, entry);
});
this._unsetEntryBatch();
});
logRes.log.on('error', err => {
this.log.error('error fetching entries from log',
Expand All @@ -304,8 +308,7 @@ class LogReader {
return done(err);
});
logRes.log.on('end', () => {
this.log.debug('ending record stream');
this._endEntryBatch();
this.log.debug('ending record stream', { info: logRes.info });
return done();
});
return undefined;
Expand Down
4 changes: 2 additions & 2 deletions lib/queuePopulator/QueuePopulatorExtension.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ class QueuePopulatorExtension {
/**
* Internal use by QueuePopulator
*
* @param {Object} batch - batch to publish
* @param {Object} batch - current batch to be published
* @return {undefined}
*/
setBatch(batch) {
this._batch = batch;
}

endBatch() {
unsetBatch() {
this._batch = null;
}
}
Expand Down
1 change: 0 additions & 1 deletion package.json

This file was deleted.

45 changes: 45 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"name": "backbeat",
"version": "1.0.0",
"description": "Asynchronous queue and job manager",
"main": "index.js",
"scripts": {
"queue_populator": "node bin/queuePopulator.js",
"queue_processor": "node extensions/replication/queueProcessor/task.js",
"test": "mocha --recursive tests/unit",
"ft_test": "mocha --recursive tests/functional",
"bh_test": "mocha --recursive tests/behavior",
"lint": "eslint $(git ls-files '*.js')",
"lint_md": "mdlint $(git ls-files '*.md')"
},
"repository": {
"type": "git",
"url": "git+https://github.com/scality/backbeat.git"
},
"author": "Scality Inc.",
"license": "Apache-2.0",
"bugs": {
"url": "https://github.com/scality/backbeat/issues"
},
"homepage": "https://github.com/scality/backbeat#readme",
"dependencies": {
"arsenal": "scality/Arsenal",
"async": "^2.3.0",
"aws-sdk": "2.147.0",
"backo": "^1.1.0",
"bucketclient": "scality/bucketclient",
"commander": "^2.11.0",
"eslint": "^2.4.0",
"eslint-config-airbnb": "^6.0.0",
"eslint-config-scality": "scality/Guidelines",
"eslint-plugin-react": "^4.2.3",
"joi": "^10.6",
"kafka-node": "^2.2.0",
"node-schedule": "^1.2.0",
"vaultclient": "github:scality/vaultclient",
"werelogs": "scality/werelogs"
},
"devDependencies": {
"mocha": "^3.3.0"
}
}

0 comments on commit 30a2613

Please sign in to comment.