diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index 6f6c1f815..3e3038355 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -35,8 +35,8 @@ class LogReader { this._extensions.forEach(ext => ext.setBatch(entryBatch)); } - _endEntryBatch() { - this._extensions.forEach(ext => ext.endBatch()); + _unsetEntryBatch() { + this._extensions.forEach(ext => ext.unsetBatch()); } setLogConsumer(logConsumer) { @@ -288,14 +288,18 @@ class LogReader { return done(null); } - this._setEntryBatch(entriesToPublish); - logRes.log.on('data', record => { + this.log.debug('received log data', + { nbEntries: record.entries.length, + info: logRes.info }); logStats.nbLogRecordsRead += 1; + + this._setEntryBatch(entriesToPublish); record.entries.forEach(entry => { logStats.nbLogEntriesRead += 1; this._processLogEntry(batchState, record, entry); }); + this._unsetEntryBatch(); }); logRes.log.on('error', err => { this.log.error('error fetching entries from log', @@ -304,8 +308,7 @@ class LogReader { return done(err); }); logRes.log.on('end', () => { - this.log.debug('ending record stream'); - this._endEntryBatch(); + this.log.debug('ending record stream', { info: logRes.info }); return done(); }); return undefined; diff --git a/lib/queuePopulator/QueuePopulatorExtension.js b/lib/queuePopulator/QueuePopulatorExtension.js index 95b5fff5e..e39338560 100644 --- a/lib/queuePopulator/QueuePopulatorExtension.js +++ b/lib/queuePopulator/QueuePopulatorExtension.js @@ -63,14 +63,14 @@ class QueuePopulatorExtension { /** * Internal use by QueuePopulator * - * @param {Object} batch - batch to publish + * @param {Object} batch - current batch to be published * @return {undefined} */ setBatch(batch) { this._batch = batch; } - endBatch() { + unsetBatch() { this._batch = null; } } diff --git a/package.json b/package.json deleted file mode 120000 index e709a082e..000000000 --- a/package.json +++ /dev/null @@ -1 +0,0 @@ -eve/workers/unit_and_feature_tests/package.json \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 000000000..efe10eae0 --- /dev/null +++ b/package.json @@ -0,0 +1,45 @@ +{ + "name": "backbeat", + "version": "1.0.0", + "description": "Asynchronous queue and job manager", + "main": "index.js", + "scripts": { + "queue_populator": "node bin/queuePopulator.js", + "queue_processor": "node extensions/replication/queueProcessor/task.js", + "test": "mocha --recursive tests/unit", + "ft_test": "mocha --recursive tests/functional", + "bh_test": "mocha --recursive tests/behavior", + "lint": "eslint $(git ls-files '*.js')", + "lint_md": "mdlint $(git ls-files '*.md')" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/scality/backbeat.git" + }, + "author": "Scality Inc.", + "license": "Apache-2.0", + "bugs": { + "url": "https://github.com/scality/backbeat/issues" + }, + "homepage": "https://github.com/scality/backbeat#readme", + "dependencies": { + "arsenal": "scality/Arsenal", + "async": "^2.3.0", + "aws-sdk": "2.147.0", + "backo": "^1.1.0", + "bucketclient": "scality/bucketclient", + "commander": "^2.11.0", + "eslint": "^2.4.0", + "eslint-config-airbnb": "^6.0.0", + "eslint-config-scality": "scality/Guidelines", + "eslint-plugin-react": "^4.2.3", + "joi": "^10.6", + "kafka-node": "^2.2.0", + "node-schedule": "^1.2.0", + "vaultclient": "github:scality/vaultclient", + "werelogs": "scality/werelogs" + }, + "devDependencies": { + "mocha": "^3.3.0" + } +}