From 69922222b443364929f16969a94076faba58d95b Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Tue, 28 Nov 2017 18:17:49 -0800 Subject: [PATCH] bf: stream response from getRaftLog() Adapt LogConsumer.readRecords() to use the stream returned by the modified BucketClient.getRaftLog() function. That allows end-to-end streaming, hence supporting arbitrary-sized responses, which should avoid toString() exceptions or consume excessive amounts of memory. --- .../metadata/bucketclient/LogConsumer.js | 32 +++++++---- package.json | 1 + .../metadata/bucketclient/LogConsumer.js | 55 ++++++++++++++++++- 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/lib/storage/metadata/bucketclient/LogConsumer.js b/lib/storage/metadata/bucketclient/LogConsumer.js index 4b8062b8d..f671df588 100644 --- a/lib/storage/metadata/bucketclient/LogConsumer.js +++ b/lib/storage/metadata/bucketclient/LogConsumer.js @@ -1,6 +1,7 @@ 'use strict'; // eslint-disable-line const stream = require('stream'); +const jsonStream = require('JSONStream'); const werelogs = require('werelogs'); @@ -89,7 +90,7 @@ class LogConsumer { this.bucketClient.getRaftLog( this.raftSession, _params.startSeq, _params.limit, - false, null, (err, data) => { + false, null, (err, stream) => { if (err) { if (err.code === 404) { // no such raft session, log and ignore @@ -109,18 +110,25 @@ class LogConsumer { 'Error handling record log request', { error: err }); return cb(err); } - let logResponse; - try { - logResponse = JSON.parse(data); - } catch (err) { - this.logger.error('received malformed JSON', - { params }); + // setup a temporary listener until the 'header' event + // is emitted + recordStream.on('error', err => { + this.logger.error('error receiving raft log', + { error: err.message }); return cb(errors.InternalError); - } - logResponse.log.forEach(entry => recordStream.write(entry)); - recordStream.end(); - return cb(null, { info: logResponse.info, - log: recordStream }); + }); + const jsonResponse = stream.pipe(jsonStream.parse('log.*')); + jsonResponse.pipe(recordStream); + stream.on('error', err => recordStream.emit('error', err)); + jsonResponse + .on('header', header => { + // remove temporary listener + recordStream.removeAllListeners('error'); + return cb(null, { info: header.info, + log: recordStream }); + }) + .on('error', err => recordStream.emit('error', err)); + return undefined; }, this.logger.newRequestLogger()); } } diff --git a/package.json b/package.json index 9981ca0bf..545839b30 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "ioredis": "2.4.0", "ipaddr.js": "1.2.0", "joi": "^10.6", + "JSONStream": "^1.0.0", "level": "~1.6.0", "level-sublevel": "~6.6.1", "simple-glob": "^0.1", diff --git a/tests/unit/storage/metadata/bucketclient/LogConsumer.js b/tests/unit/storage/metadata/bucketclient/LogConsumer.js index 7d1440a10..ad382ac87 100644 --- a/tests/unit/storage/metadata/bucketclient/LogConsumer.js +++ b/tests/unit/storage/metadata/bucketclient/LogConsumer.js @@ -1,13 +1,14 @@ 'use strict'; //eslint-disable-line const assert = require('assert'); +const stream = require('stream'); const errors = require('../../../../../lib/errors'); const LogConsumer = require( '../../../../../lib/storage/metadata/bucketclient/LogConsumer.js'); /* eslint-disable max-len */ -const mockedLogResponse = `{ +const mockedLogResponseData = `{ "info": { "start": 10, "end": 11, "prune": 10, "cseq": 11 }, "log": [ { @@ -41,12 +42,44 @@ const mockedLogResponse = `{ ] }`; -const malformedLogResponse = `{ +const malformedLogResponseData = `{ "info": not json!, "log": [] }`; + +const malformedLogEntryData = `{ + "info": { "start": 10, "end": 11, "prune": 10, "cseq": 11 }, + "log": [ + { + "db": "funbucket", + "entries": [ + { + "key": "coolkey0", + "value": "42" + } + ], + "method": 8 + }, + {{{ + ] +}`; /* eslint-enable max-len */ +class MockStream extends stream.Readable { + constructor(dataSource) { + super(); + this._dataSource = dataSource; + } + _read() { + this.push(this._dataSource); + this.push(null); + } +} + +const mockedLogResponse = new MockStream(mockedLogResponseData); +const malformedLogResponse = new MockStream(malformedLogResponseData); +const malformedLogEntry = new MockStream(malformedLogEntryData); + // mock a simple bucketclient to get a fake raft log class BucketClientMock { @@ -63,6 +96,9 @@ class BucketClientMock { case 4: return process.nextTick(() => callback(null, malformedLogResponse)); + case 5: + return process.nextTick(() => callback(null, + malformedLogEntry)); default: assert.fail(); } @@ -153,7 +189,7 @@ describe('raft record log client', () => { done(); }); }); - it('should not crash with malformed log response', done => { + it('should return error with malformed log response', done => { const logClient = new LogConsumer({ bucketClient, raftSession: 4 }); logClient.readRecords({}, err => { @@ -162,5 +198,18 @@ describe('raft record log client', () => { done(); }); }); + it('should emit error event if a log entry is malformed', done => { + const logClient = new LogConsumer({ bucketClient, + raftSession: 5 }); + logClient.readRecords({}, (err, res) => { + assert.ifError(err); + assert(res.info); + assert(res.log); + res.log.on('error', err => { + assert(err); + done(); + }); + }); + }); }); });