From 3b45d732e1a01e361a43200701477d96a380b012 Mon Sep 17 00:00:00 2001 From: Daniel McNally Date: Tue, 5 Jan 2021 01:39:15 -0500 Subject: [PATCH] refactor(p2p): better pool closing This makes several improvements to the way we close the p2p pool. 1. Make the peer `close` function synchronous by not awaiting the disconnection reason packet being sent. Previously we would wait and hold up the closing flow, which could introduce delays on stale or dead sockets. It also meant there was different behavior for closing peers with or without a disconnection reason. Instead we continue with all the steps for closing a peer and destroy the socket whenever the socket write attempt is complete. 2. Immediately destroy the socket if it is in `connecting` state, don't even attempt to send a disconnection reason packet since it would not work anyway. 3. Don't increment the connection failure attempt counter for peers when/after we close the pool and thus are canceling any ongoing connection attempts. This goes against the spirit of the connection failure counter and also means that he pool may be making db writes after it is closed (and potentially after the database has closed). Then in the Pool test suite, this flips the order of the db close and the pool close so that the pool is second, which makes sense since it depends on an open, functioning database. It also moves the logic in the `before` block up to the `describe` block, the `before` block wasn't properly completing before the test cases began running. See https://mochajs.org/#asynchronous-code. --- lib/p2p/Peer.ts | 66 ++++++++++++++++++++++------------- lib/p2p/Pool.ts | 66 +++++++++++++++++++---------------- test/integration/Pool.spec.ts | 57 ++++++++++++++---------------- 3 files changed, 103 insertions(+), 86 deletions(-) diff --git a/lib/p2p/Peer.ts b/lib/p2p/Peer.ts index 403ea724c..34e1f8ce4 100644 --- a/lib/p2p/Peer.ts +++ b/lib/p2p/Peer.ts @@ -334,7 +334,7 @@ class Peer extends EventEmitter { /** * Close a peer by ensuring the socket is destroyed and terminating all timers. */ - public close = async (reason?: DisconnectionReason, reasonPayload?: string): Promise => { + public close = (reason?: DisconnectionReason, reasonPayload?: string) => { if (this.status === PeerStatus.Closed) { return; } @@ -343,17 +343,33 @@ class Peer extends EventEmitter { this.revokeConnectionRetries(); if (this.socket) { - if (!this.socket.destroyed) { - if (reason !== undefined) { - this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`); - this.sentDisconnectionReason = reason; - await this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload })); - } - + if (this.socket.destroyed) { + // the socket has already been destroyed, it cannot be used to send further packets + // and does not need further destruction. it can simply be deleted from memory + delete this.socket; + } else if (this.socket.connecting || reason === undefined) { + // either we are still connecting (socket.connect has been called but not finished) the socket and + // thus we cannot transmit packets to the peer yet, or we have not specified a disconnection reason + // either way, we don't need to send a disconnecting packet and can destroy the socket right away this.socket.destroy(); this.socket.removeAllListeners(); + delete this.socket; + } else { + // try to send the peer a disconnecting reason packet before destroying the socket + this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`); + this.sentDisconnectionReason = reason; + + // we send the disconnection packet as a "best effort" and don't wait for the data to be written to the socket + // in case the socket is dead or stalled, we don't want the delay to hold up the flow of closing this peer + this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload })) + .finally(() => { + // destroy the socket after sending the disconnecting packet (if any) + this.socket?.destroy(); + this.socket?.removeAllListeners(); + delete this.socket; + }) + .catch(this.logger.error); } - delete this.socket; } if (this.retryConnectionTimer) { @@ -389,7 +405,7 @@ class Peer extends EventEmitter { DisconnectionReason[this.recvDisconnectionReason] }`; } else { - rejectionMsg = `Peer ${this.label} was destroyed`; + rejectionMsg = `Peer ${this.label} was closed`; } for (const [packetType, entry] of this.responseMap) { @@ -592,25 +608,25 @@ class Peer extends EventEmitter { resolve(); }; - const onError = async (err: Error) => { + const onError = (err: Error) => { cleanup(); this.emit('connFailure'); if (!retry) { - await this.close(); + this.close(); reject(errors.COULD_NOT_CONNECT(this.address, err)); return; } if (Date.now() - startTime + retryDelay > Peer.CONNECTION_RETRIES_MAX_PERIOD) { - await this.close(); + this.close(); reject(errors.CONNECTION_RETRIES_MAX_PERIOD_EXCEEDED); return; } if (this.connectionRetriesRevoked) { - await this.close(); + this.close(); reject(errors.CONNECTION_RETRIES_REVOKED); return; } @@ -675,7 +691,7 @@ class Peer extends EventEmitter { /** * Potentially timeout peer if it hasn't responded. */ - private checkTimeout = async () => { + private checkTimeout = () => { const now = ms(); for (const [packetId, entry] of this.responseMap) { @@ -684,7 +700,7 @@ class Peer extends EventEmitter { const err = errors.RESPONSE_TIMEOUT(request); this.logger.error(`Peer timed out waiting for response to packet ${packetId}`); entry.reject(err); - await this.close(DisconnectionReason.ResponseStalling, packetId); + this.close(DisconnectionReason.ResponseStalling, packetId); } } }; @@ -770,7 +786,7 @@ class Peer extends EventEmitter { } else { this.logger.info(`Peer ${this.label} socket closed`); } - await this.close(); + this.close(); }); this.socket.on('data', this.parser.feed); @@ -797,10 +813,10 @@ class Peer extends EventEmitter { case errorCodes.FRAMER_INVALID_MSG_LENGTH: this.logger.warn(`Peer (${this.label}): ${err.message}`); this.emit('reputation', ReputationEvent.WireProtocolErr); - await this.close(DisconnectionReason.WireProtocolErr, err.message); + this.close(DisconnectionReason.WireProtocolErr, err.message); break; default: - await this.close(); + this.close(); break; } }); @@ -864,7 +880,7 @@ class Peer extends EventEmitter { * @param nodePubKey our node pub key * @param expectedNodePubKey the expected node pub key of the sender of the init packet */ - private authenticateSessionInit = async ( + private authenticateSessionInit = ( packet: packets.SessionInitPacket, nodePubKey: string, expectedNodePubKey?: string, @@ -878,14 +894,14 @@ class Peer extends EventEmitter { // verify that the init packet came from the expected node if (expectedNodePubKey && expectedNodePubKey !== sourceNodePubKey) { - await this.close(DisconnectionReason.UnexpectedIdentity); + this.close(DisconnectionReason.UnexpectedIdentity); throw errors.UNEXPECTED_NODE_PUB_KEY(sourceNodePubKey, expectedNodePubKey, addressUtils.toString(this.address)); } // verify that the init packet was intended for us if (targetNodePubKey !== nodePubKey) { this.emit('reputation', ReputationEvent.InvalidAuth); - await this.close(DisconnectionReason.AuthFailureInvalidTarget); + this.close(DisconnectionReason.AuthFailureInvalidTarget); throw errors.AUTH_FAILURE_INVALID_TARGET(sourceNodePubKey, targetNodePubKey); } @@ -896,7 +912,7 @@ class Peer extends EventEmitter { if (!verified) { this.emit('reputation', ReputationEvent.InvalidAuth); - await this.close(DisconnectionReason.AuthFailureInvalidSignature); + this.close(DisconnectionReason.AuthFailureInvalidSignature); throw errors.AUTH_FAILURE_INVALID_SIGNATURE(sourceNodePubKey); } @@ -973,11 +989,11 @@ class Peer extends EventEmitter { assert(this.expectedNodePubKey); await this.initSession(ownNodeState, ownNodeKey, ownVersion, this.expectedNodePubKey); sessionInit = await this.waitSessionInit(); - await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey); + this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey); } else { // inbound handshake sessionInit = await this.waitSessionInit(); - await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey); + this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey); } return sessionInit; }; diff --git a/lib/p2p/Pool.ts b/lib/p2p/Pool.ts index a272c145f..dc8aa50db 100644 --- a/lib/p2p/Pool.ts +++ b/lib/p2p/Pool.ts @@ -202,9 +202,10 @@ class Pool extends EventEmitter { this.bindNodeList(); - this.loadingNodesPromise = this.nodes.load(); - this.loadingNodesPromise + this.loadingNodesPromise = this.nodes + .load() .then(async () => { + // check to make sure we're not already in the process of disconnecting if (this.disconnecting) { this.loadingNodesPromise = undefined; return; @@ -222,6 +223,12 @@ class Pool extends EventEmitter { }, Pool.SECONDARY_PEERS_DELAY); } + // check again to make sure we're not in the process of disconnecting + if (this.disconnecting) { + this.loadingNodesPromise = undefined; + return; + } + if (primary.length > 0) { this.logger.info('Connecting to known / previously connected peers'); await this.connectNodes(primary, true, true); @@ -326,17 +333,17 @@ class Pool extends EventEmitter { if (this.loadingNodesPromise) { await this.loadingNodesPromise; } - await Promise.all([ - this.unlisten(), - this.closePendingConnections(DisconnectionReason.Shutdown), - this.closePeers(DisconnectionReason.Shutdown), - ]); + await this.unlisten(); + + this.closePendingConnections(DisconnectionReason.Shutdown); + this.closePeers(DisconnectionReason.Shutdown); + this.connected = false; this.disconnecting = false; }; private bindNodeList = () => { - this.nodes.on('node.ban', async (nodePubKey: string, events: ReputationEvent[]) => { + this.nodes.on('node.ban', (nodePubKey: string, events: ReputationEvent[]) => { const banReasonText = events[0] !== ReputationEvent.ManualBan && ReputationEvent[events[0]] ? `due to ${ReputationEvent[events[0]]}` @@ -345,7 +352,7 @@ class Pool extends EventEmitter { const peer = this.peers.get(nodePubKey); if (peer) { - await peer.close(DisconnectionReason.Banned, JSON.stringify(events)); + peer.close(DisconnectionReason.Banned, JSON.stringify(events)); } }); }; @@ -367,7 +374,7 @@ class Pool extends EventEmitter { }); this.pendingOutboundPeers.delete(this.nodePubKey); - await peer.close(); + peer.close(); assert.fail(); } catch (err) { if ( @@ -599,7 +606,7 @@ class Pool extends EventEmitter { torport: this.config.torport, }); - await this.validatePeer(peer); + this.validatePeer(peer); await peer.completeOpen(this.nodeState, this.nodeKey, this.version, sessionInit); } catch (err) { @@ -629,7 +636,7 @@ class Pool extends EventEmitter { // the already established connection is closed. if (this.peers.has(peerPubKey)) { if (this.nodePubKey > peerPubKey) { - await peer.close(DisconnectionReason.AlreadyConnected); + peer.close(DisconnectionReason.AlreadyConnected); throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address); } else { const stillConnected = await new Promise((resolve) => { @@ -640,7 +647,7 @@ class Pool extends EventEmitter { }); }); if (stillConnected) { - await peer.close(DisconnectionReason.AlreadyConnected); + peer.close(DisconnectionReason.AlreadyConnected); throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address); } } @@ -697,10 +704,10 @@ class Pool extends EventEmitter { } }; - public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => { + public closePeer = (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => { const peer = this.peers.get(nodePubKey); if (peer) { - await peer.close(reason, reasonPayload); + peer.close(reason, reasonPayload); this.logger.info(`Disconnected from ${peer.nodePubKey}@${addressUtils.toString(peer.address)} (${peer.alias})`); } else { throw errors.NOT_CONNECTED(nodePubKey); @@ -964,42 +971,42 @@ class Pool extends EventEmitter { }; /** Validates a peer. If a check fails, closes the peer and throws a p2p error. */ - private validatePeer = async (peer: Peer): Promise => { + private validatePeer = (peer: Peer) => { assert(peer.nodePubKey); const peerPubKey = peer.nodePubKey; if (peerPubKey === this.nodePubKey) { - await peer.close(DisconnectionReason.ConnectedToSelf); + peer.close(DisconnectionReason.ConnectedToSelf); throw errors.ATTEMPTED_CONNECTION_TO_SELF; } // Check if version is semantic, and higher than minCompatibleVersion. if (!semver.valid(peer.version)) { - await peer.close(DisconnectionReason.MalformedVersion); + peer.close(DisconnectionReason.MalformedVersion); throw errors.MALFORMED_VERSION(addressUtils.toString(peer.address), peer.version); } // dev.note: compare returns 0 if v1 == v2, or 1 if v1 is greater, or -1 if v2 is greater. if (semver.compare(peer.version, this.minCompatibleVersion) === -1) { - await peer.close(DisconnectionReason.IncompatibleProtocolVersion); + peer.close(DisconnectionReason.IncompatibleProtocolVersion); throw errors.INCOMPATIBLE_VERSION(addressUtils.toString(peer.address), this.minCompatibleVersion, peer.version); } if (!this.connected) { // if we have disconnected the pool, don't allow any new connections to open - await peer.close(DisconnectionReason.NotAcceptingConnections); + peer.close(DisconnectionReason.NotAcceptingConnections); throw errors.POOL_CLOSED; } if (this.nodes.isBanned(peerPubKey)) { // TODO: Ban IP address for this session if banned peer attempts repeated connections. - await peer.close(DisconnectionReason.Banned); + peer.close(DisconnectionReason.Banned); throw errors.NODE_IS_BANNED(peerPubKey); } if (this.peers.has(peerPubKey)) { // TODO: Penalize peers that attempt to create duplicate connections to us more than once. // The first time might be due to connection retries. - await peer.close(DisconnectionReason.AlreadyConnected); + peer.close(DisconnectionReason.AlreadyConnected); throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address); } @@ -1069,7 +1076,10 @@ class Pool extends EventEmitter { }); peer.on('connFailure', async () => { - await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!); + if (this.connected && !this.disconnecting) { + // don't penalize nodes for connection failures due to us closing the pool + await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!); + } }); peer.on('connect', async () => { @@ -1117,22 +1127,18 @@ class Pool extends EventEmitter { }; private closePeers = (reason?: DisconnectionReason) => { - const closePromises = []; for (const peer of this.peers.values()) { - closePromises.push(peer.close(reason)); + peer.close(reason); } - return Promise.all(closePromises); }; private closePendingConnections = (reason?: DisconnectionReason) => { - const closePromises = []; for (const peer of this.pendingOutboundPeers.values()) { - closePromises.push(peer.close(reason)); + peer.close(reason); } for (const peer of this.pendingInboundPeers) { - closePromises.push(peer.close(reason)); + peer.close(reason); } - return Promise.all(closePromises); }; /** diff --git a/test/integration/Pool.spec.ts b/test/integration/Pool.spec.ts index 51ddd57b9..06b2a8222 100644 --- a/test/integration/Pool.spec.ts +++ b/test/integration/Pool.spec.ts @@ -15,12 +15,29 @@ import addressUtils from '../../lib/utils/addressUtils'; chai.use(chaiAsPromised); describe('P2P Pool Tests', async () => { - let db: DB; - let pool: Pool; - let nodeKeyOne: NodeKey; const loggers = Logger.createLoggers(Level.Warn); const sandbox = sinon.createSandbox(); + const nodeKeyOne = await NodeKey['generate'](); + const nodeKeyTwo = await NodeKey['generate'](); + + const config = new Config(); + config.p2p.listen = false; + config.p2p.discover = false; + const db = new DB(loggers.db); + await db.init(); + + const pool = new Pool({ + config: config.p2p, + xuNetwork: XuNetwork.SimNet, + logger: loggers.p2p, + models: db.models, + nodeKey: nodeKeyTwo, + version: '1.0.0', + }); + + await pool.init(); + const createPeer = (nodePubKey: string, addresses: Address[]) => { const peer = sandbox.createStubInstance(Peer) as any; peer.beginOpen = () => { @@ -35,37 +52,15 @@ describe('P2P Pool Tests', async () => { peer.completeOpen = () => {}; peer.socket = {}; peer.sendPacket = () => {}; - peer.close = async () => { + peer.close = () => { peer.sentDisconnectionReason = DisconnectionReason.NotAcceptingConnections; - await pool['handlePeerClose'](peer); + pool['handlePeerClose'](peer); }; pool['bindPeer'](peer); return peer; }; - before(async () => { - nodeKeyOne = await NodeKey['generate'](); - const nodeKeyTwo = await NodeKey['generate'](); - - const config = new Config(); - config.p2p.listen = false; - config.p2p.discover = false; - db = new DB(loggers.db); - await db.init(); - - pool = new Pool({ - config: config.p2p, - xuNetwork: XuNetwork.SimNet, - logger: loggers.p2p, - models: db.models, - nodeKey: nodeKeyTwo, - version: '1.0.0', - }); - - await pool.init(); - }); - it('should open a connection with a peer', async () => { const addresses = [{ host: '123.123.123.123', port: 8885 }]; const peer = createPeer(nodeKeyOne.pubKey, addresses); @@ -74,8 +69,8 @@ describe('P2P Pool Tests', async () => { await Promise.all([openPromise, new Promise((resolve) => pool.on('peer.active', resolve))]); }); - it('should close a peer', async () => { - await pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); + it('should close a peer', () => { + pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); expect(pool['peers'].has(nodeKeyOne.pubKey)).to.be.false; expect(pool['peers'].size).to.equal(0); }); @@ -100,7 +95,7 @@ describe('P2P Pool Tests', async () => { expect(nodeInstance!.addresses!).to.have.length(1); expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host); - await pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); + pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); }); describe('reconnect logic', () => { @@ -136,8 +131,8 @@ describe('P2P Pool Tests', async () => { }); after(async () => { - await db.close(); await pool.disconnect(); + await db.close(); sandbox.restore(); });