Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure all listeners are properly closed on tcp shutdown #2058

Merged
merged 14 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 66 additions & 31 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,25 @@ interface Context extends TCPCreateListenerOptions {
closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
}

const SERVER_STATUS_UP = 1
const SERVER_STATUS_DOWN = 0

export interface TCPListenerMetrics {
status: MetricGroup
errors: CounterGroup
events: CounterGroup
}

type Status = { started: false } | {
started: true
enum TCPListenerStatusCode {
/**
* The server object is initialized but the listening address is not known yet, or
* the server was stopped manually; in either case it can only be started by calling listen()
**/
DOWN = 0,
maschad marked this conversation as resolved.
Show resolved Hide resolved
UP = 1,
/* Temporarily not accepting connections because the connection limit has been reached */
PAUSED = 2,
}

type Status = { code: TCPListenerStatusCode.DOWN } | {
code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.DOWN>
listeningAddr: Multiaddr
peerId: string | null
netConfig: NetConfig
Expand All @@ -66,7 +74,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
private status: Status = { started: false }
private status: Status = { code: TCPListenerStatusCode.DOWN }
private metrics?: TCPListenerMetrics
private addr: string

Expand Down Expand Up @@ -133,7 +141,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

this.metrics?.status.update({
[this.addr]: SERVER_STATUS_UP
[this.addr]: TCPListenerStatusCode.UP
})
}

Expand All @@ -145,13 +153,22 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
})
.on('close', () => {
this.metrics?.status.update({
[this.addr]: SERVER_STATUS_DOWN
[this.addr]: this.status.code
})
this.dispatchEvent(new CustomEvent('close'))

// If this event is emitted, the transport manager will remove the listener from its cache.
// If connections are dropped in the meantime, the listener will start listening again
// and the transport manager will no longer be able to close the server
if(this.status.code !== TCPListenerStatusCode.PAUSED){
this.dispatchEvent(new CustomEvent('close'))
}
})
}

private onSocket (socket: net.Socket): void {
if(this.status.code === TCPListenerStatusCode.DOWN) {
throw new Error('Server is is not listening yet')
maschad marked this conversation as resolved.
Show resolved Hide resolved
}
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
Expand All @@ -161,7 +178,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
listeningAddr: this.status.code ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics?.events,
Expand Down Expand Up @@ -191,7 +208,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// another process during the time the server is closed. In that case there's not much
// we can do. netListen() will be called again every time a connection is dropped, which
maschad marked this conversation as resolved.
Show resolved Hide resolved
// acts as an eventual retry mechanism. onListenError allows the consumer to act on this.
this.netListen().catch(e => {
this.resume().catch(e => {
log.error('error attempting to listen server once connection count under limit', e)
this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
})
Expand All @@ -206,7 +223,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.context.closeServerOnMaxConnections != null &&
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
) {
this.netClose()
this.pause(false).catch(e => {
log.error('error attempting to close server once connection count over limit', e)
})
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
Expand All @@ -232,7 +251,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

getAddrs (): Multiaddr[] {
if (!this.status.started) {
if (this.status.code === TCPListenerStatusCode.DOWN) {
return []
}

Expand Down Expand Up @@ -264,35 +283,42 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
}

async listen (ma: Multiaddr): Promise<void> {
if (this.status.started) {
if (this.status.code === TCPListenerStatusCode.UP || this.status.code === TCPListenerStatusCode.PAUSED ) {
throw Error('server is already listening')
}

const peerId = ma.getPeerId()
const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma
const { backlog } = this.context

this.status = {
started: true,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
try {
this.status = {
code: TCPListenerStatusCode.UP,
listeningAddr,
peerId,
netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
}

await this.resume()
} catch(err) {
this.status = {code: TCPListenerStatusCode.DOWN}
throw err
}

await this.netListen()
}

async close (): Promise<void> {
await Promise.all(
Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) })
)

// netClose already checks if server.listening
this.netClose()
await this.pause(true)
maschad marked this conversation as resolved.
Show resolved Hide resolved
}

private async netListen (): Promise<void> {
if (!this.status.started || this.server.listening) {
/**
* Can resume a stopped or start an inert server
*/
private async resume (): Promise<void> {
if (this.server.listening || this.status.code === TCPListenerStatusCode.DOWN) {
return
}

Expand All @@ -304,11 +330,17 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
this.server.listen(netConfig, resolve)
})

this.status = { ...this.status, code: TCPListenerStatusCode.UP }
log('Listening on %s', this.server.address())
}

private netClose (): void {
if (!this.status.started || !this.server.listening) {
private async pause (permanent: boolean): Promise<void> {
if(!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
this.status = { code: TCPListenerStatusCode.DOWN }
return
}

if (!this.server.listening || this.status.code !== TCPListenerStatusCode.UP){
return
}

Expand All @@ -326,9 +358,12 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
// Stops the server from accepting new connections and keeps existing connections.
// The 'close' event is only emitted when all connections are ended.
// The optional callback will be called once the 'close' event occurs.
//
// NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
// to pass a callback to close.
this.server.close()

// We need to set this status before closing server, so other procedures are aware
// during the time the server is closing
this.status = permanent ? { code: TCPListenerStatusCode.DOWN } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
await new Promise<void>((resolve, reject) => {
this.server.close( err => { err ? reject(err) : resolve() })
})
}
}
Loading
Loading