From 9fd3d28195f032d16e69f7bf341ab11539b03609 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sun, 8 Apr 2018 13:37:08 -0600 Subject: [PATCH] fix: correctly read chunked data --- src/channel.js | 15 ++++++++-- src/coder.js | 61 ++++++++++++++++++++++----------------- src/index.js | 8 ++--- test/old-mplex-interop.js | 61 ++++++++++++++++++++++++++++++++++++--- 4 files changed, 108 insertions(+), 37 deletions(-) diff --git a/src/channel.js b/src/channel.js index 585952f..92087bb 100644 --- a/src/channel.js +++ b/src/channel.js @@ -14,7 +14,7 @@ class Channel extends EE { constructor (id, name, plex, initiator, open) { super() this._id = id - this._name = name || this._id.toString() + this._name = name this._plex = plex this._open = open this._initiator = initiator @@ -118,11 +118,16 @@ class Channel extends EE { openChan () { this._log('openChan') + let name + if (this._name && !Buffer.isBuffer(this._name)) { + name = Buffer.from(this._name) + } + this.open = true this._plex.push([ this._id, consts.type.NEW, - this._name + name != this._id.toString() ? name : null ]) } @@ -133,6 +138,9 @@ class Channel extends EE { this.openChan() } + if (!Buffer.isBuffer(data)) { + data = Buffer.from(data) + } this._plex.push([ this._id, this._initiator @@ -153,7 +161,8 @@ class Channel extends EE { this._id, this._initiator ? consts.type.IN_CLOSE - : consts.type.OUT_CLOSE + : consts.type.OUT_CLOSE, + Buffer.from([0]) ]) } diff --git a/src/coder.js b/src/coder.js index 7852bae..36b8b5e 100644 --- a/src/coder.js +++ b/src/coder.js @@ -2,8 +2,6 @@ const pull = require('pull-stream') const varint = require('varint') -const lp = require('pull-length-prefixed') -const cat = require('pull-cat') const through = require('pull-through') const debug = require('debug') @@ -15,15 +13,13 @@ exports.encode = () => { return pull( through(function (msg) { const seq = [Buffer.from(varint.encode(msg[0] << 3 | msg[1]))] + const len = msg[2] ? Buffer.byteLength(msg[2]) : 0 + seq.push(Buffer.from(varint.encode(len))) // send empty body + this.queue(Buffer.concat(seq)) // send header - if (msg[2]) { - seq.push(Buffer.from(varint.encode(Buffer.byteLength(msg[2])))) - seq.push(Buffer.from(msg[2])) - } else { - seq.push(Buffer.from(varint.encode(0))) + if (len) { + this.queue(msg[2]) } - - this.queue(Buffer.concat(seq)) }) ) } @@ -35,32 +31,33 @@ let States = { let state = States.PARSING exports.decode = () => { const decode = (msg) => { - let offset = 0 - const h = varint.decode(msg) - offset += varint.decode.bytes - let length, data try { + let offset = 0 + let length = 0 + const h = varint.decode(msg) + offset += varint.decode.bytes length = varint.decode(msg, offset) offset += varint.decode.bytes + const message = { + id: h >> 3, + type: h & 7, + data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here + } + + state = States.READING + return [msg.slice(offset), message, length] } catch (err) { log.err(err) // ignore if data is empty + return [msg, undefined, undefined] } - - const message = { - id: h >> 3, - type: h & 7, - data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here - } - - state = States.READING - return [msg.slice(offset), message, length] } + let pos = 0 const read = (msg, data, length) => { let left = length - msg.length if (msg.length > 0) { - const buff = left > 0 ? msg.slice() : msg.slice(0, length) - buff.copy(data) + const buff = msg.slice(0, length - left) + pos += buff.copy(data, pos) msg = msg.slice(buff.length) } if (left <= 0) { state = States.PARSING } @@ -68,12 +65,23 @@ exports.decode = () => { } let offset = 0 - let message = {} + let message = null let length = 0 + let buffer = null return through(function (msg) { while (msg.length) { if (States.PARSING === state) { - [msg, message, length] = decode(msg) + if (!buffer) { + buffer = Buffer.from(msg) + } else { + buffer = Buffer.concat([buffer, msg]) + } + + [msg, message, length] = decode(buffer) + if (!message && !length) { + return // read more + } + buffer = null } if (States.READING === state) { @@ -83,6 +91,7 @@ exports.decode = () => { offset = 0 message = {} length = 0 + pos = 0 } } } diff --git a/src/index.js b/src/index.js index 6435c8e..e415433 100644 --- a/src/index.js +++ b/src/index.js @@ -24,7 +24,7 @@ class Plex extends EE { } this._initiator = !!initiator - this._chanId = this._initiator ? 1 : 0 + this._chanId = this._initiator ? 0 : 1 this._channels = {} this._endedRemote = false // remote stream ended this._endedLocal = false // local stream ended @@ -119,6 +119,9 @@ class Plex extends EE { } createStream (name) { + if (typeof name === 'number') { + name = name.toString() + } return this._newStream(null, this._initiator, false, name) } @@ -136,9 +139,6 @@ class Plex extends EE { } id = typeof id === 'number' ? id : this._nextChanId(initiator) - name = typeof name === 'number' ? name.toString() : name - name = name == null ? id.toString() : name - name = !name.length ? id.toString() : name const chan = new Channel(id, name, this, diff --git a/test/old-mplex-interop.js b/test/old-mplex-interop.js index e65a35e..c88285e 100644 --- a/test/old-mplex-interop.js +++ b/test/old-mplex-interop.js @@ -265,7 +265,7 @@ describe('node stream multiplex interop', () => { }) }) - it('chunks', (done) => { + it('new2old: chunks', (done) => { let times = 100 ;(function chunk () { const collect = collector(function () { @@ -276,13 +276,65 @@ describe('node stream multiplex interop', () => { } }) - const plex1 = new MplexCore() + const pullPlex = new Plex(true) + const plex1 = toStream(pullPlex) + const stream1 = toStream(pullPlex.createStream()) + const stream2 = toStream(pullPlex.createStream()) + + const plex2 = new MplexCore(function onStream (stream, id) { + stream.pipe(collect()) + }) + + plex1.pipe(through(function (buf, enc, next) { + const bufs = chunky(buf) + for (let i = 0; i < bufs.length; i++) this.push(bufs[i]) + next() + })).pipe(plex2) + + stream1.write(Buffer.from('hello')) + stream2.write(Buffer.from('world')) + stream1.end() + stream2.end() + })() + + function collector (cb) { + let pending = 2 + const results = [] + + return function () { + return concat(function (data) { + results.push(data.toString()) + if (--pending === 0) { + results.sort() + expect(results[0].toString()).to.equal('hello') + expect(results[1].toString()).to.equal('world') + cb() + } + }) + } + } + }) + + it('old2new: chunks', (done) => { + let times = 100 + ;(function chunk () { + const collect = collector(function () { + if (--times === 0) { + done() + } else { + chunk() + } + }) + + const plex1 = new MplexCore({ initiator: true }) const stream1 = plex1.createStream() const stream2 = plex1.createStream() - const plex2 = new MplexCore(function onStream (stream, id) { + const pullStream = new Plex(false, function onStream (pullStream, id) { + const stream = toStream(pullStream) stream.pipe(collect()) }) + const plex2 = toStream(pullStream) plex1.pipe(through(function (buf, enc, next) { const bufs = chunky(buf) @@ -315,7 +367,8 @@ describe('node stream multiplex interop', () => { }) it('prefinish + corking', (done) => { - const plex = new MplexCore() + const pullPlex = new Plex(true) + const plex = toStream(pullPlex) let async = false plex.on('prefinish', function () {