Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: close streams when connection is closed (#214)
Browse files Browse the repository at this point in the history
This adds tests for closeRead and closeWrite on the muxer streams.

Also:

- Connection.close() will now close its internal streams to avoid lingering streams.

Supersedes #90

BREAKING CHANGE: This adds closeWrite and closeRead checks in the tests, which will cause test failures for muxers that don't implement those
  • Loading branch information
achingbrain authored May 20, 2022
1 parent 873d49b commit 88fcd58
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 14 deletions.
1 change: 1 addition & 0 deletions packages/libp2p-connection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
},
"dependencies": {
"@libp2p/interfaces": "^1.3.0",
"@libp2p/logger": "^1.1.0",
"@multiformats/multiaddr": "^10.1.5",
"err-code": "^3.0.1"
},
Expand Down
14 changes: 14 additions & 0 deletions packages/libp2p-connection/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { OPEN, CLOSING, CLOSED } from '@libp2p/interfaces/connection/status'
import { symbol } from '@libp2p/interfaces/connection'
import type { Connection, ConnectionStat, Metadata, ProtocolStream, Stream } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import { logger } from '@libp2p/logger'

const log = logger('libp2p:connection')

interface ConnectionInit {
remoteAddr: Multiaddr
Expand Down Expand Up @@ -150,6 +153,17 @@ export class ConnectionImpl implements Connection {

this.stat.status = CLOSING

// close all streams - this can throw if we're not multiplexed
try {
await Promise.all(
this.streams.map(async s => await s.close().catch(err => {
log.error(err)
}))
)
} catch (err) {
log.error(err)
}

// Close raw connection
this._closing = true
await this._close()
Expand Down
13 changes: 7 additions & 6 deletions packages/libp2p-connection/test/compliance.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ describe('compliance tests', () => {
const id = `${streamId++}`
const stream: Stream = {
...pair(),
close: () => {
void stream.sink(async function * () {}())
.then(() => {
connection.removeStream(stream.id)
})
.catch()
close: async () => {
await stream.sink(async function * () {}())
connection.removeStream(stream.id)
},
closeRead: async () => {},
closeWrite: async () => {
await stream.sink(async function * () {}())
},
id,
abort: () => {},
Expand Down
8 changes: 6 additions & 2 deletions packages/libp2p-connection/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ describe('connection tests', () => {
const id = `${streamId++}`
const stream: Stream = {
...pair<Uint8Array>(),
close: () => {
void stream.sink(async function * () {}()).catch()
close: async () => {
await stream.sink(async function * () {}()).catch()
},
closeRead: async () => {},
closeWrite: async () => {
await stream.sink(async function * () {}())
},
id,
abort: () => {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
export function mockStream (stream: Duplex<Uint8Array>): Stream {
return {
...stream,
close: () => {},
close: async () => {},
closeRead: async () => {},
closeWrite: async () => {},
abort: () => {},
reset: () => {},
timeline: {
Expand Down
18 changes: 15 additions & 3 deletions packages/libp2p-interface-compliance-tests/src/mocks/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ class MuxedStream {
source: this.input,

// Close for reading
close: () => {
close: async () => {
this.input.end()
},

closeRead: async () => {
this.input.end()
},

closeWrite: async () => {
this.input.end()
},

Expand Down Expand Up @@ -242,7 +250,11 @@ class MockMuxer implements StreamMuxer {
onEnd: (err) => {
this.log('closing muxed streams')
for (const stream of this.streams) {
stream.abort(err)
if (err == null) {
void stream.close().catch()
} else {
stream.abort(err)
}
}
}
})
Expand Down Expand Up @@ -307,7 +319,7 @@ class MockMuxer implements StreamMuxer {
muxedStream.stream.reset()
} else if (message.type === 'close') {
this.log('-> closing stream %s %s', muxedStream.type, muxedStream.stream.id)
muxedStream.stream.close()
void muxedStream.stream.close()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import delay from 'delay'
import type { TestSetup } from '../index.js'
import type { StreamMuxerFactory } from '@libp2p/interfaces/stream-muxer'
import { Components } from '@libp2p/interfaces/components'
import pDefer from 'p-defer'
import all from 'it-all'

function randomBuffer () {
return uint8ArrayFromString(Math.random().toString())
Expand Down Expand Up @@ -113,5 +115,113 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
// These should now all resolve without error
await Promise.all(streamResults)
})

it('can close a stream for writing', async () => {
const deferred = pDefer<any>()

const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer(new Components())
const data = [randomBuffer(), randomBuffer()]

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer(new Components(), {
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
// Immediate close for write
await stream.closeWrite()

const results = await pipe(stream, async (source) => {
const data = []
for await (const chunk of source) {
data.push(chunk.slice())
}
return data
})
expect(results).to.eql(data)

try {
await stream.sink([randomBuffer()])
} catch (err) {
deferred.resolve(err)
}

deferred.reject(new Error('should not support writing to closed writer'))
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
await stream.sink(data)

const err = await deferred.promise
expect(err).to.have.property('message').that.matches(/stream closed for writing/)
})

it('can close a stream for reading', async () => {
const deferred = pDefer<any>()

const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer(new Components())
const data = [randomBuffer(), randomBuffer()]

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer(new Components(), {
onIncomingStream: (stream) => {
void all(stream.source).then(deferred.resolve, deferred.reject)
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = dialer.newStream()
await stream.closeRead()

// Source should be done
void Promise.resolve().then(async () => {
// @ts-expect-error next is part of the iterable protocol
expect(await stream.source.next()).to.have.property('done', true)
await stream.sink(data)
})

const results = await deferred.promise
expect(results).to.eql(data)
})

it('calls onStreamEnd for closed streams not previously written', async () => {
const deferred = pDefer()

const onStreamEnd = () => deferred.resolve()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer(new Components(), {
onStreamEnd
})

const stream = await dialer.newStream()

await stream.close()
await deferred.promise
})

it('calls onStreamEnd for read and write closed streams not previously written', async () => {
const deferred = pDefer()

const onStreamEnd = () => deferred.resolve()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer(new Components(), {
onStreamEnd
})

const stream = await dialer.newStream()

await stream.closeWrite()
await stream.closeRead()
await deferred.promise
})
})
}
33 changes: 31 additions & 2 deletions packages/libp2p-interfaces/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,39 @@ export interface Metadata {
* configuration of the nodes.
*/
export interface Stream extends Duplex<Uint8Array> {
close: () => void
abort: (err?: Error) => void
/**
* Close a stream for reading and writing
*/
close: () => Promise<void>

/**
* Close a stream for reading only
*/
closeRead: () => Promise<void>

/**
* Close a stream for writing only
*/
closeWrite: () => Promise<void>

/**
* Call when a local error occurs, should close the stream for reading and writing
*/
abort: (err: Error) => void

/**
* Call when a remote error occurs, should close the stream for reading and writing
*/
reset: () => void

/**
* Records stream lifecycle event timings
*/
timeline: Timeline

/**
* Unique identifier for a stream
*/
id: string
}

Expand Down

0 comments on commit 88fcd58

Please sign in to comment.