Skip to content

Commit

Permalink
Merge branch 'rel/7.4' into fwdport_7.4_master
Browse files Browse the repository at this point in the history
  • Loading branch information
ThibaultRiviere committed Feb 7, 2018
2 parents 393d6ed + 71e5a57 commit 0f1b0da
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 16 deletions.
32 changes: 20 additions & 12 deletions lib/storage/metadata/bucketclient/LogConsumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'; // eslint-disable-line

const stream = require('stream');
const jsonStream = require('JSONStream');

const werelogs = require('werelogs');

Expand Down Expand Up @@ -89,7 +90,7 @@ class LogConsumer {

this.bucketClient.getRaftLog(
this.raftSession, _params.startSeq, _params.limit,
false, null, (err, data) => {
false, null, (err, stream) => {
if (err) {
if (err.code === 404) {
// no such raft session, log and ignore
Expand All @@ -109,18 +110,25 @@ class LogConsumer {
'Error handling record log request', { error: err });
return cb(err);
}
let logResponse;
try {
logResponse = JSON.parse(data);
} catch (err) {
this.logger.error('received malformed JSON',
{ params });
// setup a temporary listener until the 'header' event
// is emitted
recordStream.on('error', err => {
this.logger.error('error receiving raft log',
{ error: err.message });
return cb(errors.InternalError);
}
logResponse.log.forEach(entry => recordStream.write(entry));
recordStream.end();
return cb(null, { info: logResponse.info,
log: recordStream });
});
const jsonResponse = stream.pipe(jsonStream.parse('log.*'));
jsonResponse.pipe(recordStream);
stream.on('error', err => recordStream.emit('error', err));
jsonResponse
.on('header', header => {
// remove temporary listener
recordStream.removeAllListeners('error');
return cb(null, { info: header.info,
log: recordStream });
})
.on('error', err => recordStream.emit('error', err));
return undefined;
}, this.logger.newRequestLogger());
}
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"engines": {
"node": "6.9.5"
},
"version": "7.2.1",
"version": "7.4.0",
"description": "Common utilities for the S3 project components",
"main": "index.js",
"repository": {
Expand All @@ -24,6 +24,7 @@
"ioredis": "2.4.0",
"ipaddr.js": "1.2.0",
"joi": "^10.6",
"JSONStream": "^1.0.0",
"level": "~1.6.0",
"level-sublevel": "~6.6.1",
"mongodb": "^3.0.1",
Expand Down
55 changes: 52 additions & 3 deletions tests/unit/storage/metadata/bucketclient/LogConsumer.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
'use strict'; //eslint-disable-line

const assert = require('assert');
const stream = require('stream');
const errors = require('../../../../../lib/errors');

const LogConsumer = require(
'../../../../../lib/storage/metadata/bucketclient/LogConsumer.js');

/* eslint-disable max-len */
const mockedLogResponse = `{
const mockedLogResponseData = `{
"info": { "start": 10, "end": 11, "prune": 10, "cseq": 11 },
"log": [
{
Expand Down Expand Up @@ -41,12 +42,44 @@ const mockedLogResponse = `{
]
}`;

const malformedLogResponse = `{
const malformedLogResponseData = `{
"info": not json!,
"log": []
}`;

const malformedLogEntryData = `{
"info": { "start": 10, "end": 11, "prune": 10, "cseq": 11 },
"log": [
{
"db": "funbucket",
"entries": [
{
"key": "coolkey0",
"value": "42"
}
],
"method": 8
},
{{{
]
}`;
/* eslint-enable max-len */

class MockStream extends stream.Readable {
constructor(dataSource) {
super();
this._dataSource = dataSource;
}
_read() {
this.push(this._dataSource);
this.push(null);
}
}

const mockedLogResponse = new MockStream(mockedLogResponseData);
const malformedLogResponse = new MockStream(malformedLogResponseData);
const malformedLogEntry = new MockStream(malformedLogEntryData);

// mock a simple bucketclient to get a fake raft log
class BucketClientMock {

Expand All @@ -63,6 +96,9 @@ class BucketClientMock {
case 4:
return process.nextTick(() => callback(null,
malformedLogResponse));
case 5:
return process.nextTick(() => callback(null,
malformedLogEntry));
default:
assert.fail();
}
Expand Down Expand Up @@ -153,7 +189,7 @@ describe('raft record log client', () => {
done();
});
});
it('should not crash with malformed log response', done => {
it('should return error with malformed log response', done => {
const logClient = new LogConsumer({ bucketClient,
raftSession: 4 });
logClient.readRecords({}, err => {
Expand All @@ -162,5 +198,18 @@ describe('raft record log client', () => {
done();
});
});
it('should emit error event if a log entry is malformed', done => {
const logClient = new LogConsumer({ bucketClient,
raftSession: 5 });
logClient.readRecords({}, (err, res) => {
assert.ifError(err);
assert(res.info);
assert(res.log);
res.log.on('error', err => {
assert(err);
done();
});
});
});
});
});

0 comments on commit 0f1b0da

Please sign in to comment.