Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(p2p): better pool closing #2051

Merged
merged 1 commit into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 41 additions & 25 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class Peer extends EventEmitter {
/**
* Close a peer by ensuring the socket is destroyed and terminating all timers.
*/
public close = async (reason?: DisconnectionReason, reasonPayload?: string): Promise<void> => {
public close = (reason?: DisconnectionReason, reasonPayload?: string) => {
if (this.status === PeerStatus.Closed) {
return;
}
Expand All @@ -343,17 +343,33 @@ class Peer extends EventEmitter {
this.revokeConnectionRetries();

if (this.socket) {
if (!this.socket.destroyed) {
if (reason !== undefined) {
this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;
await this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }));
}

if (this.socket.destroyed) {
// the socket has already been destroyed, it cannot be used to send further packets
// and does not need further destruction. it can simply be deleted from memory
delete this.socket;
} else if (this.socket.connecting || reason === undefined) {
// either we are still connecting (socket.connect has been called but not finished) the socket and
// thus we cannot transmit packets to the peer yet, or we have not specified a disconnection reason
// either way, we don't need to send a disconnecting packet and can destroy the socket right away
this.socket.destroy();
this.socket.removeAllListeners();
delete this.socket;
} else {
// try to send the peer a disconnecting reason packet before destroying the socket
this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;

// we send the disconnection packet as a "best effort" and don't wait for the data to be written to the socket
// in case the socket is dead or stalled, we don't want the delay to hold up the flow of closing this peer
this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }))
.finally(() => {
// destroy the socket after sending the disconnecting packet (if any)
this.socket?.destroy();
this.socket?.removeAllListeners();
delete this.socket;
})
.catch(this.logger.error);
}
delete this.socket;
}

if (this.retryConnectionTimer) {
Expand Down Expand Up @@ -389,7 +405,7 @@ class Peer extends EventEmitter {
DisconnectionReason[this.recvDisconnectionReason]
}`;
} else {
rejectionMsg = `Peer ${this.label} was destroyed`;
rejectionMsg = `Peer ${this.label} was closed`;
}

for (const [packetType, entry] of this.responseMap) {
Expand Down Expand Up @@ -592,25 +608,25 @@ class Peer extends EventEmitter {
resolve();
};

const onError = async (err: Error) => {
const onError = (err: Error) => {
cleanup();

this.emit('connFailure');

if (!retry) {
await this.close();
this.close();
reject(errors.COULD_NOT_CONNECT(this.address, err));
return;
}

if (Date.now() - startTime + retryDelay > Peer.CONNECTION_RETRIES_MAX_PERIOD) {
await this.close();
this.close();
reject(errors.CONNECTION_RETRIES_MAX_PERIOD_EXCEEDED);
return;
}

if (this.connectionRetriesRevoked) {
await this.close();
this.close();
reject(errors.CONNECTION_RETRIES_REVOKED);
return;
}
Expand Down Expand Up @@ -675,7 +691,7 @@ class Peer extends EventEmitter {
/**
* Potentially timeout peer if it hasn't responded.
*/
private checkTimeout = async () => {
private checkTimeout = () => {
const now = ms();

for (const [packetId, entry] of this.responseMap) {
Expand All @@ -684,7 +700,7 @@ class Peer extends EventEmitter {
const err = errors.RESPONSE_TIMEOUT(request);
this.logger.error(`Peer timed out waiting for response to packet ${packetId}`);
entry.reject(err);
await this.close(DisconnectionReason.ResponseStalling, packetId);
this.close(DisconnectionReason.ResponseStalling, packetId);
}
}
};
Expand Down Expand Up @@ -770,7 +786,7 @@ class Peer extends EventEmitter {
} else {
this.logger.info(`Peer ${this.label} socket closed`);
}
await this.close();
this.close();
});

this.socket.on('data', this.parser.feed);
Expand All @@ -797,10 +813,10 @@ class Peer extends EventEmitter {
case errorCodes.FRAMER_INVALID_MSG_LENGTH:
this.logger.warn(`Peer (${this.label}): ${err.message}`);
this.emit('reputation', ReputationEvent.WireProtocolErr);
await this.close(DisconnectionReason.WireProtocolErr, err.message);
this.close(DisconnectionReason.WireProtocolErr, err.message);
break;
default:
await this.close();
this.close();
break;
}
});
Expand Down Expand Up @@ -864,7 +880,7 @@ class Peer extends EventEmitter {
* @param nodePubKey our node pub key
* @param expectedNodePubKey the expected node pub key of the sender of the init packet
*/
private authenticateSessionInit = async (
private authenticateSessionInit = (
packet: packets.SessionInitPacket,
nodePubKey: string,
expectedNodePubKey?: string,
Expand All @@ -878,14 +894,14 @@ class Peer extends EventEmitter {

// verify that the init packet came from the expected node
if (expectedNodePubKey && expectedNodePubKey !== sourceNodePubKey) {
await this.close(DisconnectionReason.UnexpectedIdentity);
this.close(DisconnectionReason.UnexpectedIdentity);
throw errors.UNEXPECTED_NODE_PUB_KEY(sourceNodePubKey, expectedNodePubKey, addressUtils.toString(this.address));
}

// verify that the init packet was intended for us
if (targetNodePubKey !== nodePubKey) {
this.emit('reputation', ReputationEvent.InvalidAuth);
await this.close(DisconnectionReason.AuthFailureInvalidTarget);
this.close(DisconnectionReason.AuthFailureInvalidTarget);
throw errors.AUTH_FAILURE_INVALID_TARGET(sourceNodePubKey, targetNodePubKey);
}

Expand All @@ -896,7 +912,7 @@ class Peer extends EventEmitter {

if (!verified) {
this.emit('reputation', ReputationEvent.InvalidAuth);
await this.close(DisconnectionReason.AuthFailureInvalidSignature);
this.close(DisconnectionReason.AuthFailureInvalidSignature);
throw errors.AUTH_FAILURE_INVALID_SIGNATURE(sourceNodePubKey);
}

Expand Down Expand Up @@ -973,11 +989,11 @@ class Peer extends EventEmitter {
assert(this.expectedNodePubKey);
await this.initSession(ownNodeState, ownNodeKey, ownVersion, this.expectedNodePubKey);
sessionInit = await this.waitSessionInit();
await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey);
this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey);
} else {
// inbound handshake
sessionInit = await this.waitSessionInit();
await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey);
this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey);
}
return sessionInit;
};
Expand Down
66 changes: 36 additions & 30 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ class Pool extends EventEmitter {

this.bindNodeList();

this.loadingNodesPromise = this.nodes.load();
this.loadingNodesPromise
this.loadingNodesPromise = this.nodes
.load()
.then(async () => {
// check to make sure we're not already in the process of disconnecting
if (this.disconnecting) {
this.loadingNodesPromise = undefined;
return;
Expand All @@ -222,6 +223,12 @@ class Pool extends EventEmitter {
}, Pool.SECONDARY_PEERS_DELAY);
}

// check again to make sure we're not in the process of disconnecting
if (this.disconnecting) {
this.loadingNodesPromise = undefined;
return;
}

if (primary.length > 0) {
this.logger.info('Connecting to known / previously connected peers');
await this.connectNodes(primary, true, true);
Expand Down Expand Up @@ -326,17 +333,17 @@ class Pool extends EventEmitter {
if (this.loadingNodesPromise) {
await this.loadingNodesPromise;
}
await Promise.all([
this.unlisten(),
this.closePendingConnections(DisconnectionReason.Shutdown),
this.closePeers(DisconnectionReason.Shutdown),
]);
await this.unlisten();

this.closePendingConnections(DisconnectionReason.Shutdown);
this.closePeers(DisconnectionReason.Shutdown);

this.connected = false;
this.disconnecting = false;
};

private bindNodeList = () => {
this.nodes.on('node.ban', async (nodePubKey: string, events: ReputationEvent[]) => {
this.nodes.on('node.ban', (nodePubKey: string, events: ReputationEvent[]) => {
const banReasonText =
events[0] !== ReputationEvent.ManualBan && ReputationEvent[events[0]]
? `due to ${ReputationEvent[events[0]]}`
Expand All @@ -345,7 +352,7 @@ class Pool extends EventEmitter {

const peer = this.peers.get(nodePubKey);
if (peer) {
await peer.close(DisconnectionReason.Banned, JSON.stringify(events));
peer.close(DisconnectionReason.Banned, JSON.stringify(events));
}
});
};
Expand All @@ -367,7 +374,7 @@ class Pool extends EventEmitter {
});
this.pendingOutboundPeers.delete(this.nodePubKey);

await peer.close();
peer.close();
assert.fail();
} catch (err) {
if (
Expand Down Expand Up @@ -599,7 +606,7 @@ class Pool extends EventEmitter {
torport: this.config.torport,
});

await this.validatePeer(peer);
this.validatePeer(peer);

await peer.completeOpen(this.nodeState, this.nodeKey, this.version, sessionInit);
} catch (err) {
Expand Down Expand Up @@ -629,7 +636,7 @@ class Pool extends EventEmitter {
// the already established connection is closed.
if (this.peers.has(peerPubKey)) {
if (this.nodePubKey > peerPubKey) {
await peer.close(DisconnectionReason.AlreadyConnected);
peer.close(DisconnectionReason.AlreadyConnected);
throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address);
} else {
const stillConnected = await new Promise<boolean>((resolve) => {
Expand All @@ -640,7 +647,7 @@ class Pool extends EventEmitter {
});
});
if (stillConnected) {
await peer.close(DisconnectionReason.AlreadyConnected);
peer.close(DisconnectionReason.AlreadyConnected);
throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address);
}
}
Expand Down Expand Up @@ -697,10 +704,10 @@ class Pool extends EventEmitter {
}
};

public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => {
public closePeer = (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => {
const peer = this.peers.get(nodePubKey);
if (peer) {
await peer.close(reason, reasonPayload);
peer.close(reason, reasonPayload);
this.logger.info(`Disconnected from ${peer.nodePubKey}@${addressUtils.toString(peer.address)} (${peer.alias})`);
} else {
throw errors.NOT_CONNECTED(nodePubKey);
Expand Down Expand Up @@ -964,42 +971,42 @@ class Pool extends EventEmitter {
};

/** Validates a peer. If a check fails, closes the peer and throws a p2p error. */
private validatePeer = async (peer: Peer): Promise<void> => {
private validatePeer = (peer: Peer) => {
assert(peer.nodePubKey);
const peerPubKey = peer.nodePubKey;

if (peerPubKey === this.nodePubKey) {
await peer.close(DisconnectionReason.ConnectedToSelf);
peer.close(DisconnectionReason.ConnectedToSelf);
throw errors.ATTEMPTED_CONNECTION_TO_SELF;
}

// Check if version is semantic, and higher than minCompatibleVersion.
if (!semver.valid(peer.version)) {
await peer.close(DisconnectionReason.MalformedVersion);
peer.close(DisconnectionReason.MalformedVersion);
throw errors.MALFORMED_VERSION(addressUtils.toString(peer.address), peer.version);
}
// dev.note: compare returns 0 if v1 == v2, or 1 if v1 is greater, or -1 if v2 is greater.
if (semver.compare(peer.version, this.minCompatibleVersion) === -1) {
await peer.close(DisconnectionReason.IncompatibleProtocolVersion);
peer.close(DisconnectionReason.IncompatibleProtocolVersion);
throw errors.INCOMPATIBLE_VERSION(addressUtils.toString(peer.address), this.minCompatibleVersion, peer.version);
}

if (!this.connected) {
// if we have disconnected the pool, don't allow any new connections to open
await peer.close(DisconnectionReason.NotAcceptingConnections);
peer.close(DisconnectionReason.NotAcceptingConnections);
throw errors.POOL_CLOSED;
}

if (this.nodes.isBanned(peerPubKey)) {
// TODO: Ban IP address for this session if banned peer attempts repeated connections.
await peer.close(DisconnectionReason.Banned);
peer.close(DisconnectionReason.Banned);
throw errors.NODE_IS_BANNED(peerPubKey);
}

if (this.peers.has(peerPubKey)) {
// TODO: Penalize peers that attempt to create duplicate connections to us more than once.
// The first time might be due to connection retries.
await peer.close(DisconnectionReason.AlreadyConnected);
peer.close(DisconnectionReason.AlreadyConnected);
throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address);
}

Expand Down Expand Up @@ -1069,7 +1076,10 @@ class Pool extends EventEmitter {
});

peer.on('connFailure', async () => {
await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!);
if (this.connected && !this.disconnecting) {
// don't penalize nodes for connection failures due to us closing the pool
await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!);
}
});

peer.on('connect', async () => {
Expand Down Expand Up @@ -1117,22 +1127,18 @@ class Pool extends EventEmitter {
};

private closePeers = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.peers.values()) {
closePromises.push(peer.close(reason));
peer.close(reason);
}
return Promise.all(closePromises);
};

private closePendingConnections = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.pendingOutboundPeers.values()) {
closePromises.push(peer.close(reason));
peer.close(reason);
}
for (const peer of this.pendingInboundPeers) {
closePromises.push(peer.close(reason));
peer.close(reason);
}
return Promise.all(closePromises);
};

/**
Expand Down
Loading