diff --git a/packages/libp2p/src/transport-manager.ts b/packages/libp2p/src/transport-manager.ts index 070503764a..38f5359616 100644 --- a/packages/libp2p/src/transport-manager.ts +++ b/packages/libp2p/src/transport-manager.ts @@ -262,12 +262,22 @@ export class DefaultTransportManager implements TransportManager, Startable { * If a transport has any running listeners, they will be closed. */ async remove (key: string): Promise { - log('removing %s', key) + const listeners = this.listeners.get(key) ?? [] + log.trace('removing transport %s', key) // Close any running listeners - for (const listener of this.listeners.get(key) ?? []) { - await listener.close() + const tasks = [] + log.trace('closing listeners for %s', key) + while (listeners.length > 0) { + const listener = listeners.pop() + + if (listener == null) { + continue + } + + tasks.push(listener.close()) } + await Promise.all(tasks) this.transports.delete(key) this.listeners.delete(key) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index c7de579471..e6e7b8e1cf 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -1,4 +1,5 @@ import net from 'net' +import { CodeError } from '@libp2p/interface/errors' import { EventEmitter, CustomEvent } from '@libp2p/interface/events' import { logger } from '@libp2p/logger' import { CODE_P2P } from './constants.js' @@ -46,17 +47,25 @@ interface Context extends TCPCreateListenerOptions { closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts } -const SERVER_STATUS_UP = 1 -const SERVER_STATUS_DOWN = 0 - export interface TCPListenerMetrics { status: MetricGroup errors: CounterGroup events: CounterGroup } -type Status = { started: false } | { - started: true +enum TCPListenerStatusCode { + /** + * When server object is initialized but we don't know the listening address yet or + * the server object is stopped manually, can be resumed only by calling listen() + **/ + INACTIVE = 0, + ACTIVE = 1, + /* During the connection limits */ + PAUSED = 2, +} + +type Status = { code: TCPListenerStatusCode.INACTIVE } | { + code: Exclude listeningAddr: Multiaddr peerId: string | null netConfig: NetConfig @@ -66,7 +75,7 @@ export class TCPListener extends EventEmitter implements Listene private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() - private status: Status = { started: false } + private status: Status = { code: TCPListenerStatusCode.INACTIVE } private metrics?: TCPListenerMetrics private addr: string @@ -88,7 +97,7 @@ export class TCPListener extends EventEmitter implements Listene if (context.closeServerOnMaxConnections != null) { // Sanity check options if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) { - throw Error('closeAbove must be >= listenBelow') + throw new CodeError('closeAbove must be >= listenBelow', 'ERROR_CONNECTION_LIMITS') } } @@ -133,7 +142,7 @@ export class TCPListener extends EventEmitter implements Listene } this.metrics?.status.update({ - [this.addr]: SERVER_STATUS_UP + [this.addr]: TCPListenerStatusCode.ACTIVE }) } @@ -145,13 +154,22 @@ export class TCPListener extends EventEmitter implements Listene }) .on('close', () => { this.metrics?.status.update({ - [this.addr]: SERVER_STATUS_DOWN + [this.addr]: this.status.code }) - this.dispatchEvent(new CustomEvent('close')) + + // If this event is emitted, the transport manager will remove the listener from it's cache + // in the meanwhile if the connections are dropped then listener will start listening again + // and the transport manager will not be able to close the server + if (this.status.code !== TCPListenerStatusCode.PAUSED) { + this.dispatchEvent(new CustomEvent('close')) + } }) } private onSocket (socket: net.Socket): void { + if (this.status.code !== TCPListenerStatusCode.ACTIVE) { + throw new CodeError('Server is is not listening yet', 'ERR_SERVER_NOT_RUNNING') + } // Avoid uncaught errors caused by unstable connections socket.on('error', err => { log('socket error', err) @@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter implements Listene let maConn: MultiaddrConnection try { maConn = toMultiaddrConnection(socket, { - listeningAddr: this.status.started ? this.status.listeningAddr : undefined, + listeningAddr: this.status.listeningAddr, socketInactivityTimeout: this.context.socketInactivityTimeout, socketCloseTimeout: this.context.socketCloseTimeout, metrics: this.metrics?.events, @@ -189,9 +207,9 @@ export class TCPListener extends EventEmitter implements Listene ) { // The most likely case of error is if the port taken by this application is binded by // another process during the time the server if closed. In that case there's not much - // we can do. netListen() will be called again every time a connection is dropped, which + // we can do. resume() will be called again every time a connection is dropped, which // acts as an eventual retry mechanism. onListenError allows the consumer act on this. - this.netListen().catch(e => { + this.resume().catch(e => { log.error('error attempting to listen server once connection count under limit', e) this.context.closeServerOnMaxConnections?.onListenError?.(e as Error) }) @@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter implements Listene this.context.closeServerOnMaxConnections != null && this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove ) { - this.netClose() + this.pause(false).catch(e => { + log.error('error attempting to close server once connection count over limit', e) + }) } this.dispatchEvent(new CustomEvent('connection', { detail: conn })) @@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter implements Listene } getAddrs (): Multiaddr[] { - if (!this.status.started) { + if (this.status.code === TCPListenerStatusCode.INACTIVE) { return [] } @@ -264,35 +284,44 @@ export class TCPListener extends EventEmitter implements Listene } async listen (ma: Multiaddr): Promise { - if (this.status.started) { - throw Error('server is already listening') + if (this.status.code === TCPListenerStatusCode.ACTIVE || this.status.code === TCPListenerStatusCode.PAUSED) { + throw new CodeError('server is already listening', 'ERR_SERVER_ALREADY_LISTENING') } const peerId = ma.getPeerId() const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma const { backlog } = this.context - this.status = { - started: true, - listeningAddr, - peerId, - netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) - } + try { + this.status = { + code: TCPListenerStatusCode.ACTIVE, + listeningAddr, + peerId, + netConfig: multiaddrToNetConfig(listeningAddr, { backlog }) + } - await this.netListen() + await this.resume() + } catch (err) { + this.status = { code: TCPListenerStatusCode.INACTIVE } + throw err + } } async close (): Promise { - await Promise.all( - Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) }) - ) - - // netClose already checks if server.listening - this.netClose() + // Close connections and server the same time to avoid any race condition + await Promise.all([ + Promise.all(Array.from(this.connections.values()).map(async maConn => attemptClose(maConn))), + this.pause(true).catch(e => { + log.error('error attempting to close server once connection count over limit', e) + }) + ]) } - private async netListen (): Promise { - if (!this.status.started || this.server.listening) { + /** + * Can resume a stopped or start an inert server + */ + private async resume (): Promise { + if (this.server.listening || this.status.code === TCPListenerStatusCode.INACTIVE) { return } @@ -304,11 +333,17 @@ export class TCPListener extends EventEmitter implements Listene this.server.listen(netConfig, resolve) }) + this.status = { ...this.status, code: TCPListenerStatusCode.ACTIVE } log('Listening on %s', this.server.address()) } - private netClose (): void { - if (!this.status.started || !this.server.listening) { + private async pause (permanent: boolean): Promise { + if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { + this.status = { code: TCPListenerStatusCode.INACTIVE } + return + } + + if (!this.server.listening || this.status.code !== TCPListenerStatusCode.ACTIVE) { return } @@ -326,9 +361,12 @@ export class TCPListener extends EventEmitter implements Listene // Stops the server from accepting new connections and keeps existing connections. // 'close' event is emitted only emitted when all connections are ended. // The optional callback will be called once the 'close' event occurs. - // - // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary - // to pass a callback to close. - this.server.close() + + // We need to set this status before closing server, so other procedures are aware + // during the time the server is closing + this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED } + await new Promise((resolve, reject) => { + this.server.close(err => { (err != null) ? reject(err) : resolve() }) + }) } } diff --git a/packages/transport-tcp/test/connection-limits.spec.ts b/packages/transport-tcp/test/connection-limits.spec.ts new file mode 100644 index 0000000000..f2f81deb1c --- /dev/null +++ b/packages/transport-tcp/test/connection-limits.spec.ts @@ -0,0 +1,217 @@ +import net from 'node:net' +import { promisify } from 'util' +import { EventEmitter } from '@libp2p/interface/events' +import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' +import { multiaddr } from '@multiformats/multiaddr' +import { expect } from 'aegir/chai' +import { tcp } from '../src/index.js' +import type { TCPListener } from '../src/listener.js' + +const buildSocketAssertions = (port: number, closeCallbacks: Array<() => Promise | any>): { assertConnectedSocket: (i: number) => Promise, assertRefusedSocket: (i: number) => Promise } => { + function createSocket (i: number): net.Socket { + const socket = net.connect({ host: '127.0.0.1', port }) + + closeCallbacks.unshift(async function closeHandler (): Promise { + if (!socket.destroyed) { + socket.destroy() + await new Promise((resolve) => socket.on('close', resolve)) + } + }) + return socket + } + + async function assertConnectedSocket (i: number): Promise { + const socket = createSocket(i) + + await new Promise((resolve, reject) => { + socket.once('connect', () => { + resolve() + }) + socket.once('error', (err) => { + err.message = `Socket[${i}] ${err.message}` + reject(err) + }) + }) + + return socket + } + + async function assertRefusedSocket (i: number): Promise { + const socket = createSocket(i) + + await new Promise((resolve, reject) => { + socket.once('connect', () => { + reject(Error(`Socket[${i}] connected but was expected to reject`)) + }) + socket.once('error', (err) => { + if (err.message.includes('ECONNREFUSED')) { + resolve() + } else { + err.message = `Socket[${i}] unexpected error ${err.message}` + reject(err) + } + }) + }) + + return socket + } + + return { assertConnectedSocket, assertRefusedSocket } +} + +async function assertServerConnections (listener: TCPListener, connections: number): Promise { + // Expect server connections but allow time for sockets to connect or disconnect + for (let i = 0; i < 100; i++) { + // eslint-disable-next-line @typescript-eslint/dot-notation + if (listener['connections'].size === connections) { + return + } else { + await promisify(setTimeout)(10) + } + } + // eslint-disable-next-line @typescript-eslint/dot-notation + expect(listener['connections'].size).equals(connections, 'invalid amount of server connections') +} + +describe('closeAbove/listenBelow', () => { + const afterEachCallbacks: Array<() => Promise | any> = [] + afterEach(async () => { + await Promise.all(afterEachCallbacks.map(fn => fn())) + afterEachCallbacks.length = 0 + }) + + it('reject dial of connection above closeAbove', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const { assertConnectedSocket, assertRefusedSocket } = buildSocketAssertions(port, afterEachCallbacks) + + await assertConnectedSocket(1) + await assertConnectedSocket(2) + await assertConnectedSocket(3) + await assertServerConnections(listener, 3) + + // Limit reached, server should be closed here + await assertRefusedSocket(4) + await assertRefusedSocket(5) + await assertServerConnections(listener, 3) + }) + + it('accepts dial of connection when connection drop listenBelow limit', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) + + const socket1 = await assertConnectedSocket(1) + const socket2 = await assertConnectedSocket(2) + await assertConnectedSocket(3) + await assertServerConnections(listener, 3) + + // Destroy sockets to be have connections < listenBelow + socket1.destroy() + socket2.destroy() + // After destroying 2 sockets connections will be below "listenBelow" limit + await assertServerConnections(listener, 1) + + // Now it should be able to accept new connections + await assertConnectedSocket(4) + await assertConnectedSocket(5) + + // 2 connections dropped and 2 new connections accepted + await assertServerConnections(listener, 3) + }) + + it('should not emit "close" event when server is stopped due to "closeAbove" limit', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + + let closeEventCallCount = 0 + listener.addEventListener('close', () => { + closeEventCallCount += 1 + }) + + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) + + await assertConnectedSocket(1) + await assertConnectedSocket(2) + await assertConnectedSocket(3) + await assertServerConnections(listener, 3) + + // Limit reached, server should be closed but should not emit "close" event + expect(closeEventCallCount).equals(0) + }) + + it('should emit "listening" event when server is resumed due to "listenBelow" limit', async () => { + const listenBelow = 2 + const closeAbove = 3 + const port = 9900 + + const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() + + const upgrader = mockUpgrader({ + events: new EventEmitter() + }) + const listener = trasnport.createListener({ upgrader }) as TCPListener + // eslint-disable-next-line @typescript-eslint/promise-function-async + afterEachCallbacks.push(() => listener.close()) + + let listeningEventCallCount = 0 + listener.addEventListener('listening', () => { + listeningEventCallCount += 1 + }) + + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) + const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) + + // Server should be listening now + expect(listeningEventCallCount).equals(1) + + const socket1 = await assertConnectedSocket(1) + const socket2 = await assertConnectedSocket(2) + await assertConnectedSocket(3) + // Limit reached, server should be closed now + await assertServerConnections(listener, 3) + + // Close some sockets to resume listening + socket1.destroy() + socket2.destroy() + + // Wait for listener to emit event + await promisify(setTimeout)(50) + + // Server should emit the "listening" event again + expect(listeningEventCallCount).equals(2) + }) +}) diff --git a/packages/transport-tcp/test/max-connections-close.spec.ts b/packages/transport-tcp/test/max-connections-close.spec.ts deleted file mode 100644 index 177c71bbe9..0000000000 --- a/packages/transport-tcp/test/max-connections-close.spec.ts +++ /dev/null @@ -1,121 +0,0 @@ -import net from 'node:net' -import { promisify } from 'util' -import { EventEmitter } from '@libp2p/interface/events' -import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' -import { multiaddr } from '@multiformats/multiaddr' -import { expect } from 'aegir/chai' -import { tcp } from '../src/index.js' -import type { TCPListener } from '../src/listener.js' - -describe('close server on maxConnections', () => { - const afterEachCallbacks: Array<() => Promise | any> = [] - afterEach(async () => { - await Promise.all(afterEachCallbacks.map(fn => fn())) - afterEachCallbacks.length = 0 - }) - - it('reject dial of connection above closeAbove', async () => { - const listenBelow = 2 - const closeAbove = 3 - const port = 9900 - - const seenRemoteConnections = new Set() - const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })() - - const upgrader = mockUpgrader({ - events: new EventEmitter() - }) - const listener = trasnport.createListener({ upgrader }) as TCPListener - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) - await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) - - listener.addEventListener('connection', (conn) => { - seenRemoteConnections.add(conn.detail.remoteAddr.toString()) - }) - - function createSocket (): net.Socket { - const socket = net.connect({ host: '127.0.0.1', port }) - - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.unshift(async () => { - if (!socket.destroyed) { - socket.destroy() - await new Promise((resolve) => socket.on('close', resolve)) - } - }) - - return socket - } - - async function assertConnectedSocket (i: number): Promise { - const socket = createSocket() - - await new Promise((resolve, reject) => { - socket.once('connect', () => { - resolve() - }) - socket.once('error', (err) => { - err.message = `Socket[${i}] ${err.message}` - reject(err) - }) - }) - - return socket - } - - async function assertRefusedSocket (i: number): Promise { - const socket = createSocket() - - await new Promise((resolve, reject) => { - socket.once('connect', () => { - reject(Error(`Socket[${i}] connected but was expected to reject`)) - }) - socket.once('error', (err) => { - if (err.message.includes('ECONNREFUSED')) { - resolve() - } else { - err.message = `Socket[${i}] unexpected error ${err.message}` - reject(err) - } - }) - }) - } - - async function assertServerConnections (connections: number): Promise { - // Expect server connections but allow time for sockets to connect or disconnect - for (let i = 0; i < 100; i++) { - // eslint-disable-next-line @typescript-eslint/dot-notation - if (listener['connections'].size === connections) { - return - } else { - await promisify(setTimeout)(10) - } - } - // eslint-disable-next-line @typescript-eslint/dot-notation - expect(listener['connections'].size).equals(connections, 'Wrong server connections') - } - - const socket1 = await assertConnectedSocket(1) - const socket2 = await assertConnectedSocket(2) - const socket3 = await assertConnectedSocket(3) - await assertServerConnections(3) - // Limit reached, server should be closed here - await assertRefusedSocket(4) - await assertRefusedSocket(5) - // Destroy sockets to be have connections < listenBelow - socket1.destroy() - socket2.destroy() - await assertServerConnections(1) - // Attempt to connect more sockets - const socket6 = await assertConnectedSocket(6) - const socket7 = await assertConnectedSocket(7) - await assertServerConnections(3) - // Limit reached, server should be closed here - await assertRefusedSocket(8) - - expect(socket3.destroyed).equals(false, 'socket3 must not destroyed') - expect(socket6.destroyed).equals(false, 'socket6 must not destroyed') - expect(socket7.destroyed).equals(false, 'socket7 must not destroyed') - }) -})