diff --git a/lib/p2p/Peer.ts b/lib/p2p/Peer.ts index 403ea724c..34e1f8ce4 100644 --- a/lib/p2p/Peer.ts +++ b/lib/p2p/Peer.ts @@ -334,7 +334,7 @@ class Peer extends EventEmitter { /** * Close a peer by ensuring the socket is destroyed and terminating all timers. */ - public close = async (reason?: DisconnectionReason, reasonPayload?: string): Promise => { + public close = (reason?: DisconnectionReason, reasonPayload?: string) => { if (this.status === PeerStatus.Closed) { return; } @@ -343,17 +343,33 @@ class Peer extends EventEmitter { this.revokeConnectionRetries(); if (this.socket) { - if (!this.socket.destroyed) { - if (reason !== undefined) { - this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`); - this.sentDisconnectionReason = reason; - await this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload })); - } - + if (this.socket.destroyed) { + // the socket has already been destroyed, it cannot be used to send further packets + // and does not need further destruction. it can simply be deleted from memory + delete this.socket; + } else if (this.socket.connecting || reason === undefined) { + // either we are still connecting (socket.connect has been called but not finished) the socket and + // thus we cannot transmit packets to the peer yet, or we have not specified a disconnection reason + // either way, we don't need to send a disconnecting packet and can destroy the socket right away this.socket.destroy(); this.socket.removeAllListeners(); + delete this.socket; + } else { + // try to send the peer a disconnecting reason packet before destroying the socket + this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`); + this.sentDisconnectionReason = reason; + + // we send the disconnection packet as a "best effort" and don't wait for the data to be written to the socket + // in case the socket is dead or stalled, we don't want the delay to hold up the flow of closing this peer + this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload })) + .finally(() => { + // destroy the socket after sending the disconnecting packet (if any) + this.socket?.destroy(); + this.socket?.removeAllListeners(); + delete this.socket; + }) + .catch(this.logger.error); } - delete this.socket; } if (this.retryConnectionTimer) { @@ -389,7 +405,7 @@ class Peer extends EventEmitter { DisconnectionReason[this.recvDisconnectionReason] }`; } else { - rejectionMsg = `Peer ${this.label} was destroyed`; + rejectionMsg = `Peer ${this.label} was closed`; } for (const [packetType, entry] of this.responseMap) { @@ -592,25 +608,25 @@ class Peer extends EventEmitter { resolve(); }; - const onError = async (err: Error) => { + const onError = (err: Error) => { cleanup(); this.emit('connFailure'); if (!retry) { - await this.close(); + this.close(); reject(errors.COULD_NOT_CONNECT(this.address, err)); return; } if (Date.now() - startTime + retryDelay > Peer.CONNECTION_RETRIES_MAX_PERIOD) { - await this.close(); + this.close(); reject(errors.CONNECTION_RETRIES_MAX_PERIOD_EXCEEDED); return; } if (this.connectionRetriesRevoked) { - await this.close(); + this.close(); reject(errors.CONNECTION_RETRIES_REVOKED); return; } @@ -675,7 +691,7 @@ class Peer extends EventEmitter { /** * Potentially timeout peer if it hasn't responded. */ - private checkTimeout = async () => { + private checkTimeout = () => { const now = ms(); for (const [packetId, entry] of this.responseMap) { @@ -684,7 +700,7 @@ class Peer extends EventEmitter { const err = errors.RESPONSE_TIMEOUT(request); this.logger.error(`Peer timed out waiting for response to packet ${packetId}`); entry.reject(err); - await this.close(DisconnectionReason.ResponseStalling, packetId); + this.close(DisconnectionReason.ResponseStalling, packetId); } } }; @@ -770,7 +786,7 @@ class Peer extends EventEmitter { } else { this.logger.info(`Peer ${this.label} socket closed`); } - await this.close(); + this.close(); }); this.socket.on('data', this.parser.feed); @@ -797,10 +813,10 @@ class Peer extends EventEmitter { case errorCodes.FRAMER_INVALID_MSG_LENGTH: this.logger.warn(`Peer (${this.label}): ${err.message}`); this.emit('reputation', ReputationEvent.WireProtocolErr); - await this.close(DisconnectionReason.WireProtocolErr, err.message); + this.close(DisconnectionReason.WireProtocolErr, err.message); break; default: - await this.close(); + this.close(); break; } }); @@ -864,7 +880,7 @@ class Peer extends EventEmitter { * @param nodePubKey our node pub key * @param expectedNodePubKey the expected node pub key of the sender of the init packet */ - private authenticateSessionInit = async ( + private authenticateSessionInit = ( packet: packets.SessionInitPacket, nodePubKey: string, expectedNodePubKey?: string, @@ -878,14 +894,14 @@ class Peer extends EventEmitter { // verify that the init packet came from the expected node if (expectedNodePubKey && expectedNodePubKey !== sourceNodePubKey) { - await this.close(DisconnectionReason.UnexpectedIdentity); + this.close(DisconnectionReason.UnexpectedIdentity); throw errors.UNEXPECTED_NODE_PUB_KEY(sourceNodePubKey, expectedNodePubKey, addressUtils.toString(this.address)); } // verify that the init packet was intended for us if (targetNodePubKey !== nodePubKey) { this.emit('reputation', ReputationEvent.InvalidAuth); - await this.close(DisconnectionReason.AuthFailureInvalidTarget); + this.close(DisconnectionReason.AuthFailureInvalidTarget); throw errors.AUTH_FAILURE_INVALID_TARGET(sourceNodePubKey, targetNodePubKey); } @@ -896,7 +912,7 @@ class Peer extends EventEmitter { if (!verified) { this.emit('reputation', ReputationEvent.InvalidAuth); - await this.close(DisconnectionReason.AuthFailureInvalidSignature); + this.close(DisconnectionReason.AuthFailureInvalidSignature); throw errors.AUTH_FAILURE_INVALID_SIGNATURE(sourceNodePubKey); } @@ -973,11 +989,11 @@ class Peer extends EventEmitter { assert(this.expectedNodePubKey); await this.initSession(ownNodeState, ownNodeKey, ownVersion, this.expectedNodePubKey); sessionInit = await this.waitSessionInit(); - await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey); + this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey); } else { // inbound handshake sessionInit = await this.waitSessionInit(); - await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey); + this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey); } return sessionInit; }; diff --git a/lib/p2p/Pool.ts b/lib/p2p/Pool.ts index a272c145f..dc8aa50db 100644 --- a/lib/p2p/Pool.ts +++ b/lib/p2p/Pool.ts @@ -202,9 +202,10 @@ class Pool extends EventEmitter { this.bindNodeList(); - this.loadingNodesPromise = this.nodes.load(); - this.loadingNodesPromise + this.loadingNodesPromise = this.nodes + .load() .then(async () => { + // check to make sure we're not already in the process of disconnecting if (this.disconnecting) { this.loadingNodesPromise = undefined; return; @@ -222,6 +223,12 @@ class Pool extends EventEmitter { }, Pool.SECONDARY_PEERS_DELAY); } + // check again to make sure we're not in the process of disconnecting + if (this.disconnecting) { + this.loadingNodesPromise = undefined; + return; + } + if (primary.length > 0) { this.logger.info('Connecting to known / previously connected peers'); await this.connectNodes(primary, true, true); @@ -326,17 +333,17 @@ class Pool extends EventEmitter { if (this.loadingNodesPromise) { await this.loadingNodesPromise; } - await Promise.all([ - this.unlisten(), - this.closePendingConnections(DisconnectionReason.Shutdown), - this.closePeers(DisconnectionReason.Shutdown), - ]); + await this.unlisten(); + + this.closePendingConnections(DisconnectionReason.Shutdown); + this.closePeers(DisconnectionReason.Shutdown); + this.connected = false; this.disconnecting = false; }; private bindNodeList = () => { - this.nodes.on('node.ban', async (nodePubKey: string, events: ReputationEvent[]) => { + this.nodes.on('node.ban', (nodePubKey: string, events: ReputationEvent[]) => { const banReasonText = events[0] !== ReputationEvent.ManualBan && ReputationEvent[events[0]] ? `due to ${ReputationEvent[events[0]]}` @@ -345,7 +352,7 @@ class Pool extends EventEmitter { const peer = this.peers.get(nodePubKey); if (peer) { - await peer.close(DisconnectionReason.Banned, JSON.stringify(events)); + peer.close(DisconnectionReason.Banned, JSON.stringify(events)); } }); }; @@ -367,7 +374,7 @@ class Pool extends EventEmitter { }); this.pendingOutboundPeers.delete(this.nodePubKey); - await peer.close(); + peer.close(); assert.fail(); } catch (err) { if ( @@ -599,7 +606,7 @@ class Pool extends EventEmitter { torport: this.config.torport, }); - await this.validatePeer(peer); + this.validatePeer(peer); await peer.completeOpen(this.nodeState, this.nodeKey, this.version, sessionInit); } catch (err) { @@ -629,7 +636,7 @@ class Pool extends EventEmitter { // the already established connection is closed. if (this.peers.has(peerPubKey)) { if (this.nodePubKey > peerPubKey) { - await peer.close(DisconnectionReason.AlreadyConnected); + peer.close(DisconnectionReason.AlreadyConnected); throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address); } else { const stillConnected = await new Promise((resolve) => { @@ -640,7 +647,7 @@ class Pool extends EventEmitter { }); }); if (stillConnected) { - await peer.close(DisconnectionReason.AlreadyConnected); + peer.close(DisconnectionReason.AlreadyConnected); throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address); } } @@ -697,10 +704,10 @@ class Pool extends EventEmitter { } }; - public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => { + public closePeer = (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => { const peer = this.peers.get(nodePubKey); if (peer) { - await peer.close(reason, reasonPayload); + peer.close(reason, reasonPayload); this.logger.info(`Disconnected from ${peer.nodePubKey}@${addressUtils.toString(peer.address)} (${peer.alias})`); } else { throw errors.NOT_CONNECTED(nodePubKey); @@ -964,42 +971,42 @@ class Pool extends EventEmitter { }; /** Validates a peer. If a check fails, closes the peer and throws a p2p error. */ - private validatePeer = async (peer: Peer): Promise => { + private validatePeer = (peer: Peer) => { assert(peer.nodePubKey); const peerPubKey = peer.nodePubKey; if (peerPubKey === this.nodePubKey) { - await peer.close(DisconnectionReason.ConnectedToSelf); + peer.close(DisconnectionReason.ConnectedToSelf); throw errors.ATTEMPTED_CONNECTION_TO_SELF; } // Check if version is semantic, and higher than minCompatibleVersion. if (!semver.valid(peer.version)) { - await peer.close(DisconnectionReason.MalformedVersion); + peer.close(DisconnectionReason.MalformedVersion); throw errors.MALFORMED_VERSION(addressUtils.toString(peer.address), peer.version); } // dev.note: compare returns 0 if v1 == v2, or 1 if v1 is greater, or -1 if v2 is greater. if (semver.compare(peer.version, this.minCompatibleVersion) === -1) { - await peer.close(DisconnectionReason.IncompatibleProtocolVersion); + peer.close(DisconnectionReason.IncompatibleProtocolVersion); throw errors.INCOMPATIBLE_VERSION(addressUtils.toString(peer.address), this.minCompatibleVersion, peer.version); } if (!this.connected) { // if we have disconnected the pool, don't allow any new connections to open - await peer.close(DisconnectionReason.NotAcceptingConnections); + peer.close(DisconnectionReason.NotAcceptingConnections); throw errors.POOL_CLOSED; } if (this.nodes.isBanned(peerPubKey)) { // TODO: Ban IP address for this session if banned peer attempts repeated connections. - await peer.close(DisconnectionReason.Banned); + peer.close(DisconnectionReason.Banned); throw errors.NODE_IS_BANNED(peerPubKey); } if (this.peers.has(peerPubKey)) { // TODO: Penalize peers that attempt to create duplicate connections to us more than once. // The first time might be due to connection retries. - await peer.close(DisconnectionReason.AlreadyConnected); + peer.close(DisconnectionReason.AlreadyConnected); throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address); } @@ -1069,7 +1076,10 @@ class Pool extends EventEmitter { }); peer.on('connFailure', async () => { - await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!); + if (this.connected && !this.disconnecting) { + // don't penalize nodes for connection failures due to us closing the pool + await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!); + } }); peer.on('connect', async () => { @@ -1117,22 +1127,18 @@ class Pool extends EventEmitter { }; private closePeers = (reason?: DisconnectionReason) => { - const closePromises = []; for (const peer of this.peers.values()) { - closePromises.push(peer.close(reason)); + peer.close(reason); } - return Promise.all(closePromises); }; private closePendingConnections = (reason?: DisconnectionReason) => { - const closePromises = []; for (const peer of this.pendingOutboundPeers.values()) { - closePromises.push(peer.close(reason)); + peer.close(reason); } for (const peer of this.pendingInboundPeers) { - closePromises.push(peer.close(reason)); + peer.close(reason); } - return Promise.all(closePromises); }; /** diff --git a/test/integration/Pool.spec.ts b/test/integration/Pool.spec.ts index 51ddd57b9..06b2a8222 100644 --- a/test/integration/Pool.spec.ts +++ b/test/integration/Pool.spec.ts @@ -15,12 +15,29 @@ import addressUtils from '../../lib/utils/addressUtils'; chai.use(chaiAsPromised); describe('P2P Pool Tests', async () => { - let db: DB; - let pool: Pool; - let nodeKeyOne: NodeKey; const loggers = Logger.createLoggers(Level.Warn); const sandbox = sinon.createSandbox(); + const nodeKeyOne = await NodeKey['generate'](); + const nodeKeyTwo = await NodeKey['generate'](); + + const config = new Config(); + config.p2p.listen = false; + config.p2p.discover = false; + const db = new DB(loggers.db); + await db.init(); + + const pool = new Pool({ + config: config.p2p, + xuNetwork: XuNetwork.SimNet, + logger: loggers.p2p, + models: db.models, + nodeKey: nodeKeyTwo, + version: '1.0.0', + }); + + await pool.init(); + const createPeer = (nodePubKey: string, addresses: Address[]) => { const peer = sandbox.createStubInstance(Peer) as any; peer.beginOpen = () => { @@ -35,37 +52,15 @@ describe('P2P Pool Tests', async () => { peer.completeOpen = () => {}; peer.socket = {}; peer.sendPacket = () => {}; - peer.close = async () => { + peer.close = () => { peer.sentDisconnectionReason = DisconnectionReason.NotAcceptingConnections; - await pool['handlePeerClose'](peer); + pool['handlePeerClose'](peer); }; pool['bindPeer'](peer); return peer; }; - before(async () => { - nodeKeyOne = await NodeKey['generate'](); - const nodeKeyTwo = await NodeKey['generate'](); - - const config = new Config(); - config.p2p.listen = false; - config.p2p.discover = false; - db = new DB(loggers.db); - await db.init(); - - pool = new Pool({ - config: config.p2p, - xuNetwork: XuNetwork.SimNet, - logger: loggers.p2p, - models: db.models, - nodeKey: nodeKeyTwo, - version: '1.0.0', - }); - - await pool.init(); - }); - it('should open a connection with a peer', async () => { const addresses = [{ host: '123.123.123.123', port: 8885 }]; const peer = createPeer(nodeKeyOne.pubKey, addresses); @@ -74,8 +69,8 @@ describe('P2P Pool Tests', async () => { await Promise.all([openPromise, new Promise((resolve) => pool.on('peer.active', resolve))]); }); - it('should close a peer', async () => { - await pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); + it('should close a peer', () => { + pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); expect(pool['peers'].has(nodeKeyOne.pubKey)).to.be.false; expect(pool['peers'].size).to.equal(0); }); @@ -100,7 +95,7 @@ describe('P2P Pool Tests', async () => { expect(nodeInstance!.addresses!).to.have.length(1); expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host); - await pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); + pool.closePeer(nodeKeyOne.pubKey, DisconnectionReason.NotAcceptingConnections); }); describe('reconnect logic', () => { @@ -136,8 +131,8 @@ describe('P2P Pool Tests', async () => { }); after(async () => { - await db.close(); await pool.disconnect(); + await db.close(); sandbox.restore(); });