diff --git a/package.json b/package.json index cbe5619..964c9dc 100644 --- a/package.json +++ b/package.json @@ -148,7 +148,6 @@ "@libp2p/utils": "^3.0.2", "@multiformats/mafmt": "^11.0.3", "@multiformats/multiaddr": "^11.0.0", - "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", "stream-to-it": "^0.2.2" }, diff --git a/src/index.ts b/src/index.ts index b15b907..b3e19ff 100644 --- a/src/index.ts +++ b/src/index.ts @@ -94,6 +94,7 @@ class TCP implements Transport { async dial (ma: Multiaddr, options: TCPDialOptions): Promise { options.keepAlive = options.keepAlive ?? true + // options.signal destroys the socket before 'connect' event const socket = await this._connect(ma, options) // Avoid uncaught errors caused by unstable connections @@ -103,14 +104,32 @@ class TCP implements Transport { const maConn = toMultiaddrConnection(socket, { remoteAddr: ma, - signal: options.signal, socketInactivityTimeout: this.opts.outboundSocketInactivityTimeout, socketCloseTimeout: this.opts.socketCloseTimeout, metrics: this.metrics?.dialerEvents }) + + const onAbort = () => { + maConn.close().catch(err => { + log.error('Error closing maConn after abort', err) + }) + } + options.signal?.addEventListener('abort', onAbort, { once: true }) + log('new outbound connection %s', maConn.remoteAddr) const conn = await options.upgrader.upgradeOutbound(maConn) - log('outbound connection upgraded %s', maConn.remoteAddr) + log('outbound connection %s upgraded', maConn.remoteAddr) + + options.signal?.removeEventListener('abort', onAbort) + + if (options.signal?.aborted === true) { + conn.close().catch(err => { + log.error('Error closing conn after abort', err) + }) + + throw new AbortError() + } + return conn } diff --git a/src/socket-to-conn.ts b/src/socket-to-conn.ts index efd7e8a..88f99cb 100644 --- a/src/socket-to-conn.ts +++ b/src/socket-to-conn.ts @@ -1,4 +1,3 @@ -import { abortableSource } from 'abortable-iterator' import { logger } from '@libp2p/logger' // @ts-expect-error no types import toIterable from 'stream-to-it' @@ -17,7 +16,6 @@ interface ToConnectionOptions { listeningAddr?: Multiaddr remoteAddr?: Multiaddr localAddr?: Multiaddr - signal?: AbortSignal socketInactivityTimeout?: number socketCloseTimeout?: number metrics?: CounterGroup @@ -99,10 +97,6 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio const maConn: MultiaddrConnection = { async sink (source) { - if ((options?.signal) != null) { - source = abortableSource(source, options.signal) - } - try { await sink(source) } catch (err: any) { @@ -119,7 +113,7 @@ export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptio socket.end() }, - source: (options.signal != null) ? abortableSource(source, options.signal) : source, + source, // If the remote address was passed, use it - it may have the peer ID encapsulated remoteAddr, diff --git a/test/listen-dial.spec.ts b/test/listen-dial.spec.ts index 59bfb3a..ded447d 100644 --- a/test/listen-dial.spec.ts +++ b/test/listen-dial.spec.ts @@ -8,6 +8,8 @@ import all from 'it-all' import { mockRegistrar, mockUpgrader } from '@libp2p/interface-mocks' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import type { Transport, Upgrader } from '@libp2p/interface-transport' +import pDefer from 'p-defer' +import type { MultiaddrConnection } from '@libp2p/interface-connection' const isCI = process.env.CI @@ -20,6 +22,7 @@ describe('listen', () => { transport = tcp()() upgrader = mockUpgrader() }) + afterEach(async () => { try { if (listener != null) { @@ -326,4 +329,57 @@ describe('dial', () => { await conn.close() await listener.close() }) + + it('aborts during dial', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const maConnPromise = pDefer() + + // @ts-expect-error missing return value + upgrader.upgradeOutbound = async (maConn) => { + maConnPromise.resolve(maConn) + + // take a long time to give us time to abort the dial + await new Promise((resolve) => { + setTimeout(() => resolve(), 100) + }) + } + + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + const abortController = new AbortController() + + // abort once the upgrade process has started + void maConnPromise.promise.then(() => abortController.abort()) + + await expect(transport.dial(ma, { + upgrader, + signal: abortController.signal + })).to.eventually.be.rejected('The operation was aborted') + + await expect(maConnPromise.promise).to.eventually.have.nested.property('timeline.close') + .that.is.ok('did not gracefully close maConn') + + await listener.close() + }) + + it('aborts before dial', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const listener = transport.createListener({ + upgrader + }) + await listener.listen(ma) + + const abortController = new AbortController() + abortController.abort() + + await expect(transport.dial(ma, { + upgrader, + signal: abortController.signal + })).to.eventually.be.rejected('The operation was aborted') + + await listener.close() + }) })