Skip to content

Commit

Permalink
refactor(p2p): better pool closing
Browse files Browse the repository at this point in the history
This makes several improvements to the way we close the p2p pool.

1. Make the peer `close` function synchronous by not awaiting the
disconnection reason packet being sent. Previously we would wait and
hold up the closing flow, which could introduce delays on stale or
dead sockets. It also meant there was different behavior for closing
peers with or without a disconnection reason. Instead we continue
with all the steps for closing a peer and destroy the socket whenever
the socket write attempt is complete.

2. Immediately destroy the socket if it is in `connecting` state,
don't even attempt to send a disconnection reason packet since it
would not work anyway.

3. Don't increment the connection failure attempt counter for peers
when/after we close the pool and thus are canceling any ongoing
connection attempts. This goes against the spirit of the connection
failure counter and also means that he pool may be making db writes
after it is closed (and potentially after the database has closed).

Then in the Pool test suite, this flips the order of the db close
and the pool close so that the pool is second, which makes sense
since it depends on an open, functioning database. It also moves
the logic in the `before` block up to the `describe` block, the
`before` block wasn't properly completing before the test cases
began running. See https://mochajs.org/#asynchronous-code.
  • Loading branch information
sangaman committed Jan 5, 2021
1 parent cd75416 commit 3b45d73
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 86 deletions.
66 changes: 41 additions & 25 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class Peer extends EventEmitter {
/**
* Close a peer by ensuring the socket is destroyed and terminating all timers.
*/
public close = async (reason?: DisconnectionReason, reasonPayload?: string): Promise<void> => {
public close = (reason?: DisconnectionReason, reasonPayload?: string) => {
if (this.status === PeerStatus.Closed) {
return;
}
Expand All @@ -343,17 +343,33 @@ class Peer extends EventEmitter {
this.revokeConnectionRetries();

if (this.socket) {
if (!this.socket.destroyed) {
if (reason !== undefined) {
this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;
await this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }));
}

if (this.socket.destroyed) {
// the socket has already been destroyed, it cannot be used to send further packets
// and does not need further destruction. it can simply be deleted from memory
delete this.socket;
} else if (this.socket.connecting || reason === undefined) {
// either we are still connecting (socket.connect has been called but not finished) the socket and
// thus we cannot transmit packets to the peer yet, or we have not specified a disconnection reason
// either way, we don't need to send a disconnecting packet and can destroy the socket right away
this.socket.destroy();
this.socket.removeAllListeners();
delete this.socket;
} else {
// try to send the peer a disconnecting reason packet before destroying the socket
this.logger.debug(`Peer ${this.label}: closing socket. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;

// we send the disconnection packet as a "best effort" and don't wait for the data to be written to the socket
// in case the socket is dead or stalled, we don't want the delay to hold up the flow of closing this peer
this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }))
.finally(() => {
// destroy the socket after sending the disconnecting packet (if any)
this.socket?.destroy();
this.socket?.removeAllListeners();
delete this.socket;
})
.catch(this.logger.error);
}
delete this.socket;
}

if (this.retryConnectionTimer) {
Expand Down Expand Up @@ -389,7 +405,7 @@ class Peer extends EventEmitter {
DisconnectionReason[this.recvDisconnectionReason]
}`;
} else {
rejectionMsg = `Peer ${this.label} was destroyed`;
rejectionMsg = `Peer ${this.label} was closed`;
}

for (const [packetType, entry] of this.responseMap) {
Expand Down Expand Up @@ -592,25 +608,25 @@ class Peer extends EventEmitter {
resolve();
};

const onError = async (err: Error) => {
const onError = (err: Error) => {
cleanup();

this.emit('connFailure');

if (!retry) {
await this.close();
this.close();
reject(errors.COULD_NOT_CONNECT(this.address, err));
return;
}

if (Date.now() - startTime + retryDelay > Peer.CONNECTION_RETRIES_MAX_PERIOD) {
await this.close();
this.close();
reject(errors.CONNECTION_RETRIES_MAX_PERIOD_EXCEEDED);
return;
}

if (this.connectionRetriesRevoked) {
await this.close();
this.close();
reject(errors.CONNECTION_RETRIES_REVOKED);
return;
}
Expand Down Expand Up @@ -675,7 +691,7 @@ class Peer extends EventEmitter {
/**
* Potentially timeout peer if it hasn't responded.
*/
private checkTimeout = async () => {
private checkTimeout = () => {
const now = ms();

for (const [packetId, entry] of this.responseMap) {
Expand All @@ -684,7 +700,7 @@ class Peer extends EventEmitter {
const err = errors.RESPONSE_TIMEOUT(request);
this.logger.error(`Peer timed out waiting for response to packet ${packetId}`);
entry.reject(err);
await this.close(DisconnectionReason.ResponseStalling, packetId);
this.close(DisconnectionReason.ResponseStalling, packetId);
}
}
};
Expand Down Expand Up @@ -770,7 +786,7 @@ class Peer extends EventEmitter {
} else {
this.logger.info(`Peer ${this.label} socket closed`);
}
await this.close();
this.close();
});

this.socket.on('data', this.parser.feed);
Expand All @@ -797,10 +813,10 @@ class Peer extends EventEmitter {
case errorCodes.FRAMER_INVALID_MSG_LENGTH:
this.logger.warn(`Peer (${this.label}): ${err.message}`);
this.emit('reputation', ReputationEvent.WireProtocolErr);
await this.close(DisconnectionReason.WireProtocolErr, err.message);
this.close(DisconnectionReason.WireProtocolErr, err.message);
break;
default:
await this.close();
this.close();
break;
}
});
Expand Down Expand Up @@ -864,7 +880,7 @@ class Peer extends EventEmitter {
* @param nodePubKey our node pub key
* @param expectedNodePubKey the expected node pub key of the sender of the init packet
*/
private authenticateSessionInit = async (
private authenticateSessionInit = (
packet: packets.SessionInitPacket,
nodePubKey: string,
expectedNodePubKey?: string,
Expand All @@ -878,14 +894,14 @@ class Peer extends EventEmitter {

// verify that the init packet came from the expected node
if (expectedNodePubKey && expectedNodePubKey !== sourceNodePubKey) {
await this.close(DisconnectionReason.UnexpectedIdentity);
this.close(DisconnectionReason.UnexpectedIdentity);
throw errors.UNEXPECTED_NODE_PUB_KEY(sourceNodePubKey, expectedNodePubKey, addressUtils.toString(this.address));
}

// verify that the init packet was intended for us
if (targetNodePubKey !== nodePubKey) {
this.emit('reputation', ReputationEvent.InvalidAuth);
await this.close(DisconnectionReason.AuthFailureInvalidTarget);
this.close(DisconnectionReason.AuthFailureInvalidTarget);
throw errors.AUTH_FAILURE_INVALID_TARGET(sourceNodePubKey, targetNodePubKey);
}

Expand All @@ -896,7 +912,7 @@ class Peer extends EventEmitter {

if (!verified) {
this.emit('reputation', ReputationEvent.InvalidAuth);
await this.close(DisconnectionReason.AuthFailureInvalidSignature);
this.close(DisconnectionReason.AuthFailureInvalidSignature);
throw errors.AUTH_FAILURE_INVALID_SIGNATURE(sourceNodePubKey);
}

Expand Down Expand Up @@ -973,11 +989,11 @@ class Peer extends EventEmitter {
assert(this.expectedNodePubKey);
await this.initSession(ownNodeState, ownNodeKey, ownVersion, this.expectedNodePubKey);
sessionInit = await this.waitSessionInit();
await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey);
this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey, this.expectedNodePubKey);
} else {
// inbound handshake
sessionInit = await this.waitSessionInit();
await this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey);
this.authenticateSessionInit(sessionInit, ownNodeKey.pubKey);
}
return sessionInit;
};
Expand Down
66 changes: 36 additions & 30 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,10 @@ class Pool extends EventEmitter {

this.bindNodeList();

this.loadingNodesPromise = this.nodes.load();
this.loadingNodesPromise
this.loadingNodesPromise = this.nodes
.load()
.then(async () => {
// check to make sure we're not already in the process of disconnecting
if (this.disconnecting) {
this.loadingNodesPromise = undefined;
return;
Expand All @@ -222,6 +223,12 @@ class Pool extends EventEmitter {
}, Pool.SECONDARY_PEERS_DELAY);
}

// check again to make sure we're not in the process of disconnecting
if (this.disconnecting) {
this.loadingNodesPromise = undefined;
return;
}

if (primary.length > 0) {
this.logger.info('Connecting to known / previously connected peers');
await this.connectNodes(primary, true, true);
Expand Down Expand Up @@ -326,17 +333,17 @@ class Pool extends EventEmitter {
if (this.loadingNodesPromise) {
await this.loadingNodesPromise;
}
await Promise.all([
this.unlisten(),
this.closePendingConnections(DisconnectionReason.Shutdown),
this.closePeers(DisconnectionReason.Shutdown),
]);
await this.unlisten();

this.closePendingConnections(DisconnectionReason.Shutdown);
this.closePeers(DisconnectionReason.Shutdown);

this.connected = false;
this.disconnecting = false;
};

private bindNodeList = () => {
this.nodes.on('node.ban', async (nodePubKey: string, events: ReputationEvent[]) => {
this.nodes.on('node.ban', (nodePubKey: string, events: ReputationEvent[]) => {
const banReasonText =
events[0] !== ReputationEvent.ManualBan && ReputationEvent[events[0]]
? `due to ${ReputationEvent[events[0]]}`
Expand All @@ -345,7 +352,7 @@ class Pool extends EventEmitter {

const peer = this.peers.get(nodePubKey);
if (peer) {
await peer.close(DisconnectionReason.Banned, JSON.stringify(events));
peer.close(DisconnectionReason.Banned, JSON.stringify(events));
}
});
};
Expand All @@ -367,7 +374,7 @@ class Pool extends EventEmitter {
});
this.pendingOutboundPeers.delete(this.nodePubKey);

await peer.close();
peer.close();
assert.fail();
} catch (err) {
if (
Expand Down Expand Up @@ -599,7 +606,7 @@ class Pool extends EventEmitter {
torport: this.config.torport,
});

await this.validatePeer(peer);
this.validatePeer(peer);

await peer.completeOpen(this.nodeState, this.nodeKey, this.version, sessionInit);
} catch (err) {
Expand Down Expand Up @@ -629,7 +636,7 @@ class Pool extends EventEmitter {
// the already established connection is closed.
if (this.peers.has(peerPubKey)) {
if (this.nodePubKey > peerPubKey) {
await peer.close(DisconnectionReason.AlreadyConnected);
peer.close(DisconnectionReason.AlreadyConnected);
throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address);
} else {
const stillConnected = await new Promise<boolean>((resolve) => {
Expand All @@ -640,7 +647,7 @@ class Pool extends EventEmitter {
});
});
if (stillConnected) {
await peer.close(DisconnectionReason.AlreadyConnected);
peer.close(DisconnectionReason.AlreadyConnected);
throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address);
}
}
Expand Down Expand Up @@ -697,10 +704,10 @@ class Pool extends EventEmitter {
}
};

public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => {
public closePeer = (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => {
const peer = this.peers.get(nodePubKey);
if (peer) {
await peer.close(reason, reasonPayload);
peer.close(reason, reasonPayload);
this.logger.info(`Disconnected from ${peer.nodePubKey}@${addressUtils.toString(peer.address)} (${peer.alias})`);
} else {
throw errors.NOT_CONNECTED(nodePubKey);
Expand Down Expand Up @@ -964,42 +971,42 @@ class Pool extends EventEmitter {
};

/** Validates a peer. If a check fails, closes the peer and throws a p2p error. */
private validatePeer = async (peer: Peer): Promise<void> => {
private validatePeer = (peer: Peer) => {
assert(peer.nodePubKey);
const peerPubKey = peer.nodePubKey;

if (peerPubKey === this.nodePubKey) {
await peer.close(DisconnectionReason.ConnectedToSelf);
peer.close(DisconnectionReason.ConnectedToSelf);
throw errors.ATTEMPTED_CONNECTION_TO_SELF;
}

// Check if version is semantic, and higher than minCompatibleVersion.
if (!semver.valid(peer.version)) {
await peer.close(DisconnectionReason.MalformedVersion);
peer.close(DisconnectionReason.MalformedVersion);
throw errors.MALFORMED_VERSION(addressUtils.toString(peer.address), peer.version);
}
// dev.note: compare returns 0 if v1 == v2, or 1 if v1 is greater, or -1 if v2 is greater.
if (semver.compare(peer.version, this.minCompatibleVersion) === -1) {
await peer.close(DisconnectionReason.IncompatibleProtocolVersion);
peer.close(DisconnectionReason.IncompatibleProtocolVersion);
throw errors.INCOMPATIBLE_VERSION(addressUtils.toString(peer.address), this.minCompatibleVersion, peer.version);
}

if (!this.connected) {
// if we have disconnected the pool, don't allow any new connections to open
await peer.close(DisconnectionReason.NotAcceptingConnections);
peer.close(DisconnectionReason.NotAcceptingConnections);
throw errors.POOL_CLOSED;
}

if (this.nodes.isBanned(peerPubKey)) {
// TODO: Ban IP address for this session if banned peer attempts repeated connections.
await peer.close(DisconnectionReason.Banned);
peer.close(DisconnectionReason.Banned);
throw errors.NODE_IS_BANNED(peerPubKey);
}

if (this.peers.has(peerPubKey)) {
// TODO: Penalize peers that attempt to create duplicate connections to us more than once.
// The first time might be due to connection retries.
await peer.close(DisconnectionReason.AlreadyConnected);
peer.close(DisconnectionReason.AlreadyConnected);
throw errors.NODE_ALREADY_CONNECTED(peerPubKey, peer.address);
}

Expand Down Expand Up @@ -1069,7 +1076,10 @@ class Pool extends EventEmitter {
});

peer.on('connFailure', async () => {
await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!);
if (this.connected && !this.disconnecting) {
// don't penalize nodes for connection failures due to us closing the pool
await this.nodes.incrementConsecutiveConnFailures(peer.expectedNodePubKey!);
}
});

peer.on('connect', async () => {
Expand Down Expand Up @@ -1117,22 +1127,18 @@ class Pool extends EventEmitter {
};

private closePeers = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.peers.values()) {
closePromises.push(peer.close(reason));
peer.close(reason);
}
return Promise.all(closePromises);
};

private closePendingConnections = (reason?: DisconnectionReason) => {
const closePromises = [];
for (const peer of this.pendingOutboundPeers.values()) {
closePromises.push(peer.close(reason));
peer.close(reason);
}
for (const peer of this.pendingInboundPeers) {
closePromises.push(peer.close(reason));
peer.close(reason);
}
return Promise.all(closePromises);
};

/**
Expand Down
Loading

0 comments on commit 3b45d73

Please sign in to comment.