diff --git a/src/channel.js b/src/channel.js index 2261e1c..b5b3135 100644 --- a/src/channel.js +++ b/src/channel.js @@ -1,6 +1,7 @@ 'use strict' const pushable = require('pull-pushable') +const defaults = require('lodash.defaults') const consts = require('./consts') const EE = require('events') @@ -11,13 +12,16 @@ const log = debug('pull-plex:chan') log.err = debug('pull-plex:chan:err') class Channel extends EE { - constructor (id, name, plex, initiator, open) { + constructor (opts) { super() - this._id = id - this._name = name - this._plex = plex - this._open = open - this._initiator = initiator + + opts = defaults({}, opts, { initiator: false }) + + this._id = opts.id + this._name = opts.name + this._plex = opts.plex + this._open = opts.open + this._initiator = opts.initiator this._endedRemote = false // remote stream ended this._endedLocal = false // local stream ended this._reset = false @@ -41,9 +45,7 @@ class Channel extends EE { if (err && typeof err !== 'boolean') { setImmediate(() => this.emit('error', err)) } - if (this._reset) { return } // don't try closing the channel on reset - - // this.endChan() + // this.endChan() // TODO: do not uncoment this, it will end the channel too early }) this._source = this._msgs @@ -99,6 +101,10 @@ class Channel extends EE { return this._name } + get destroyed () { + return this._endedRemote && this._endedLocal + } + push (data) { this._log('push', data) this._msgs.push(data) @@ -124,6 +130,8 @@ class Channel extends EE { openChan () { this._log('openChan') + if (this.open) { return } // chan already open + let name if (this._name && !Buffer.isBuffer(this._name)) { name = Buffer.from(this._name) @@ -133,7 +141,7 @@ class Channel extends EE { this._plex.push([ this._id, consts.type.NEW, - name != this._id.toString() ? name : null + name !== this._id.toString() ? name : null ]) } diff --git a/src/index.js b/src/index.js index 856ae44..1965835 100644 --- a/src/index.js +++ b/src/index.js @@ -2,6 +2,9 @@ const pull = require('pull-stream') const pushable = require('pull-pushable') +const through = require('pull-through') + +const defautls = require('lodash.defaults') const EE = require('events') @@ -14,18 +17,31 @@ const debug = require('debug') const log = debug('pull-plex') log.err = debug('pull-plex:err') +const MAX_MSG_SIZE = 1024 * 1024 // 1mb + class Plex extends EE { - constructor (initiator, onChan) { + constructor (opts) { super() - if (typeof initiator === 'function') { - onChan = initiator - initiator = true + if (typeof opts === 'boolean') { + opts = { initiator: opts } } - this._initiator = !!initiator + opts = defautls({}, opts, { + initiator: true, + onChan: null, + maxChannels: 10000, + maxMsgSize: MAX_MSG_SIZE, + lazy: false + }) + + this._maxChannels = opts.maxChannels + this._maxMsgSize = opts.maxMsgSize + this._lazy = opts.lazy + + this._initiator = !!opts.initiator this._chanId = this._initiator ? 1 : 0 - this._channels = {} + this._channels = new Map() this._endedRemote = false // remote stream ended this._endedLocal = false // local stream ended @@ -45,8 +61,8 @@ class Plex extends EE { this.close(err) }) - if (onChan) { - this.on('stream', (chan) => onChan(chan, chan.id)) + if (opts.onChan) { + this.on('stream', (chan) => opts.onChan(chan, chan.id)) } this.source = pull( @@ -54,7 +70,15 @@ class Plex extends EE { coder.encode() ) + const self = this this.sink = pull( + through(function (data) { + if (Buffer.byteLength(data) > self._maxMsgSize) { + setImmediate(() => self.emit('error', new Error('message too large!'))) + return this.queue(null) + } + this.queue(data) + }), coder.decode(), (read) => { const next = (end, data) => { @@ -86,12 +110,9 @@ class Plex extends EE { this._endedLocal = true // propagate close to channels - Object - .keys(this._channels) - .forEach((id) => { - const chan = this._channels[id] - if (chan) { return chan.close(err) } - }) + for (let chan of this._channels.values()) { + chan.close(err) + } this.emit('close') } @@ -101,13 +122,18 @@ class Plex extends EE { } reset (err) { - err = err || 'Underlying stream has been closed' + err = err || new Error('Underlying stream has been closed') this._chandata.end(err) this.close(err) } push (data) { this._log('push', data) + if (data.data + && Buffer.byteLength(data.data) > this._maxMsgSize) { + this._chandata.end(new Error('message too large!')) + } + this._chandata.push(data) log('buffer', this._chandata.buffer) } @@ -119,14 +145,20 @@ class Plex extends EE { } createStream (name) { - if (typeof name === 'number') { - name = name.toString() - } - return this._newStream(null, this._initiator, false, name) + if (typeof name === 'number') { name = name.toString() } + const chan = this._newStream(null, this._initiator, false, name) + if (!this._lazy) { chan.openChan() } + return chan } _newStream (id, initiator, open, name) { this._log('_newStream', Array.prototype.slice.call(arguments)) + + if (this._channels.size >= this._maxChannels) { + this.emit('error', new Error('max channels exceeded')) + return + } + if (typeof initiator === 'string') { name = initiator initiator = false @@ -139,28 +171,32 @@ class Plex extends EE { } id = typeof id === 'number' ? id : this._nextChanId(initiator) - const chan = new Channel(id, + const chan = new Channel({ + id, name, - this, + plex: this, initiator, - open || false) + open: open || false + }) chan.once('close', () => { + const chan = this._channels.get(id) this._log('deleting channel', JSON.stringify({ channel: this._name, id: id, - endedLocal: this._channels[id]._endedLocal, - endedRemote: this._channels[id]._endedRemote, - initiator: this._channels[id]._initiator + endedLocal: chan._endedLocal, + endedRemote: chan._endedRemote, + initiator: chan._initiator })) - delete this._channels[id] + this._channels.delete(id) }) - if (this._channels[id]) { - return this.emit('error', `channel with id ${id} already exist!`) + if (this._channels.has(id)) { + this.emit('error', new Error(`channel with id ${id} already exist!`)) + return } - this._channels[id] = chan + this._channels.set(id, chan) return chan } @@ -176,7 +212,7 @@ class Plex extends EE { case consts.type.OUT_MESSAGE: case consts.type.IN_MESSAGE: { - const chan = this._channels[id] + const chan = this._channels.get(id) if (chan) { chan.push(data) } @@ -185,7 +221,7 @@ class Plex extends EE { case consts.type.OUT_CLOSE: case consts.type.IN_CLOSE: { - const chan = this._channels[id] + const chan = this._channels.get(id) if (chan) { chan.close() } @@ -194,12 +230,15 @@ class Plex extends EE { case consts.type.OUT_RESET: case consts.type.IN_RESET: { - const chan = this._channels[id] + const chan = this._channels.get(id) if (chan) { chan.reset() } return } + + default: + this.emit('error', new Error('Invalid message type')) } } } diff --git a/test/channel.spec.js b/test/channel.spec.js index 3f0e5d4..343a8af 100644 --- a/test/channel.spec.js +++ b/test/channel.spec.js @@ -232,13 +232,14 @@ describe('channel', () => { plex2.on('stream', (stream) => { pull( stream, - pull.onEnd((err) => { + pull.collect((err, data) => { expect(err).to.exist() + expect(data[0].toString()).to.eql('hello there!') done() }) ) - sndrSrc.push('hello there!') // should be able to write to closed chan + sndrSrc.push(Buffer.from('hello there!')) aborter.abort(new Error('nasty error!')) }) diff --git a/test/coder.spec.js b/test/coder.spec.js index b071524..f2ea12e 100644 --- a/test/coder.spec.js +++ b/test/coder.spec.js @@ -86,4 +86,4 @@ describe('coder', () => { }) ) }) -}) \ No newline at end of file +}) diff --git a/test/node.js b/test/node.js new file mode 100644 index 0000000..6abcd74 --- /dev/null +++ b/test/node.js @@ -0,0 +1 @@ +require('./old-mplex-interop') diff --git a/test/old-mplex-interop.js b/test/old-mplex-interop.js index c6dad50..53280f4 100644 --- a/test/old-mplex-interop.js +++ b/test/old-mplex-interop.js @@ -10,7 +10,6 @@ const concat = require('concat-stream') const through = require('through2') const net = require('net') const chunky = require('chunky') -const pump = require('pump') const toStream = require('pull-stream-to-stream') const MplexCore = require('libp2p-mplex/src/internals') @@ -23,11 +22,9 @@ describe('node stream multiplex interop', () => { const stream1 = toStream(pullPlex.createStream()) const stream2 = toStream(pullPlex.createStream()) - function onStream (stream, id) { + const plex2 = new MplexCore({ initiator: false }, (stream) => { stream.pipe(collect()) - } - - const plex2 = new MplexCore({ initiator: false }, onStream) + }) plex1.pipe(plex2) @@ -58,12 +55,12 @@ describe('node stream multiplex interop', () => { const stream1 = plex1.createStream() const stream2 = plex1.createStream() - function onStream (pullStream, id) { - const stream = toStream(pullStream) - stream.pipe(collect()) - } - - const pullPlex = new Plex(onStream) + const pullPlex = new Plex({ + onChan: (pullStream) => { + const stream = toStream(pullStream) + stream.pipe(collect()) + } + }) const plex2 = toStream(pullPlex) plex1.pipe(plex2) @@ -94,7 +91,7 @@ describe('node stream multiplex interop', () => { const pullPlex = new Plex(true) const plex1 = toStream(pullPlex) - const plex2 = new MplexCore(function onStream (stream, id) { + const plex2 = new MplexCore((stream) => { const uppercaser = through(function (chunk, e, callback) { this.push(Buffer.from(chunk.toString().toUpperCase())) this.end() @@ -133,14 +130,17 @@ describe('node stream multiplex interop', () => { it('old2new: two way piping works with 2 sub-streams', (done) => { const plex1 = new MplexCore() - const plex2 = toStream(new Plex(false, function onStream (pstream, id) { - const stream = toStream(pstream) - const uppercaser = through(function (chunk, e, callback) { - this.push(Buffer.from(chunk.toString().toUpperCase())) - this.end() - callback() - }) - stream.pipe(uppercaser).pipe(stream) + const plex2 = toStream(new Plex({ + initiator: false, + onChan: (pstream) => { + const stream = toStream(pstream) + const uppercaser = through(function (chunk, e, callback) { + this.push(Buffer.from(chunk.toString().toUpperCase())) + this.end() + callback() + }) + stream.pipe(uppercaser).pipe(stream) + } })) plex1.pipe(plex2).pipe(plex1) @@ -170,26 +170,6 @@ describe('node stream multiplex interop', () => { } }) - it.skip('destroy', (done) => { - const pullPlex = new Plex() - const plex1 = toStream(pullPlex) - - const stream1 = toStream(pullPlex.createStream()) - - const plex2 = new MplexCore(function onStream (stream, id) { - stream.on('error', function (err) { - expect(err.message).to.equal('0 had an error') - done() - }) - }) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - // pull-stream-to-stream destroy doesn't take parameters, so error never gets emited - stream1.destroy(new Error('0 had an error')) - }) - // need to implement message size checks it.skip('testing invalid data error', (done) => { const plex = toStream(new Plex()) @@ -281,7 +261,7 @@ describe('node stream multiplex interop', () => { const stream1 = toStream(pullPlex.createStream()) const stream2 = toStream(pullPlex.createStream()) - const plex2 = new MplexCore(function onStream (stream, id) { + const plex2 = new MplexCore((stream) => { stream.pipe(collect()) }) @@ -330,9 +310,12 @@ describe('node stream multiplex interop', () => { const stream1 = plex1.createStream() const stream2 = plex1.createStream() - const pullStream = new Plex(false, function onStream (pullStream, id) { - const stream = toStream(pullStream) - stream.pipe(collect()) + const pullStream = new Plex({ + initiator: false, + onChan: (pullStream) => { + const stream = toStream(pullStream) + stream.pipe(collect()) + } }) const plex2 = toStream(pullStream) @@ -366,6 +349,7 @@ describe('node stream multiplex interop', () => { } }) + // not sure how to do this with pull streams (prob not required?) it.skip('prefinish + corking', (done) => { const pullPlex = new Plex(true) const plex = toStream(pullPlex) @@ -552,7 +536,7 @@ describe('node stream multiplex interop', () => { it('old2new: half close a half closed muxed stream', (done) => { const plex1 = new MplexCore({ halfOpen: true }) - const pullPlex2 = new Plex() + const pullPlex2 = new Plex(false) const plex2 = toStream(pullPlex2) plex1.nameTag = 'plex1:' @@ -560,22 +544,22 @@ describe('node stream multiplex interop', () => { plex1.pipe(plex2).pipe(plex1) - pullPlex2.on('stream', function (chan, id) { + pullPlex2.on('stream', (chan, id) => { const stream = toStream(chan) expect(stream).to.exist() expect(id).to.exist() - stream.on('data', function (data) { + stream.on('data', (data) => { expect(data).to.eql(Buffer.from('some data')) }) - stream.on('end', function () { + stream.on('end', () => { stream.write(Buffer.from('hello world')) stream.end() }) - stream.on('error', function (err) { + stream.on('error', (err) => { expect(err).to.not.exist() console.dir(err) }) @@ -583,15 +567,14 @@ describe('node stream multiplex interop', () => { const stream = plex1.createStream() - stream.on('data', function (data) { + stream.on('data', (data) => { expect(data).to.eql(Buffer.from('hello world')) }) // we can't make pull stream halfOpen with pull-stream-to-pull-stream // so it will error out with a writting after EOF error, so just ignore - stream.on('error', function (err) { - // expect(err).to.not.exist() - // console.dir(err) + stream.on('error', (err) => { + expect(err).to.not.exist() }) stream.on('end', function () { @@ -602,57 +585,4 @@ describe('node stream multiplex interop', () => { stream.end() }) - - it.skip('underlying error is propagated to muxed streams', (done) => { - let count = 0 - - function check () { - if (++count === 4) { - done() - } - } - - const plex1 = new MplexCore() - const plex2 = new MplexCore() - - let socket - - plex2.on('stream', function (stream) { - stream.on('error', function (err) { - expect(err).to.exist() - check() - }) - - stream.on('close', function () { - check() - }) - - socket.destroy() - }) - - const stream1to2 = plex1.createStream(1337) - - stream1to2.on('error', function (err) { - expect(err).to.exist() - check() - }) - - stream1to2.on('close', function () { - check() - }) - - const server = net.createServer(function (stream) { - pump(plex2, stream) - pump(stream, plex2) - server.close() - }) - - server.listen(0, function () { - const port = server.address().port - socket = net.connect(port) - - pump(plex1, socket) - pump(socket, plex1) - }) - }) }) diff --git a/test/plex.spec.js b/test/plex.spec.js index 3adea30..ca64cba 100644 --- a/test/plex.spec.js +++ b/test/plex.spec.js @@ -10,11 +10,13 @@ chai.use(dirtyChai) const pull = require('pull-stream') const pair = require('pull-pair/duplex') +const abortable = require('pull-abortable') +const coder = require('../src/coder') const Plex = require('../src') describe('plex', () => { - it.skip(`reset should close both ends`, (done) => { + it(`reset should close both ends`, (done) => { const p = pair() const plex1 = new Plex(true) @@ -41,6 +43,116 @@ describe('plex', () => { plex1.reset() }) + it(`closing stream should close all channels`, (done) => { + const aborter = abortable() + const plex1 = new Plex() + + plex1.on('error', (() => {})) + + pull(plex1, aborter) + + expect(2).check(done) + + const stream1 = plex1.createStream() + stream1.on('error', (() => {})) + + const stream2 = plex1.createStream() + stream2.on('error', (() => {})) + pull( + stream1, + pull.onEnd((err) => { + expect(err).to.exist().mark() + }) + ) + + pull( + stream2, + pull.onEnd((err) => { + expect(err).to.exist().mark() + }) + ) + + aborter.abort() + }) + + it(`error should propagate to all channels`, (done) => { + const aborter = abortable() + const plex1 = new Plex() + + plex1.on('error', (() => {})) + + pull(plex1, aborter) + + expect(2).check(done) + + const stream1 = plex1.createStream() + stream1.on('error', (() => {})) + + const stream2 = plex1.createStream() + stream2.on('error', (() => {})) + + pull( + stream1, + pull.onEnd((err) => { + expect(err.message).to.eql('nasty error').mark() + }) + ) + + pull( + stream2, + pull.onEnd((err) => { + expect(err.message).to.eql('nasty error').mark() + }) + ) + + aborter.abort(new Error('nasty error')) + }) + + it(`should fail if max number of channels exceeded`, (done) => { + const plex1 = new Plex({ + maxChannels: 10, + lazy: true + }) + + plex1.on('error', (err) => { + expect(err.message).to.eql('max channels exceeded') + done() + }) + + for (let i = 0; i < 11; i++) { + plex1.createStream() + } + }) + + it(`should restrict message size`, (done) => { + const plex = new Plex() + + plex.on('error', function (err) { + expect(err.message).to.equal('message too large!') + done() + }) + + pull( + pull.values([Array(1048576 + 1).join('\xff')]), // 1mb + plex + ) + }) + + it(`should validate message`, (done) => { + const plex = new Plex() + + plex.on('error', function (err) { + expect(err.message).to.equal('Invalid message type') + done() + }) + + pull( + pull.values([[1, 7]]), + coder.encode(), // invalid message type + plex + ) + }) + describe(`check id`, () => [true, false].forEach((initiator) => { it(`id should be ${initiator ? 'odd' : 'even'}`, () => { const plex = new Plex(initiator)