diff --git a/src/connection/connection.js b/src/connection/connection.js index 168ff114b..56680095d 100644 --- a/src/connection/connection.js +++ b/src/connection/connection.js @@ -220,6 +220,8 @@ class Connection { this.stat.status = CLOSING + await Promise.all(this.streams.map(s => s.close && s.close())) + // Close raw connection this._closing = await this._close() diff --git a/src/connection/tests/connection.js b/src/connection/tests/connection.js index 30e03277a..1d043dbb9 100644 --- a/src/connection/tests/connection.js +++ b/src/connection/tests/connection.js @@ -3,9 +3,7 @@ 'use strict' -const chai = require('chai') -const expect = chai.expect -chai.use(require('dirty-chai')) +const { expect } = require('aegir/utils/chai') const sinon = require('sinon') const Status = require('../status') diff --git a/src/stream-muxer/tests/close-test.js b/src/stream-muxer/tests/close-test.js index 480ca80a9..468795d57 100644 --- a/src/stream-muxer/tests/close-test.js +++ b/src/stream-muxer/tests/close-test.js @@ -3,17 +3,15 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' +const { expect } = require('aegir/utils/chai') const pair = require('it-pair/duplex') const { pipe } = require('it-pipe') +const pDefer = require('p-defer') const { consume } = require('streaming-iterables') -const Tcp = require('libp2p-tcp') -const { Multiaddr } = require('multiaddr') const { source: abortable } = require('abortable-iterator') const AbortController = require('abort-controller').default const uint8arrayFromString = require('uint8arrays/from-string') -const mh = new Multiaddr('/ip4/127.0.0.1/tcp/0') - function pause (ms) { return new Promise(resolve => setTimeout(resolve, ms)) } @@ -39,33 +37,31 @@ module.exports = (common) => { Muxer = await common.setup() }) - it('closing underlying socket closes streams (tcp)', async () => { + it('closing underlying socket closes streams', async () => { const mockConn = muxer => ({ newStream: (...args) => muxer.newStream(...args) }) - const mockUpgrade = () => maConn => { + const mockUpgrade = maConn => { const muxer = new Muxer(stream => pipe(stream, stream)) pipe(maConn, muxer, maConn) return mockConn(muxer) } - const mockUpgrader = () => ({ - upgradeInbound: mockUpgrade(), - upgradeOutbound: mockUpgrade() + const [local, remote] = pair() + const controller = new AbortController() + const abortableRemote = abortable.duplex(remote, controller.signal, { + returnOnAbort: true }) - const tcp = new Tcp({ upgrader: mockUpgrader() }) - const tcpListener = tcp.createListener() - - await tcpListener.listen(mh) - const dialerConn = await tcp.dial(tcpListener.getAddrs()[0]) + mockUpgrade(abortableRemote) + const dialerConn = mockUpgrade(local) const s1 = await dialerConn.newStream() const s2 = await dialerConn.newStream() - // close the listener in a bit - setTimeout(() => tcpListener.close(), 50) + // close the remote in a bit + setTimeout(() => controller.abort(), 50) const s1Result = pipe(infiniteRandom, s1, consume) const s2Result = pipe(infiniteRandom, s2, consume) @@ -116,5 +112,94 @@ module.exports = (common) => { // These should now all resolve without error await Promise.all(streamResults) }) + + it('can close a stream for writing', (done) => { + const p = pair() + const dialer = new Muxer() + const data = [randomBuffer(), randomBuffer()] + + const listener = new Muxer(async stream => { + // Immediate close for write + await stream.closeWrite() + + const results = await pipe(stream, async (source) => { + const data = [] + for await (const chunk of source) { + data.push(chunk.slice()) + } + return data + }) + expect(results).to.eql(data) + + try { + await stream.sink([randomBuffer()]) + } catch (err) { + expect(err).to.exist() + return done() + } + expect.fail('should not support writing to closed writer') + }) + + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + const stream = dialer.newStream() + stream.sink(data) + }) + + it('can close a stream for reading', (done) => { + const p = pair() + const dialer = new Muxer() + const data = [randomBuffer(), randomBuffer()] + + const listener = new Muxer(async stream => { + const results = await pipe(stream, async (source) => { + const data = [] + for await (const chunk of source) { + data.push(chunk.slice()) + } + return data + }) + expect(results).to.eql(data) + done() + }) + + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + const stream = dialer.newStream() + stream.closeRead() + + // Source should be done + ; (async () => { + expect(await stream.source.next()).to.eql({ done: true }) + stream.sink(data) + })() + }) + + it('calls onStreamEnd for closed streams not previously written', async () => { + const deferred = pDefer() + + const onStreamEnd = () => deferred.resolve() + const dialer = new Muxer({ onStreamEnd }) + + const stream = await dialer.newStream() + + stream.close() + await deferred.promise + }) + + it('calls onStreamEnd for read and write closed streams not previously written', async () => { + const deferred = pDefer() + + const onStreamEnd = () => deferred.resolve() + const dialer = new Muxer({ onStreamEnd }) + + const stream = await dialer.newStream() + + stream.closeWrite() + stream.closeRead() + await deferred.promise + }) }) } diff --git a/src/stream-muxer/types.d.ts b/src/stream-muxer/types.d.ts index cf0c6f43a..1735ef4dc 100644 --- a/src/stream-muxer/types.d.ts +++ b/src/stream-muxer/types.d.ts @@ -39,7 +39,9 @@ export type MuxedTimeline = { } export interface MuxedStream extends AsyncIterable { - close: () => void; + close: () => Promise; + closeRead: () => Promise; + closeWrite: () => Promise; abort: () => void; reset: () => void; sink: Sink;