diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index b0484cc..0000000 --- a/.aegir.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict' - -const WSlibp2p = require('libp2p-websockets') -const multiaddr = require('multiaddr') -const pull = require('pull-stream') - -const multiplex = require('./src') - -let listener -const boot = (done) => { - const ws = new WSlibp2p() - const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - listener = ws.createListener((transportSocket) => { - const muxedConn = multiplex.listener(transportSocket) - muxedConn.on('stream', (connRx) => { - const connTx = muxedConn.newStream() - pull(connRx, connTx, connRx) - }) - }) - - listener.listen(mh, done) -} - -const shutdown = (done) => { - listener.close(done) -} - -module.exports = { - hooks: { - browser: { - pre: boot, - post: shutdown - } - } -} diff --git a/.gitignore b/.gitignore index d14c494..db79d1f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,13 +1,7 @@ +node_modules +coverage +.nyc_output package-lock.json yarn.lock docs - -**/node_modules -**/*.log -test/setup/tmp-disposable-nodes-addrs.json dist -coverage -**/*.swp -examples/sub-module/**/bundle.js -examples/sub-module/**/*-minified.js -examples/sub-module/*-bundle.js diff --git a/README.md b/README.md index 4335dca..d5d8051 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -js-libp2p-mplex -=================== +# js-libp2p-mplex [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai) [![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) @@ -12,100 +11,146 @@ js-libp2p-mplex ![](https://img.shields.io/badge/npm-%3E%3D6.0.0-orange.svg?style=flat-square) ![](https://img.shields.io/badge/Node.js-%3E%3D10.0.0-orange.svg?style=flat-square) -> JavaScript implementation of https://github.com/libp2p/mplex +> JavaScript implementation of [mplex](https://github.com/libp2p/specs/tree/master/mplex). [![](https://github.com/libp2p/interface-stream-muxer/raw/master/img/badge.png)](https://github.com/libp2p/interface-stream-muxer) ## Lead Maintainer -[Vasco Santos](https://github.com/vasco-santos). +[Vasco Santos](https://github.com/vasco-santos) + +## Install + +```sh +npm install libp2p-mplex +``` ## Usage -Let's define a `listener.js`, which starts a TCP server on port 9999 and waits for a connection. Once we get a connection, we wait for a stream. And finally, once we have the stream, we pull the data from that stream, and printing it to the console. - -```JavaScript -const mplex = require('libp2p-mplex') -const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') - -const listener = tcp.createServer((socket) => { - console.log('[listener] Got connection!') - - const muxer = mplex.listener(toPull(socket)) - - muxer.on('stream', (stream) => { - console.log('[listener] Got stream!') - pull( - stream, - pull.drain((data) => { - console.log('[listener] Received:') - console.log(data.toString()) - }) - ) - }) -}) +```js +const Mplex = require('libp2p-mplex') +const pipe = require('it-pipe') -listener.listen(9999, () => { - console.log('[listener] listening on 9999') +const muxer = new Mplex({ + onStream: stream => { // Receive a duplex stream from the remote + // ...receive data from the remote and optionally send data back + } }) + +pipe(conn, muxer, conn) // conn is duplex connection to another peer + +const stream = muxer.newStream() // Create a new duplex stream to the remote + +// Use the duplex stream to send some data to the remote... +pipe([1, 2, 3], stream) ``` -Now, let's define `dialer.js` who will connect to our `listener` over a TCP socket. Once we have that, we'll put a message in the stream for our `listener`. +## API -```JavaScript -const mplex = require('libp2p-mplex') -const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') +### `const muxer = new Mplex([options])` -const socket = tcp.connect(9999) +Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications. -const muxer = mplex.dialer(toPull(socket)) +e.g. -console.log('[dialer] opening stream') -const stream = muxer.newStream((err) => { - console.log('[dialer] opened stream') - if (err) throw err -}) +```js +const Mplex = require('libp2p-mplex') +const pipe = require('it-pipe') -pull( - pull.values(['hey, how is it going. I am the dialer']), - stream -) +// Create a duplex muxer +const muxer = new Mplex() + +// Use the muxer in a pipeline +pipe(conn, muxer, conn) // conn is duplex connection to another peer ``` -Now we can first run `listener.js` and then `dialer.js` to see the -following output: +`options` is an optional `Object` that may have the following properties: + +* `onStream` - A function called when receiving a new stream from the remote. e.g. + ```js + // Receive a new stream on the muxed connection + const onStream = stream => { + // Read from this stream and write back to it (echo server) + pipe( + stream, + source => (async function * () { + for await (const data of source) yield data + })(), + stream + ) + } + const muxer = new Mplex({ onStream }) + // ... + ``` + **Note:** The `onStream` function can be passed in place of the `options` object. i.e. + ```js + new Mplex(stream => { /* ... */ }) + ``` +* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g. + ```js + const controller = new AbortController() + const muxer = new Mplex({ signal: controller.signal }) + + pipe(conn, muxer, conn) + + controller.abort() + ``` +* `maxMsgSize` - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB) + +### `muxer.onStream` + +Use this property as an alternative to passing `onStream` as an option to the `Mplex` constructor. + +### `const stream = muxer.newStream([options])` + +Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it). + +e.g. -*listener.js* +```js +// Create a new stream on the muxed connection +const stream = muxer.newStream() -``` -$ node listener.js -[listener] listening on 9999 -[listener] Got connection! -[listener] Got stream! -[listener] Received: -hey, how is it going. I am the dialer +// Use this new stream like any other duplex stream: +pipe([1, 2, 3], stream, consume) ``` -*dialer.js* +In addition to `sink` and `source` properties, this stream also has the following API, that will **normally _not_ be used by stream consumers**. -``` -$ node dialer.js -[dialer] opening stream -[dialer] opened stream -``` +#### `stream.close()` -## Install +Closes the stream for **reading**. If iterating over the source of this stream in a `for await of` loop, it will return (exit the loop) after any buffered data has been consumed. -```sh -> npm install libp2p-mplex -``` +This function is called automatically by the muxer when it receives a `CLOSE` message from the remote. -## API +The source will return normally, the sink will continue to consume. -```js -const mplex = require('libp2p-mplex') -``` +#### `stream.abort([err])` + +Closes the stream for **reading** _and_ **writing**. This should be called when a _local error_ has occurred. + +Note, if called without an error any buffered data in the source can still be consumed and the stream will end normally. + +This will cause a `RESET` message to be sent to the remote, _unless_ the sink has already ended. + +The sink will return and the source will throw if an error is passed or return normally if not. + +#### `stream.reset()` + +Closes the stream _immediately_ for **reading** _and_ **writing**. This should be called when a _remote error_ has occurred. + +This function is called automatically by the muxer when it receives a `RESET` message from the remote. + +The sink will return and the source will throw. + +## Contribute + +The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out: + + - Go through the modules and **check out existing issues**. This is especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically. + - **Perform code reviews**. More eyes will help a) speed the project along b) ensure quality and c) reduce possible future bugs. + - **Add tests**. There can never be enough tests. + +## License + +[MIT](LICENSE) © Protocol Labs diff --git a/examples/dialer.js b/examples/dialer.js index 96ef141..aec6d88 100644 --- a/examples/dialer.js +++ b/examples/dialer.js @@ -2,21 +2,42 @@ 'use strict' const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') -const multiplex = require('../src') +const pipe = require('it-pipe') +const AbortController = require('abort-controller') +const { toIterable } = require('./util') +const Mplex = require('../src') -const socket = tcp.connect(9999) +const socket = toIterable(tcp.connect(9999)) +console.log('[dialer] socket stream opened') -const muxer = multiplex.dialer(toPull(socket)) +const controller = new AbortController() -console.log('[dialer] opening stream') -const stream = muxer.newStream((err) => { - console.log('[dialer] opened stream') - if (err) throw err -}) +const muxer = new Mplex({ signal: controller.signal }) -pull( - pull.values(['hey, how is it going. I am the dialer']), - stream -) +const pipeMuxerToSocket = async () => { + await pipe(muxer, socket, muxer) + console.log('[dialer] socket stream closed') +} + +const sendAndReceive = async () => { + const muxedStream = muxer.newStream() + console.log('[dialer] muxed stream opened') + + await pipe( + ['hey, how is it going. I am the dialer'], + muxedStream, + async source => { + for await (const chunk of source) { + console.log('[dialer] received:') + console.log(chunk.toString()) + } + } + ) + console.log('[dialer] muxed stream closed') + + // Close the socket stream after 1s + setTimeout(() => controller.abort(), 1000) +} + +pipeMuxerToSocket() +sendAndReceive() diff --git a/examples/listener.js b/examples/listener.js index e57671e..37f167f 100644 --- a/examples/listener.js +++ b/examples/listener.js @@ -2,27 +2,34 @@ 'use strict' const tcp = require('net') -const pull = require('pull-stream') -const toPull = require('stream-to-pull-stream') -const multiplex = require('../src') +const pipe = require('it-pipe') +const { toIterable } = require('./util') +const Mplex = require('../src') -const listener = tcp.createServer((socket) => { +const listener = tcp.createServer(async socket => { console.log('[listener] Got connection!') - const muxer = multiplex.listener(toPull(socket)) - - muxer.on('stream', (stream) => { - console.log('[listener] Got stream!') - pull( - stream, - pull.drain((data) => { - console.log('[listener] Received:') - console.log(data.toString()) - }) - ) + const muxer = new Mplex({ + async onStream (stream) { + console.log('[listener] muxed stream opened') + await pipe( + stream, + source => (async function * () { + for await (const chunk of source) { + console.log('[listener] received:') + console.log(chunk.toString()) + yield 'thanks for the message, I am the listener' + } + })(), + stream + ) + console.log('[listener] muxed stream closed') + } }) -}) -listener.listen(9999, () => { - console.log('[listener] listening on 9999') + socket = toIterable(socket) + await pipe(socket, muxer, socket) + console.log('[listener] socket stream closed') }) + +listener.listen(9999, () => console.log('[listener] listening on 9999')) diff --git a/examples/util.js b/examples/util.js new file mode 100644 index 0000000..9fc3182 --- /dev/null +++ b/examples/util.js @@ -0,0 +1,18 @@ +// Simple convertion of Node.js duplex to iterable duplex (no backpressure) +exports.toIterable = socket => { + return { + sink: async source => { + try { + for await (const chunk of source) { + // Chunk is a BufferList, pass the underlying buffer to to the socket + socket.write(chunk.slice()) + } + } catch (err) { + // If not an abort then destroy the socket with an error + return socket.destroy(err.code === 'ABORT_ERR' ? null : err) + } + socket.end() + }, + source: socket + } +} diff --git a/package.json b/package.json index 4d535ff..edff9eb 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "test": "aegir test -t node -t browser", "test:node": "aegir test -t node", "test:browser": "aegir test -t browser", + "coverage": "nyc --reporter=text --reporter=lcov npm run test:node", "release": "aegir release -t node -t browser", "release-minor": "aegir release --type minor -t node -t browser", "release-major": "aegir release --type major -t node -t browser" @@ -23,6 +24,13 @@ "url": "git+https://github.com/libp2p/js-libp2p-mplex.git" }, "keywords": [ + "multiplex", + "mplex", + "stream", + "muxer", + "connection", + "duplex", + "libp2p", "IPFS" ], "license": "MIT", @@ -31,29 +39,24 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-mplex#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.0.0", "chai": "^4.2.0", - "chai-checkmark": "^1.0.1", "dirty-chai": "^2.0.1", - "interface-stream-muxer": "~0.6.0", - "libp2p-tcp": "~0.13.0", - "libp2p-websockets": "~0.12.2", - "multiaddr": "^6.0.6", - "pull-pair": "^1.1.0", - "through2": "^2.0.3" + "interface-stream-muxer": "^0.7.0", + "p-defer": "^3.0.0", + "random-bytes": "^1.0.0", + "random-int": "^2.0.0" }, "dependencies": { - "async": "^2.6.2", - "chunky": "0.0.0", - "concat-stream": "^1.6.2", + "abort-controller": "^3.0.0", + "abortable-iterator": "^2.1.0", + "async-iterator-to-pull-stream": "^1.3.0", + "bl": "^3.0.0", "debug": "^4.1.1", "interface-connection": "~0.3.3", - "pull-catch": "^1.0.1", + "it-pipe": "^1.0.1", + "it-pushable": "^1.3.1", "pull-stream": "^3.6.9", - "pull-stream-to-stream": "^1.3.4", - "pump": "^3.0.0", - "readable-stream": "^3.1.1", - "stream-to-pull-stream": "^1.7.3", "varint": "^5.0.0" }, "contributors": [ diff --git a/src/codec.js b/src/codec.js deleted file mode 100644 index bed17c4..0000000 --- a/src/codec.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -module.exports = '/mplex/6.7.0' diff --git a/src/coder/decode.js b/src/coder/decode.js new file mode 100644 index 0000000..a19e112 --- /dev/null +++ b/src/coder/decode.js @@ -0,0 +1,71 @@ +'use strict' + +const varint = require('varint') +const BufferList = require('bl') + +// Decode a chunk and yield an _array_ of decoded messages +module.exports = source => (async function * decode () { + const decoder = new Decoder() + for await (const chunk of source) { + const msgs = decoder.write(chunk) + if (msgs.length) yield msgs + } +})() + +class Decoder { + constructor () { + this._buffer = new BufferList() + // optimization to allow varint to take a BufferList (well a proxy to) + this._bufferProxy = new Proxy({}, { + get: (_, prop) => prop[0] === 'l' ? this._buffer[prop] : this._buffer.get(parseInt(prop)) + }) + this._headerInfo = null + } + + /** + * @param {Buffer|BufferList} chunk + * @returns {object[]} An array of message objects + */ + write (chunk) { + if (!chunk || !chunk.length) return [] + + this._buffer.append(chunk) + const msgs = [] + + while (true) { + if (!this._headerInfo) { + try { + this._headerInfo = this._decodeHeader(this._bufferProxy) + } catch (_) { + break // We haven't received enough data yet + } + } + + const { id, type, length, offset } = this._headerInfo + const bufferedDataLength = this._buffer.length - offset + + if (bufferedDataLength < length) break // not enough data yet + + msgs.push({ id, type, data: this._buffer.shallowSlice(offset, offset + length) }) + + this._buffer.consume(offset + length) + this._headerInfo = null + } + + return msgs + } + + /** + * Attempts to decode the message header from the buffer + * @private + * @param {Buffer} data + * @returns {*} message header (id, type, offset, length) + */ + _decodeHeader (data) { + const h = varint.decode(data) + let offset = varint.decode.bytes + const length = varint.decode(data, offset) + offset += varint.decode.bytes + return { id: h >> 3, type: h & 7, offset, length } + } +} diff --git a/src/coder/encode.js b/src/coder/encode.js new file mode 100644 index 0000000..5e69a06 --- /dev/null +++ b/src/coder/encode.js @@ -0,0 +1,54 @@ +'use strict' + +const varint = require('varint') +const BufferList = require('bl') + +const POOL_SIZE = 10 * 1024 + +class Encoder { + constructor () { + this._pool = Buffer.allocUnsafe(POOL_SIZE) + this._poolOffset = 0 + } + + /** + * Encodes the given message and returns it and its header + * @param {*} msg The message object to encode + * @returns {Buffer|Buffer[]} + */ + write (msg) { + const pool = this._pool + let offset = this._poolOffset + + varint.encode(msg.id << 3 | msg.type, pool, offset) + offset += varint.encode.bytes + varint.encode(msg.data ? msg.data.length : 0, pool, offset) + offset += varint.encode.bytes + + const header = pool.slice(this._poolOffset, offset) + + if (POOL_SIZE - offset < 100) { + this._pool = Buffer.allocUnsafe(POOL_SIZE) + this._poolOffset = 0 + } else { + this._poolOffset = offset + } + + if (!msg.data) return header + + return [header, msg.data] + } +} + +const encoder = new Encoder() + +// Encode one or more messages and yield a BufferList of encoded messages +module.exports = source => (async function * encode () { + for await (const msg of source) { + if (Array.isArray(msg)) { + yield new BufferList(msg.map(m => encoder.write(m))) + } else { + yield new BufferList(encoder.write(msg)) + } + } +})() diff --git a/src/coder/index.js b/src/coder/index.js new file mode 100644 index 0000000..aa271e4 --- /dev/null +++ b/src/coder/index.js @@ -0,0 +1,4 @@ +'use strict' + +exports.encode = require('./encode') +exports.decode = require('./decode') diff --git a/src/index.js b/src/index.js index 006eeaf..3cfe70b 100644 --- a/src/index.js +++ b/src/index.js @@ -1,29 +1,3 @@ 'use strict' -const toStream = require('pull-stream-to-stream') -const MplexCore = require('./internals') -const MULTIPLEX_CODEC = require('./codec') -const Muxer = require('./muxer') - -const pump = require('pump') - -function create (rawConn, isListener) { - const stream = toStream(rawConn) - - // Cleanup and destroy the connection when it ends as the converted stream - // doesn't emit 'close' but .destroy will trigger a 'close' event. - stream.on('end', () => stream.destroy()) - - const mpx = new MplexCore({ - halfOpen: true, - initiator: !isListener - }) - pump(stream, mpx, stream) - - return new Muxer(rawConn, mpx) -} - -exports = module.exports = create -exports.multicodec = MULTIPLEX_CODEC -exports.dialer = (conn) => create(conn, false) -exports.listener = (conn) => create(conn, true) +module.exports = require('./mplex') diff --git a/src/internals/channel.js b/src/internals/channel.js deleted file mode 100644 index 56e7f5f..0000000 --- a/src/internals/channel.js +++ /dev/null @@ -1,192 +0,0 @@ -'use strict' -/* @flow */ - -const stream = require('readable-stream') -const debug = require('debug') - -/* :: import type Multiplex from './index' - -export type ChannelOpts = { - chunked?: bool, - halfOpen?: bool, - lazy?: bool -} -*/ - -class Channel extends stream.Duplex { - constructor (name/* : Buffer | string */, plex/* : Multiplex */, opts/* : ChannelOpts = {} */) { - const halfOpen = Boolean(opts.halfOpen) - super({ - allowHalfOpen: halfOpen - }) - - this.name = name - this.log = debug('mplex:channel:' + this.name.toString()) - this.channel = 0 - this.initiator = false - this.chunked = Boolean(opts.chunked) - this.halfOpen = halfOpen - this.destroyed = false - this.finalized = false - this.local = true - - this._multiplex = plex - this._dataHeader = 0 - this._opened = false - this._awaitDrain = 0 - this._lazy = Boolean(opts.lazy) - - let finished = false - let ended = false - this.log('open, halfOpen: ' + this.halfOpen) - - this.once('end', () => { - this.log('end') - this._read() // trigger drain - - if (this.destroyed) { - return - } - - ended = true - if (finished) { - this._finalize() - } else if (!this.halfOpen) { - this.end() - } - }) - - this.once('finish', function onfinish () { - if (this.destroyed) { - return - } - - if (!this._opened) { - return this.once('open', onfinish) - } - - if (this._lazy && this.initiator) { - this._open() - } - - this._multiplex._send( - this.channel << 3 | (this.initiator ? 4 : 3), - null - ) - - finished = true - - if (ended) { - this._finalize() - } - }) - } - - /** - * Conditionally emit errors if we have listeners. All other - * events are sent to EventEmitter.emit - * @param {string} eventName - * @param {...any} args - * @returns {void} - */ - emit (eventName, ...args) { - if (eventName === 'error' && !this._events.error) { - this.log('error', ...args) - } else { - super.emit(eventName, ...args) - } - } - - _destroy (err/* : Error */, callback) { - this.log('_destroy:' + (this.local ? 'local' : 'remote')) - - if (this.local && this._opened) { - if (this._lazy && this.initiator) { - this._open() - } - - const msg = err ? Buffer.from(err.message) : null - try { - this._multiplex._send( - this.channel << 3 | (this.initiator ? 6 : 5), - msg - ) - } catch (e) { /* do nothing */ } - } - - this._finalize() - callback(err) - } - - _finalize () { - if (this.finalized) { - return - } - - this.finalized = true - this.emit('finalize') - } - - _write (data/* : Buffer */, enc/* : string */, cb/* : () => void */) { - this.log('write: ', data.length) - if (!this._opened) { - this.once('open', () => { - this._write(data, enc, cb) - }) - return - } - - if (this.destroyed) { - cb() - return - } - - if (this._lazy && this.initiator) { - this._open() - } - - const drained = this._multiplex._send( - this._dataHeader, - data - ) - - if (drained) { - cb() - return - } - - this._multiplex._ondrain.push(cb) - } - - _read () { - if (this._awaitDrain) { - const drained = this._awaitDrain - this._awaitDrain = 0 - this._multiplex._onchanneldrain(drained) - } - } - - _open () { - let buf = null - if (Buffer.isBuffer(this.name)) { - buf = this.name - } else if (this.name !== this.channel.toString()) { - buf = Buffer.from(this.name) - } - - this._lazy = false - this._multiplex._send(this.channel << 3 | 0, buf) - } - - open (channel/* : number */, initiator/* : bool */) { - this.log('open: ' + channel) - this.channel = channel - this.initiator = initiator - this._dataHeader = channel << 3 | (initiator ? 2 : 1) - this._opened = true - if (!this._lazy && this.initiator) this._open() - this.emit('open') - } -} - -module.exports = Channel diff --git a/src/internals/index.js b/src/internals/index.js deleted file mode 100644 index db92d3f..0000000 --- a/src/internals/index.js +++ /dev/null @@ -1,477 +0,0 @@ -'use strict' -/* @flow */ - -const stream = require('readable-stream') -const varint = require('varint') -const debug = require('debug') - -const Channel = require('./channel') -/* :: import type {ChannelOpts} from './channel' */ - -const SIGNAL_FLUSH = Buffer.from([0]) - -const empty = Buffer.alloc(0) -let pool = Buffer.alloc(10 * 1024) -let used = 0 - -/* :: -type MultiplexOpts = { - binaryName?: bool, - limit?: number, - initiator?: bool -} - -type ChannelCallback = (Channel) => void -*/ - -class Multiplex extends stream.Duplex { - constructor (opts/* :: ?: MultiplexOpts | ChannelCallback */, onchannel /* :: ?: ChannelCallback */) { - super() - if (typeof opts === 'function') { - onchannel = opts - opts = {} - } - - if (!opts) { - opts = {} - } - - if (onchannel) { - this.on('stream', onchannel) - } - - this.destroyed = false - this.limit = opts.limit || 0 - if (opts.initiator == null) { - opts.initiator = true - } - - this.initiator = opts.initiator - - this._corked = 0 - this._options = opts - this._binaryName = Boolean(opts.binaryName) - this._local = [] - this._remote = [] - this._list = this._local - this._receiving = null - this._chunked = false - this._state = 0 - this._type = 0 - this._channel = 0 - this._missing = 0 - this._message = null - - this.log = debug('mplex:main:' + Math.floor(Math.random() * 100000)) - this.log('construction') - - let bufSize = 100 - if (this.limit) { - bufSize = varint.encodingLength(this.limit) - } - this._buf = Buffer.alloc(bufSize) - this._ptr = 0 - this._awaitChannelDrains = 0 - this._onwritedrain = null - this._ondrain = [] - this._finished = false - - this.once('finish', this._clear) - - // setup id handling - this._nextId = this.initiator ? 0 : 1 - } - - // Generate the next stream id - _nextStreamId ()/* : number */ { - let id = this._nextId - this._nextId += 2 - return id - } - - createStream (name/* : Buffer | string */, opts/* : ChannelOpts */)/* : Channel */ { - if (this.destroyed) { - throw new Error('Multiplexer is destroyed') - } - const id = this._nextStreamId() - let channelName = this._name(name || id.toString()) - const options = Object.assign(this._options, opts) - this.log('createStream: %s', id, channelName.toString(), options) - - const channel = new Channel(channelName, this, options) - return this._addChannel(channel, id, this._local) - } - - receiveStream (name/* : Buffer | string */, opts/* : ChannelOpts */)/* : Channel */ { - if (this.destroyed) { - throw new Error('Multiplexer is destroyed') - } - - if (name === undefined || name === null) { - throw new Error('Name is needed when receiving a stream') - } - - const channelName = this._name(name) - this.log('receiveStream: ' + channelName.toString()) - const channel = new Channel( - channelName, - this, - Object.assign(this._options, opts) - ) - - if (!this._receiving) { - this._receiving = {} - } - - if (this._receiving[channel.name]) { - throw new Error('You are already receiving this stream') - } - - this._receiving[channel.name] = channel - - return channel - } - - _name (name/* : Buffer | string */)/* : Buffer | string */ { - if (!this._binaryName) { - return name.toString() - } - return Buffer.isBuffer(name) ? name : Buffer.from(name) - } - - _send (header/* : number */, data /* :: ?: Buffer */)/* : bool */ { - const len = data ? data.length : 0 - const oldUsed = used - - this.log('_send', header, len) - - varint.encode(header, pool, used) - used += varint.encode.bytes - varint.encode(len, pool, used) - used += varint.encode.bytes - - let buf = pool.slice(oldUsed, used) - - if (pool.length - used < 100) { - pool = Buffer.alloc(10 * 1024) - used = 0 - } - - if (data) { - buf = Buffer.concat([ - buf, - data - ]) - } - - // Push and return the results - return this.push(buf) - } - - _addChannel (channel/* : Channel */, id/* : number */, list/* : Array */)/* : Channel */ { - this.log('_addChannel', id) - list[id] = channel - channel.on('finalize', () => { - this.log('_remove channel', id) - list[id] = null - }) - channel.open(id, list === this._local) - - return channel - } - - _writeVarint (data/* : Buffer */, offset/* : number */)/* : number */ { - for (offset; offset < data.length; offset++) { - if (this._ptr === this._buf.length) { - return this._lengthError(data) - } - - this._buf[this._ptr++] = data[offset] - - if (!(data[offset] & 0x80)) { - if (this._state === 0) { - const header = varint.decode(this._buf) - this._type = header & 7 - this._channel = header >> 3 - this._list = this._type & 1 ? this._local : this._remote - const chunked = this._list.length > this._channel && - this._list[this._channel] && - this._list[this._channel].chunked - - this._chunked = Boolean(this._type === 1 || this._type === 2) && chunked - } else { - this._missing = varint.decode(this._buf) - - if (this.limit && this._missing > this.limit) { - return this._lengthError(data) - } - } - - this._state++ - this._ptr = 0 - return offset + 1 - } - } - - return data.length - } - - _lengthError (data/* : Buffer */)/* : number */ { - this.destroy(new Error('Incoming message is too big')) - return data.length - } - - _writeMessage (data/* : Buffer */, offset/* : number */)/* : number */ { - const free = data.length - offset - const missing = this._missing - - if (!this._message) { - if (missing <= free) { // fast track - no copy - this._missing = 0 - this._push(data.slice(offset, offset + missing)) - return offset + missing - } - if (this._chunked) { - this._missing -= free - this._push(data.slice(offset, data.length)) - return data.length - } - this._message = Buffer.alloc(missing) - } - - data.copy(this._message, this._ptr, offset, offset + missing) - - if (missing <= free) { - this._missing = 0 - this._push(this._message) - return offset + missing - } - - this._missing -= free - this._ptr += free - - return data.length - } - - _push (data/* : Buffer */) { - this.log('_push', data.length) - if (!this._missing) { - this._ptr = 0 - this._state = 0 - this._message = null - } - - if (this._type === 0) { // open - this.log('open', this._channel) - if (this.destroyed || this._finished) { - return - } - - let name - if (this._binaryName) { - name = data - } else { - name = data.toString() || this._channel.toString() - } - this.log('open name', name) - let channel - if (this._receiving && this._receiving[name]) { - channel = this._receiving[name] - delete this._receiving[name] - this._addChannel(channel, this._channel, this._list) - } else { - channel = new Channel(name, this, this._options) - this.emit('stream', this._addChannel( - channel, - this._channel, - this._list), channel.name) - } - return - } - - const stream = this._list[this._channel] - if (!stream) { - return - } - - switch (this._type) { - case 5: // local error - case 6: { // remote error - const error = new Error(data.toString() || 'Channel destroyed') - stream.local = false - stream.destroy(error) - return - } - - case 3: // local end - case 4: { // remote end - stream.push(null) - return - } - - case 1: // local packet - case 2: // remote packet - if (!stream.push(data)) { - this._awaitChannelDrains++ - stream._awaitDrain++ - } - break - default: // no action - } - } - - _onchanneldrain (drained/* : number */) { - this._awaitChannelDrains -= drained - - if (this._awaitChannelDrains) { - return - } - - const ondrain = this._onwritedrain - this._onwritedrain = null - - if (ondrain) { - ondrain() - } - } - - _write (data/* : Buffer */, enc/* : string */, cb/* : () => void */) { - this.log('_write', data.length) - if (this._finished) { - cb() - return - } - - if (this._corked) { - this._onuncork(this._write.bind(this, data, enc, cb)) - return - } - - if (data === SIGNAL_FLUSH) { - this._finish(cb) - return - } - - let offset = 0 - while (offset < data.length) { - if (this._state === 2) { - offset = this._writeMessage(data, offset) - } else { - offset = this._writeVarint(data, offset) - } - } - - if (this._state === 2 && !this._missing) { - this._push(empty) - } - - if (this._awaitChannelDrains) { - this._onwritedrain = cb - } else { - cb() - } - } - - _finish (cb/* : () => void */) { - this._onuncork(() => { - if (this._writableState.prefinished === false) { - this._writableState.prefinished = true - } - this.emit('prefinish') - this._onuncork(cb) - }) - } - - cork () { - if (++this._corked === 1) { - this.emit('cork') - } - } - - uncork () { - if (this._corked && --this._corked === 0) { - this.emit('uncork') - } - } - - end (data/* :: ?: Buffer | () => void */, enc/* :: ?: string | () => void */, cb/* :: ?: () => void */) { - this.log('end') - if (typeof data === 'function') { - cb = data - data = undefined - } - if (typeof enc === 'function') { - cb = enc - enc = undefined - } - - if (data) { - this.write(data) - } - - if (!this._writableState.ending) { - this.write(SIGNAL_FLUSH) - } - - return stream.Writable.prototype.end.call(this, cb) - } - - _onuncork (fn/* : () => void */) { - if (this._corked) { - this.once('uncork', fn) - return - } - - fn() - } - - _read () { - while (this._ondrain.length) { - this._ondrain.shift()() - } - } - - _clear () { - this.log('_clear') - if (this._finished) { - return - } - - this._finished = true - - const list = this._local.concat(this._remote) - - this._local = [] - this._remote = [] - - list.forEach(function (stream) { - if (stream) { - stream.local = false - stream.destroy(null) - } - }) - - this.push(null) - } - - finalize () { - this._clear() - } - - _destroy (err/* :: ?: Error */, callback) { - this.log('destroy') - - const list = this._local.concat(this._remote) - - list.forEach(function (stream) { - if (stream) { - stream.destroy(err || new Error('Channel destroyed')) - } - }) - - this._clear() - callback(err) - } -} - -module.exports = Multiplex diff --git a/src/message-types.js b/src/message-types.js new file mode 100644 index 0000000..2c2e996 --- /dev/null +++ b/src/message-types.js @@ -0,0 +1,33 @@ +'use strict' + +const MessageTypes = Object.freeze({ + NEW_STREAM: 0, + MESSAGE_RECEIVER: 1, + MESSAGE_INITIATOR: 2, + CLOSE_RECEIVER: 3, + CLOSE_INITIATOR: 4, + RESET_RECEIVER: 5, + RESET_INITIATOR: 6 +}) + +exports.MessageTypes = MessageTypes + +exports.InitiatorMessageTypes = Object.freeze({ + NEW_STREAM: MessageTypes.NEW_STREAM, + MESSAGE: MessageTypes.MESSAGE_INITIATOR, + CLOSE: MessageTypes.CLOSE_INITIATOR, + RESET: MessageTypes.RESET_INITIATOR +}) + +exports.ReceiverMessageTypes = Object.freeze({ + MESSAGE: MessageTypes.MESSAGE_RECEIVER, + CLOSE: MessageTypes.CLOSE_RECEIVER, + RESET: MessageTypes.RESET_RECEIVER +}) + +exports.MessageTypeNames = Object.freeze( + Object.entries(MessageTypes).reduce((obj, e) => { + obj[e[1]] = e[0] + return obj + }, {}) +) diff --git a/src/mplex.js b/src/mplex.js new file mode 100644 index 0000000..9f6411c --- /dev/null +++ b/src/mplex.js @@ -0,0 +1,213 @@ +'use strict' + +const pipe = require('it-pipe') +const pushable = require('it-pushable') +const log = require('debug')('libp2p:mplex') +const abortable = require('abortable-iterator') +const Coder = require('./coder') +const restrictSize = require('./restrict-size') +const { MessageTypes, MessageTypeNames } = require('./message-types') +const createStream = require('./stream') + +class Mplex { + /** + * @constructor + * @param {object} options + * @param {function(*)} options.onStream Called whenever an inbound stream is created + * @param {AbortSignal} options.signal An AbortController signal + */ + constructor (options) { + options = options || {} + options = typeof options === 'function' ? { onStream: options } : options + + this._streamId = 0 + this._streams = { + /** + * @type {Map} Stream to ids map + */ + initiators: new Map(), + /** + * @type {Map} Stream to ids map + */ + receivers: new Map() + } + this._options = options + + /** + * An iterable sink + */ + this.sink = this._createSink() + + /** + * An iterable source + */ + this.source = this._createSource() + + /** + * @property {function} onStream + */ + this.onStream = options.onStream + } + + /** + * Initiate a new stream with the given name. If no name is + * provided, the id of th stream will be used. + * @param {string} [name] If name is not a string it will be cast to one + * @returns {Stream} + */ + newStream (name) { + const id = this._streamId++ + name = name == null ? id.toString() : String(name) + const registry = this._streams.initiators + return this._newStream({ id, name, type: 'initiator', registry }) + } + + /** + * Called whenever an inbound stream is created + * @private + * @param {*} options + * @param {number} options.id + * @param {string} options.name + * @returns {*} A muxed stream + */ + _newReceiverStream ({ id, name }) { + const registry = this._streams.receivers + return this._newStream({ id, name, type: 'receiver', registry }) + } + + /** + * Creates a new stream + * @private + * @param {object} options + * @param {number} options.id + * @param {string} options.name + * @param {string} options.type + * @param {Map} options.registry A map of streams to their ids + * @returns {*} A muxed stream + */ + _newStream ({ id, name, type, registry }) { + if (registry.has(id)) { + throw new Error(`${type} stream ${id} already exists!`) + } + log('new %s stream %s %s', type, id, name) + const send = msg => { + if (log.enabled) { + log('%s stream %s %s send', type, id, name, { ...msg, type: MessageTypeNames[msg.type], data: msg.data && msg.data.slice() }) + } + return this.source.push(msg) + } + const onEnd = () => { + log('%s stream %s %s ended', type, id, name) + registry.delete(id) + } + const stream = createStream({ id, name, send, type, onEnd }) + registry.set(id, stream) + return stream + } + + /** + * Creates a sink with an abortable source. Incoming messages will + * also have their size restricted. All messages will be varint decoded. + * @private + * @returns {*} Returns an iterable sink + */ + _createSink () { + return async source => { + if (this._options.signal) { + source = abortable(source, this._options.signal) + } + + try { + await pipe( + source, + Coder.decode, + restrictSize(this._options.maxMsgSize), + async source => { + for await (const msgs of source) { + for (const msg of msgs) { + this._handleIncoming(msg) + } + } + } + ) + } catch (err) { + log('error in sink', err) + return this.source.end(err) // End the source with an error + } + + this.source.end() + } + } + + /** + * Creates a source that restricts outgoing message sizes + * and varint encodes them. + * @private + * @returns {*} An iterable source + */ + _createSource () { + const onEnd = err => { + const { initiators, receivers } = this._streams + // Abort all the things! + for (const s of initiators.values()) s.abort(err) + for (const s of receivers.values()) s.abort(err) + } + const source = pushable({ onEnd, writev: true }) + const encodedSource = pipe( + source, + restrictSize(this._options.maxMsgSize), + Coder.encode + ) + return Object.assign(encodedSource, { + push: source.push, + end: source.end, + return: source.return + }) + } + + /** + * @private + * @param {object} options + * @param {number} options.id + * @param {string} options.type + * @param {Buffer|BufferList} options.data + * @returns {void} + */ + _handleIncoming ({ id, type, data }) { + if (log.enabled) { + log('incoming message', { id, type: MessageTypeNames[type], data: data.slice() }) + } + + // Create a new stream? + if (type === MessageTypes.NEW_STREAM && this.onStream) { + const stream = this._newReceiverStream({ id, name: data.toString() }) + return this.onStream(stream) + } + + const list = type & 1 ? this._streams.initiators : this._streams.receivers + const stream = list.get(id) + + if (!stream) return log('missing stream %s', id) + + switch (type) { + case MessageTypes.MESSAGE_INITIATOR: + case MessageTypes.MESSAGE_RECEIVER: + stream.source.push(data) + break + case MessageTypes.CLOSE_INITIATOR: + case MessageTypes.CLOSE_RECEIVER: + stream.close() + break + case MessageTypes.RESET_INITIATOR: + case MessageTypes.RESET_RECEIVER: + stream.reset() + break + default: + log('unknown message type %s', type) + } + } +} + +Mplex.multicodec = '/mplex/6.7.0' + +module.exports = Mplex diff --git a/src/muxer.js b/src/muxer.js deleted file mode 100644 index 2f0ee64..0000000 --- a/src/muxer.js +++ /dev/null @@ -1,108 +0,0 @@ -'use strict' - -const EventEmitter = require('events').EventEmitter -const Connection = require('interface-connection').Connection -const toPull = require('stream-to-pull-stream') -const pull = require('pull-stream') -const pullCatch = require('pull-catch') -const setImmediate = require('async/setImmediate') -const debug = require('debug') -const log = debug('mplex') -log.error = debug('mplex:error') - -const MULTIPLEX_CODEC = require('./codec') - -function noop () {} - -// Catch error makes sure that even though we get the "Channel destroyed" error -// from when closing streams, that it's not leaking through since it's not -// really an error for us, channels shoul close cleanly. -function catchError (stream) { - return { - source: pull( - stream.source, - pullCatch((err) => { - if (err.message === 'Channel destroyed') { - return - } - return false - }) - ), - sink: stream.sink - } -} - -class MultiplexMuxer extends EventEmitter { - constructor (conn, multiplex) { - super() - this.multiplex = multiplex - this.conn = conn - this.multicodec = MULTIPLEX_CODEC - - multiplex.on('close', () => this.emit('close')) - multiplex.on('error', (err) => this.emit('error', err)) - - multiplex.on('stream', (stream, id) => { - const muxedConn = new Connection( - catchError(toPull.duplex(stream)), - this.conn - ) - this.emit('stream', muxedConn) - }) - } - - /** - * Conditionally emit errors if we have listeners. All other - * events are sent to EventEmitter.emit - * - * @param {string} eventName - * @param {...any} args - * @returns {void} - */ - emit (eventName, ...args) { - if (eventName === 'error' && !this._events.error) { - log.error('error', ...args) - } else { - super.emit(eventName, ...args) - } - } - - // method added to enable pure stream muxer feeling - newStream (callback) { - callback = callback || noop - let stream - try { - stream = this.multiplex.createStream() - } catch (err) { - return setImmediate(() => callback(err)) - } - - const conn = new Connection( - catchError(toPull.duplex(stream)), - this.conn - ) - - setImmediate(() => callback(null, conn)) - - return conn - } - - /** - * Destroys multiplex and ends all internal streams - * - * @param {Error} err Optional error to pass to end the muxer with - * @param {function()} callback Optional - * @returns {void} - */ - end (err, callback) { - if (typeof err === 'function') { - callback = err - err = null - } - callback = callback || noop - this.multiplex.destroy(err) - callback() - } -} - -module.exports = MultiplexMuxer diff --git a/src/restrict-size.js b/src/restrict-size.js new file mode 100644 index 0000000..7477a9a --- /dev/null +++ b/src/restrict-size.js @@ -0,0 +1,32 @@ +'use strict' + +const MAX_MSG_SIZE = 1 << 20 // 1MB + +/** + * Creates an iterable transform that restricts message sizes to + * the given maximum size. + * @param {number} [max] The maximum message size. Defaults to 1MB + * @returns {*} An iterable transform. + */ +module.exports = max => { + max = max || MAX_MSG_SIZE + + const checkSize = msg => { + if (msg.data && msg.data.length > max) { + throw Object.assign(new Error('message size too large!'), { code: 'ERR_MSG_TOO_BIG' }) + } + } + + return source => { + return (async function * restrictSize () { + for await (const msg of source) { + if (Array.isArray(msg)) { + msg.forEach(checkSize) + } else { + checkSize(msg) + } + yield msg + } + })() + } +} diff --git a/src/stream.js b/src/stream.js new file mode 100644 index 0000000..4c11f7a --- /dev/null +++ b/src/stream.js @@ -0,0 +1,89 @@ +'use strict' + +const abortable = require('abortable-iterator') +const AbortController = require('abort-controller') +const log = require('debug')('libp2p:mplex:stream') +const pushable = require('it-pushable') +const { InitiatorMessageTypes, ReceiverMessageTypes } = require('./message-types') + +/** + * @param {object} options + * @param {number} options.id + * @param {string} options.name + * @param {function(*)} options.send Called to send data through the stream + * @param {function(Error)} [options.onEnd] Called whenever the stream ends + * @param {string} options.type One of ['initiator','receiver']. Defaults to 'initiator' + * @returns {*} A muxed stream + */ +module.exports = ({ id, name, send, onEnd = () => {}, type = 'initiator' }) => { + const abortController = new AbortController() + const resetController = new AbortController() + const Types = type === 'initiator' ? InitiatorMessageTypes : ReceiverMessageTypes + + name = String(name == null ? id : name) + + let sourceEnded = false + let sinkEnded = false + let endErr + + const onSourceEnd = err => { + sourceEnded = true + log('%s stream %s source end', type, name, err) + if (err && !endErr) endErr = err + if (sinkEnded) onEnd(endErr) + } + + const onSinkEnd = err => { + sinkEnded = true + log('%s stream %s sink end', type, name, err) + if (err && !endErr) endErr = err + if (sourceEnded) onEnd(endErr) + } + + const stream = { + // Close for reading + close: () => stream.source.end(), + // Close for reading and writing (local error) + abort: err => { + log('%s stream %s abort', type, name, err) + // End the source with the passed error + stream.source.end(err) + abortController.abort() + }, + // Close immediately for reading and writing (remote error) + reset: () => resetController.abort(), + sink: async source => { + source = abortable.multi(source, [ + { signal: abortController.signal, options: { abortMessage: 'stream aborted', abortCode: 'ERR_MPLEX_STREAM_ABORT' } }, + { signal: resetController.signal, options: { abortMessage: 'stream reset', abortCode: 'ERR_MPLEX_STREAM_RESET' } } + ]) + + if (type === 'initiator') { // If initiator, open a new stream + send({ id, type: Types.NEW_STREAM, data: name }) + } + + try { + for await (const data of source) { + send({ id, type: Types.MESSAGE, data }) + } + } catch (err) { + // Send no more data if this stream was remotely reset + if (err.code === 'ERR_MPLEX_STREAM_RESET') { + log('%s stream %s reset', type, name) + } else { + log('%s stream %s error', type, name, err) + send({ id, type: Types.RESET }) + } + + stream.source.end(err) + return onSinkEnd(err) + } + + send({ id, type: Types.CLOSE }) + onSinkEnd() + }, + source: pushable(onSourceEnd) + } + + return stream +} diff --git a/test/browser.js b/test/browser.js deleted file mode 100644 index ecc0fb5..0000000 --- a/test/browser.js +++ /dev/null @@ -1,43 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const WSlibp2p = require('libp2p-websockets') -const multiaddr = require('multiaddr') -const pull = require('pull-stream') - -const multiplex = require('../src') - -describe('browser-server', () => { - let ws - - before(() => { - ws = new WSlibp2p() - }) - - it('ricochet test', (done) => { - const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - const transportSocket = ws.dial(mh) - const muxedConn = multiplex.dialer(transportSocket) - - muxedConn.on('stream', (conn) => { - pull( - conn, - pull.collect((err, chunks) => { - expect(err).to.not.exist() - expect(chunks).to.be.eql([Buffer.from('hey')]) - pull(pull.empty(), conn) - }) - ) - }) - - pull( - pull.values([Buffer.from('hey')]), - muxedConn.newStream(), - pull.onEnd(done) - ) - }) -}) diff --git a/test/coder.spec.js b/test/coder.spec.js new file mode 100644 index 0000000..d0d4664 --- /dev/null +++ b/test/coder.spec.js @@ -0,0 +1,110 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const BufferList = require('bl') +const { expect } = chai +chai.use(dirtyChai) + +const coder = require('../src/coder') + +describe('coder', () => { + it('should encode header', async () => { + const source = [{ id: 17, type: 0, data: Buffer.from('17') }] + + const data = new BufferList() + for await (const chunk of coder.encode(source)) { + data.append(chunk) + } + + const expectedHeader = Buffer.from('880102', 'hex') + expect(data.slice(0, expectedHeader.length)).to.be.eql(expectedHeader) + }) + + it('should decode header', async () => { + const source = [Buffer.from('8801023137', 'hex')] + for await (const msgs of coder.decode(source)) { + expect(msgs.length).to.equal(1) + msgs[0].data = msgs[0].data.slice() // convert BufferList to Buffer + expect(msgs[0]).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') }) + } + }) + + it('should encode several msgs into buffer', async () => { + const source = [ + { id: 17, type: 0, data: Buffer.from('17') }, + { id: 19, type: 0, data: Buffer.from('19') }, + { id: 21, type: 0, data: Buffer.from('21') } + ] + + const data = new BufferList() + for await (const chunk of coder.encode(source)) { + data.append(chunk) + } + + expect(data.slice()).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex')) + }) + + it('should encode from BufferList', async () => { + const source = [{ + id: 17, + type: 0, + data: new BufferList([ + Buffer.from(Math.random().toString()), + Buffer.from(Math.random().toString()) + ]) + }] + + const data = new BufferList() + for await (const chunk of coder.encode(source)) { + data.append(chunk) + } + + expect(data.slice()).to.be.eql(Buffer.concat([ + Buffer.from('8801', 'hex'), + Buffer.from([source[0].data.length]), + source[0].data.slice() + ])) + }) + + it('should decode msgs from buffer', async () => { + const source = [Buffer.from('88010231379801023139a801023231', 'hex')] + + const res = [] + for await (const msgs of coder.decode(source)) { + for (const msg of msgs) { + msg.data = msg.data.slice() // convert BufferList to Buffer + res.push(msg) + } + } + + expect(res).to.be.deep.eql([ + { id: 17, type: 0, data: Buffer.from('17') }, + { id: 19, type: 0, data: Buffer.from('19') }, + { id: 21, type: 0, data: Buffer.from('21') } + ]) + }) + + it('should encode zero length body msg', async () => { + const source = [{ id: 17, type: 0 }] + + const data = new BufferList() + for await (const chunk of coder.encode(source)) { + data.append(chunk) + } + + expect(data.slice()).to.be.eql(Buffer.from('880100', 'hex')) + }) + + it('should decode zero length body msg', async () => { + const source = [Buffer.from('880100', 'hex')] + + for await (const msgs of coder.decode(source)) { + expect(msgs.length).to.equal(1) + msgs[0].data = msgs[0].data.slice() // convert BufferList to Buffer + expect(msgs[0]).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) }) + } + }) +}) diff --git a/test/compliance.spec.js b/test/compliance.spec.js index d74ccd8..b512640 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -2,15 +2,11 @@ 'use strict' const tests = require('interface-stream-muxer') -const multiplex = require('../src') +const Mplex = require('../src') describe('compliance', () => { tests({ - setup (cb) { - cb(null, multiplex) - }, - teardown (cb) { - cb() - } + setup: () => Mplex, + teardown () {} }) }) diff --git a/test/internals.node.js b/test/internals.node.js deleted file mode 100644 index c05e2ca..0000000 --- a/test/internals.node.js +++ /dev/null @@ -1,460 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -chai.use(require('dirty-chai')) -chai.use(require('chai-checkmark')) -const expect = chai.expect - -const concat = require('concat-stream') -const through = require('through2') -const net = require('net') -const chunky = require('chunky') -const pump = require('pump') - -const MplexCore = require('../src/internals') - -describe('Internals - MplexCore', () => { - it('one way piping work with 2 sub-streams', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream() - const stream2 = plex1.createStream() - - function onStream (stream, id) { - stream.pipe(collect()) - } - - const plex2 = new MplexCore(onStream) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream2.write(Buffer.from('world')) - stream1.end() - stream2.end() - - let pending = 2 - const results = [] - - function collect () { - return concat(function (data) { - results.push(data.toString()) - - if (--pending === 0) { - results.sort() - expect(results[0].toString()).to.equal('hello') - expect(results[1].toString()).to.equal('world') - done() - } - }) - } - }) - - it('two way piping works with 2 sub-streams', (done) => { - const plex1 = new MplexCore() - - const plex2 = new MplexCore(function onStream (stream, id) { - const uppercaser = through(function (chunk, e, done) { - this.push(Buffer.from(chunk.toString().toUpperCase())) - this.end() - done() - }) - stream.pipe(uppercaser).pipe(stream) - }) - - plex1.pipe(plex2).pipe(plex1) - - const stream1 = plex1.createStream() - const stream2 = plex1.createStream() - - stream1.pipe(collect()) - stream2.pipe(collect()) - - stream1.write(Buffer.from('hello')) - stream2.write(Buffer.from('world')) - - let pending = 2 - const results = [] - - function collect () { - return concat(function (data) { - results.push(data.toString()) - if (--pending === 0) { - results.sort() - expect(results[0].toString()).to.equal('HELLO') - expect(results[1].toString()).to.equal('WORLD') - done() - } - }) - } - }) - - it('stream id should be exposed as stream.name', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream('5') - expect(stream1.name).to.equal('5') - - const plex2 = new MplexCore(function onStream (stream, id) { - expect(stream.name).to.equal('5') - expect(id).to.equal('5') - done() - }) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream1.end() - }) - - it('stream id can be a long string', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream('hello-yes-this-is-dog') - expect(stream1.name).to.equal('hello-yes-this-is-dog') - - const plex2 = new MplexCore(function onStream (stream, id) { - expect(stream.name).to.equal('hello-yes-this-is-dog') - expect(id).to.equal('hello-yes-this-is-dog') - done() - }) - - plex1.pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream1.end() - }) - - it('destroy', (done) => { - const plex1 = new MplexCore() - const stream1 = plex1.createStream() - - expect(2).check(done) - - const plex2 = new MplexCore(function onStream (stream, id) { - stream.on('error', function (err) { - expect(err.message).to.equal('0 had an error').mark() - }) - }) - - plex1.pipe(plex2) - stream1.on('error', function (err) { - expect(err.message).to.equal('0 had an error').mark() - }) - stream1.write(Buffer.from('hello')) - stream1.destroy(new Error('0 had an error')) - }) - - it('testing invalid data error', (done) => { - const plex = new MplexCore() - - plex.on('error', function (err) { - if (err) { - expect(err.message).to.equal('Incoming message is too big') - done() - } - }) - // a really stupid thing to do - plex.write(Array(50000).join('\xff')) - }) - - it('overflow', (done) => { - let count = 0 - function check () { - if (++count === 2) { - done() - } - } - const plex1 = new MplexCore() - const plex2 = new MplexCore({ limit: 10 }) - - plex2.on('stream', function (stream) { - stream.on('error', function (err) { - expect(err.message).to.equal('Incoming message is too big') - check() - }) - }) - - plex2.on('error', function (err) { - if (err) { - expect(err.message).to.equal('Incoming message is too big') - check() - } - }) - - plex1.pipe(plex2).pipe(plex1) - - const stream = plex1.createStream() - - stream.write(Buffer.alloc(11)) - }) - - it('2 buffers packed into 1 chunk', (done) => { - const plex1 = new MplexCore() - const plex2 = new MplexCore(function (b) { - b.pipe(concat(function (body) { - expect(body.toString('utf8')).to.equal('abc\n123\n') - server.close() - plex1.end() - done() - })) - }) - - const a = plex1.createStream(1337) - a.write('abc\n') - a.write('123\n') - a.end() - - const server = net.createServer(function (stream) { - plex2.pipe(stream).pipe(plex2) - }) - server.listen(0, function () { - const port = server.address().port - plex1.pipe(net.connect(port)).pipe(plex1) - }) - }) - - it('chunks', (done) => { - let times = 100 - ;(function chunk () { - const collect = collector(function () { - if (--times === 0) { - done() - } else { - chunk() - } - }) - - const plex1 = new MplexCore() - const stream1 = plex1.createStream() - const stream2 = plex1.createStream() - - const plex2 = new MplexCore(function onStream (stream, id) { - stream.pipe(collect()) - }) - - plex1.pipe(through(function (buf, enc, next) { - const bufs = chunky(buf) - for (let i = 0; i < bufs.length; i++) this.push(bufs[i]) - next() - })).pipe(plex2) - - stream1.write(Buffer.from('hello')) - stream2.write(Buffer.from('world')) - stream1.end() - stream2.end() - })() - - function collector (cb) { - let pending = 2 - const results = [] - - return function () { - return concat(function (data) { - results.push(data.toString()) - if (--pending === 0) { - results.sort() - expect(results[0].toString()).to.equal('hello') - expect(results[1].toString()).to.equal('world') - cb() - } - }) - } - } - }) - - it('prefinish + corking', (done) => { - const plex = new MplexCore() - let async = false - - plex.on('prefinish', function () { - plex.cork() - process.nextTick(function () { - async = true - plex.uncork() - }) - }) - - plex.on('finish', function () { - expect(async).to.be.ok() - done() - }) - - plex.end() - }) - - it('quick message', (done) => { - const plex2 = new MplexCore() - const plex1 = new MplexCore(function (stream) { - stream.write('hello world') - }) - - plex1.pipe(plex2).pipe(plex1) - - setTimeout(function () { - const stream = plex2.createStream() - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - done() - }) - }, 100) - }) - - it('if onstream is not passed, stream is emitted', (done) => { - const plex1 = new MplexCore() - const plex2 = new MplexCore() - - plex1.pipe(plex2).pipe(plex1) - - plex2.on('stream', function (stream, id) { - expect(stream).to.exist() - expect(id).to.exist() - stream.write('hello world') - stream.end() - }) - - const stream = plex1.createStream() - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - stream.end() - setTimeout(() => done(), 1000) - }) - }) - - it('half close a muxed stream', (done) => { - const plex1 = new MplexCore() - const plex2 = new MplexCore() - - plex1.pipe(plex2).pipe(plex1) - - plex2.on('stream', function (stream, id) { - expect(stream).to.exist() - expect(id).to.exist() - - // let it flow - stream.on('data', function () {}) - - stream.on('end', function () { - done() - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - - stream.write(Buffer.from('hello world')) - - stream.end() - }) - - const stream = plex1.createStream() - - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - - stream.on('end', function () { - stream.end() - }) - }) - - it('half close a half closed muxed stream', (done) => { - const plex1 = new MplexCore({ halfOpen: true }) - const plex2 = new MplexCore({ halfOpen: true }) - - plex1.nameTag = 'plex1:' - plex2.nameTag = 'plex2:' - - plex1.pipe(plex2).pipe(plex1) - - plex2.on('stream', function (stream, id) { - expect(stream).to.exist() - expect(id).to.exist() - - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('some data')) - }) - - stream.on('end', function () { - stream.write(Buffer.from('hello world')) - stream.end() - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - }) - - const stream = plex1.createStream() - - stream.on('data', function (data) { - expect(data).to.eql(Buffer.from('hello world')) - }) - - stream.on('error', function (err) { - expect(err).to.not.exist() - }) - - stream.on('end', function () { - done() - }) - - stream.write(Buffer.from('some data')) - - stream.end() - }) - - it('underlying error is propagated to muxed streams', (done) => { - let count = 0 - function check () { - if (++count === 4) { - done() - } - } - - const plex1 = new MplexCore() - const plex2 = new MplexCore() - - let socket - - plex2.on('stream', function (stream) { - stream.on('error', function (err) { - expect(err).to.exist() - check() - }) - - stream.on('close', function () { - check() - }) - - socket.destroy() - }) - - const stream1to2 = plex1.createStream(1337) - - stream1to2.on('error', function (err) { - expect(err).to.exist() - check() - }) - - stream1to2.on('close', function () { - check() - }) - - const server = net.createServer(function (stream) { - pump(plex2, stream) - pump(stream, plex2) - server.close() - }) - - server.listen(0, function () { - const port = server.address().port - socket = net.connect(port) - - pump(plex1, socket) - pump(socket, plex1) - }) - }) -}) diff --git a/test/mplex.spec.js b/test/mplex.spec.js deleted file mode 100644 index e67a996..0000000 --- a/test/mplex.spec.js +++ /dev/null @@ -1,70 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) -const pair = require('pull-pair/duplex') -const pull = require('pull-stream') - -const multiplex = require('../src') - -describe('multiplex-generic', () => { - let listenerSocket - let dialerSocket - - let listener - let dialer - - before(() => { - const p = pair() - dialerSocket = p[0] - listenerSocket = p[1] - }) - - it('attach to a duplex stream, as listener', () => { - listener = multiplex.listener(listenerSocket) - expect(listener).to.exist() - }) - - it('attach to a duplex stream, as dialer', () => { - dialer = multiplex.dialer(dialerSocket) - expect(dialer).to.exist() - }) - - it('open a multiplex stream from client', (done) => { - listener.once('stream', (conn) => { - pull(conn, conn) - }) - - const conn = dialer.newStream() - pull( - // Strings should be converted to Buffers - pull.values(['hello']), - conn, - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.eql([Buffer.from('hello')]) - done() - }) - ) - }) - - it('open a multiplex stream from listener', (done) => { - dialer.once('stream', (conn) => { - pull(conn, conn) - }) - - const conn = listener.newStream() - pull( - pull.values([Buffer.from('hello')]), - conn, - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.eql([Buffer.from('hello')]) - done() - }) - ) - }) -}) diff --git a/test/muxer.spec.js b/test/muxer.spec.js deleted file mode 100644 index 0bc918c..0000000 --- a/test/muxer.spec.js +++ /dev/null @@ -1,74 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const pair = require('pull-pair/duplex') - -const Muxer = require('../src/muxer') -const Multiplex = require('../src/internals') - -describe('multiplex-muxer', () => { - let muxer - let multiplex - - it('can be created', () => { - const p = pair() - multiplex = new Multiplex() - muxer = new Muxer(p, multiplex) - }) - - it('catches newStream errors', (done) => { - multiplex.createStream = () => { - throw new Error('something nbad happened') - } - muxer.newStream((err) => { - expect(err).to.exist() - expect(err.message).to.equal('something nbad happened') - done() - }) - }) - - it('can be destroyed with an error', (done) => { - const p = pair() - const multiplex = new Multiplex() - const muxer = new Muxer(p, multiplex) - const error = new Error('bad things') - muxer.once('error', (err) => { - expect(err).to.eql(error) - done() - }) - muxer.end(error) - }) - - it('destroying with error does not throw with no listener', () => { - const p = pair() - const multiplex = new Multiplex() - const muxer = new Muxer(p, multiplex) - const error = new Error('bad things') - expect(() => muxer.end(error)).to.not.throw() - }) - - it('can get destroyed', (done) => { - expect(multiplex.destroyed).to.eql(false) - - muxer.end((err) => { - expect(err).to.not.exist() - expect(multiplex.destroyed).to.be.true() - done() - }) - }) - - it('should handle a repeat destroy', (done) => { - expect(multiplex.destroyed).to.be.true() - - muxer.end((err) => { - expect(err).to.not.exist() - expect(multiplex.destroyed).to.be.true() - done() - }) - }) -}) diff --git a/test/node.js b/test/node.js deleted file mode 100644 index ab22922..0000000 --- a/test/node.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -require('./internals.node') diff --git a/test/restrict-size.spec.js b/test/restrict-size.spec.js new file mode 100644 index 0000000..6129703 --- /dev/null +++ b/test/restrict-size.spec.js @@ -0,0 +1,52 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const { expect } = chai +chai.use(dirtyChai) +const pipe = require('it-pipe') +const randomBytes = require('random-bytes') +const { tap, consume, collect } = require('streaming-iterables') + +const restrictSize = require('../src/restrict-size') + +describe('restrict-size', () => { + it('should throw when size is too big', async () => { + const maxSize = 32 + + const input = [ + { data: await randomBytes(8) }, + { data: await randomBytes(maxSize) }, + { data: await randomBytes(64) }, + { data: await randomBytes(16) } + ] + + const output = [] + + try { + await pipe( + input, + restrictSize(maxSize), + tap(chunk => output.push(chunk)), + consume + ) + } catch (err) { + expect(err.code).to.equal('ERR_MSG_TOO_BIG') + expect(output).to.have.length(2) + expect(output[0]).to.deep.equal(input[0]) + expect(output[1]).to.deep.equal(input[1]) + return + } + throw new Error('did not restrict size') + }) + + it('should allow message with no data property', async () => { + const output = await pipe( + [{}], + restrictSize(32), + collect + ) + expect(output).to.deep.equal([{}]) + }) +}) diff --git a/test/stream.spec.js b/test/stream.spec.js new file mode 100644 index 0000000..634da42 --- /dev/null +++ b/test/stream.spec.js @@ -0,0 +1,506 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const { expect } = chai +chai.use(dirtyChai) +const pipe = require('it-pipe') +const randomBytes = require('random-bytes') +const randomInt = require('random-int') +const { tap, take, collect, consume } = require('streaming-iterables') +const defer = require('p-defer') + +const createStream = require('../src/stream') +const { MessageTypes, MessageTypeNames } = require('../src/message-types') + +function randomInput (min = 1, max = 100) { + return Promise.all( + Array.from(Array(randomInt(min, max)), () => randomBytes(randomInt(1, 128))) + ) +} + +function expectMsgType (actual, expected) { + expect(MessageTypeNames[actual]).to.equal(MessageTypeNames[expected]) +} + +const infiniteRandom = { + [Symbol.iterator]: function * () { + while (true) yield randomBytes(randomInt(1, 128)) + } +} + +describe('stream', () => { + it('should initiate stream with NEW_STREAM message', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const stream = createStream({ id, send: mockSend }) + const input = await randomInput() + + await pipe(input, stream) + + expect(msgs[0].id).to.equal(id) + expectMsgType(msgs[0].type, MessageTypes.NEW_STREAM) + expect(msgs[0].data).to.deep.equal(id.toString()) + }) + + it('should initiate named stream with NEW_STREAM message', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = `STREAM${Date.now()}` + const stream = createStream({ id, name, send: mockSend }) + const input = await randomInput() + + await pipe(input, stream) + + expect(msgs[0].id).to.equal(id) + expectMsgType(msgs[0].type, MessageTypes.NEW_STREAM) + expect(msgs[0].data).to.deep.equal(name) + }) + + it('should send data with MESSAGE_INITIATOR messages if stream initiator', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) + const input = await randomInput() + + await pipe(input, stream) + + // First and last should be NEW_STREAM and CLOSE + const dataMsgs = msgs.slice(1, -1) + expect(dataMsgs).have.length(input.length) + + dataMsgs.forEach((msg, i) => { + expect(msg.id).to.equal(id) + expectMsgType(msg.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data).to.deep.equal(input[i]) + }) + }) + + it('should send data with MESSAGE_RECEIVER messages if stream receiver', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) + const input = await randomInput() + + await pipe(input, stream) + + // Last should be CLOSE + const dataMsgs = msgs.slice(0, -1) + expect(dataMsgs).have.length(input.length) + + dataMsgs.forEach((msg, i) => { + expect(msg.id).to.equal(id) + expectMsgType(msg.type, MessageTypes.MESSAGE_RECEIVER) + expect(msg.data).to.deep.equal(input[i]) + }) + }) + + it('should close stream with CLOSE_INITIATOR message if stream initiator', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) + const input = await randomInput() + + await pipe(input, stream) + + const closeMsg = msgs[msgs.length - 1] + + expect(closeMsg.id).to.equal(id) + expectMsgType(closeMsg.type, MessageTypes.CLOSE_INITIATOR) + expect(closeMsg.data).to.not.exist() + }) + + it('should close stream with CLOSE_RECEIVER message if stream receiver', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) + const input = await randomInput() + + await pipe(input, stream) + + const closeMsg = msgs[msgs.length - 1] + + expect(closeMsg.id).to.equal(id) + expectMsgType(closeMsg.type, MessageTypes.CLOSE_RECEIVER) + expect(closeMsg.data).to.not.exist() + }) + + it('should reset stream on error with RESET_INITIATOR message if stream initiator', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'initiator' }) + const error = new Error(`Boom ${Date.now()}`) + const input = { + [Symbol.iterator]: function * () { + for (let i = 0; i < randomInt(1, 10); i++) { + yield randomBytes(randomInt(1, 128)) + } + throw error + } + } + + await pipe(input, stream) + + const resetMsg = msgs[msgs.length - 1] + + expect(resetMsg.id).to.equal(id) + expectMsgType(resetMsg.type, MessageTypes.RESET_INITIATOR) + expect(resetMsg.data).to.not.exist() + }) + + it('should reset stream on error with RESET_RECEIVER message if stream receiver', async () => { + const msgs = [] + const mockSend = msg => msgs.push(msg) + const id = randomInt(1000) + const name = id.toString() + const stream = createStream({ id, name, send: mockSend, type: 'receiver' }) + const error = new Error(`Boom ${Date.now()}`) + const input = { + [Symbol.iterator]: function * () { + for (let i = 0; i < randomInt(1, 10); i++) { + yield randomBytes(randomInt(1, 128)) + } + throw error + } + } + + await pipe(input, stream) + + const resetMsg = msgs[msgs.length - 1] + + expect(resetMsg.id).to.equal(id) + expectMsgType(resetMsg.type, MessageTypes.RESET_RECEIVER) + expect(resetMsg.data).to.not.exist() + }) + + it('should close for reading (remote close)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { + // when the initiator sends a CLOSE message, we call close + if (msg.type === MessageTypes.CLOSE_INITIATOR) { + receiver.close() + } + }), + receiver + ) + + const input = await randomInput() + const msgs = await pipe( + input, + initiator, + tap(msg => { + // when the receiver sends a CLOSE message, we call close + if (msg.type === MessageTypes.CLOSE_RECEIVER) { + initiator.close() + } + }), + collect + ) + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // check the receiver echoed back all our data messages + expect(msgs.slice(1, -2).length).to.equal(input.length) + + msgs.slice(1, -2).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(input[i]) + }) + + // ...and echoed back the close message + expectMsgType(msgs[msgs.length - 2].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[msgs.length - 2].data.type, MessageTypes.CLOSE_INITIATOR) + + // ...and finally sent a close message + const closeMsg = msgs[msgs.length - 1] + + expectMsgType(closeMsg.type, MessageTypes.CLOSE_RECEIVER) + expect(closeMsg.data).to.not.exist() + }) + + it('should close for reading and writing (abort on local error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { + // when the initiator sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_INITIATOR) { + receiver.reset() + } + }), + receiver + ) + + const input = infiniteRandom + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + const generatedMsgs = [] + const msgs = [] + + try { + let i = 0 + + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + tap(msg => { if (i++ >= maxMsgs) initiator.abort(error) }), + initiator, + tap(msg => msgs.push(msg)), + consume + ) + } catch (err) { + expect(err.message).to.equal(error.message) + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + expect(msgs).to.have.length(generatedMsgs.length) + + // check the receiver echoed back all our data messages, and nothing else + msgs.slice(1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + } + }) + + it('should close for reading and writing (abort on remote error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + let i = 0 + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { if (i++ >= maxMsgs) receiver.abort(error) }), + receiver + ) + + const input = infiniteRandom + const generatedMsgs = [] + const msgs = [] + + try { + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + initiator, + tap(msg => msgs.push(msg)), + tap(msg => { + // when the receiver sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_RECEIVER) { + initiator.reset() + } + }), + consume + ) + } catch (err) { + expect(err.message).to.equal('stream reset') + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // because the receiver errored we might not have received all our data messages + expect(msgs.length - 2).to.be.lte(generatedMsgs.length) + + // check the receiver echoed back some/all our data messages + msgs.slice(1, -1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + + // ...and finally a RESET message + expectMsgType(msgs[msgs.length - 1].type, MessageTypes.RESET_RECEIVER) + } + }) + + it('should close immediately for reading and writing (reset on local error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { + // when the initiator sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_INITIATOR) { + receiver.reset() + } + }), + receiver + ) + + const input = infiniteRandom + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + const generatedMsgs = [] + const msgs = [] + + try { + let i = 0 + + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + tap(msg => { if (i++ >= maxMsgs) throw error }), + initiator, + tap(msg => msgs.push(msg)), + consume + ) + } catch (err) { + expect(err.message).to.equal(error.message) + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // because we errored locally we might not receive all the echo messages + // from the receiver before our source stream is ended + expect(msgs.length - 1).to.be.lte(generatedMsgs.length) + + // check the receiver echoed back some/all our data messages, and nothing else + msgs.slice(1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + } + }) + + it('should close immediately for reading and writing (reset on remote error)', async () => { + const mockInitiatorSend = msg => receiver.source.push(msg) + const mockReceiverSend = msg => initiator.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const initiator = createStream({ id, name, send: mockInitiatorSend, type: 'initiator' }) + const receiver = createStream({ id, name, send: mockReceiverSend, type: 'receiver' }) + + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + let i = 0 + + // echo back (on the other side this will be { type: MESSAGE, data: msg }) + pipe( + receiver, + tap(msg => { if (i++ >= maxMsgs) throw error }), + receiver + ) + + const input = infiniteRandom + const generatedMsgs = [] + const msgs = [] + + try { + await pipe( + input, + tap(msg => generatedMsgs.push(msg)), + initiator, + tap(msg => msgs.push(msg)), + tap(msg => { + // when the receiver sends a RESET message, we call reset + if (msg.type === MessageTypes.RESET_RECEIVER) { + initiator.reset() + } + }), + consume + ) + } catch (err) { + expect(err.message).to.equal('stream reset') + + // NEW_STREAM should have been echoed back to us + expectMsgType(msgs[0].type, MessageTypes.MESSAGE_RECEIVER) + expectMsgType(msgs[0].data.type, MessageTypes.NEW_STREAM) + + // because we errored locally we might not receive all the echo messages + // from the receiver before our source stream is ended + expect(msgs.length - 2).to.be.lte(generatedMsgs.length) + + // check the receiver echoed back some/all our data messages + msgs.slice(1, -1).forEach((msg, i) => { + expectMsgType(msg.data.type, MessageTypes.MESSAGE_INITIATOR) + expect(msg.data.data).to.deep.equal(generatedMsgs[i]) + }) + + // ...and finally a RESET message + expectMsgType(msgs[msgs.length - 1].type, MessageTypes.RESET_RECEIVER) + } + }) + + it('should call onEnd only when both sides have closed', async () => { + const send = msg => stream.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const deferred = defer() + const onEnd = err => err ? deferred.reject(err) : deferred.resolve() + const stream = createStream({ id, name, send, onEnd }) + const input = await randomInput() + + pipe(input, stream, take(randomInt(1, input.length)), consume) + await deferred.promise + }) + + it('should call onEnd with error for local error', async () => { + const send = msg => stream.source.push(msg) + const id = randomInt(1000) + const name = id.toString() + const deferred = defer() + const onEnd = err => err ? deferred.reject(err) : deferred.resolve() + const stream = createStream({ id, name, send, onEnd }) + + const error = new Error(`Boom ${Date.now()}`) + const maxMsgs = randomInt(1, 10) + let i = 0 + + pipe(infiniteRandom, tap(msg => { if (i++ >= maxMsgs) throw error }), stream) + + try { + await deferred.promise + } catch (err) { + return expect(err.message).to.equal(error.message) + } + throw new Error('did not call onEnd with error') + }) +})