Skip to content

Commit

Permalink
kad: Fix substream opening and dialing race (#222)
Browse files Browse the repository at this point in the history
This PR makes the substream opening and dialing of query commands more
robust.

In the past, the logic of a query action would:
- T0; try to open a substream
- T1; try to dial the peer

There is a race between the moment when a substream is opened and when
the peer is dialed (T0 and T1), where the peer could connect to us. The
race manifested after extensively running `FindNode` commands via
[subp2p-explorer
bench-cli](https://github.com/lexnv/subp2p-explorer/tree/main/bench-cli)

To mitigate this race, retry to open the substream on recoverable errors
from dialing (ie Error::PeerAlreadyConnected).

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Aug 30, 2024
1 parent 5b8183b commit 86f2a30
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 66 deletions.
135 changes: 69 additions & 66 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,50 +642,71 @@ impl Kademlia {
}
}

/// Handle next query action.
async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
match action {
QueryAction::SendMessage { query, peer, .. } => match self.service.open_substream(peer)
{
Err(_) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "dial peer");

match self.service.dial(&peer) {
Ok(_) => match self.pending_dials.entry(peer) {
Entry::Occupied(entry) => {
entry.into_mut().push(PeerAction::SendFindNode(query));
/// Open a substream with a peer or dial the peer.
fn open_substream_or_dial(
&mut self,
peer: PeerId,
action: PeerAction,
query: Option<QueryId>,
) -> Result<(), Error> {
match self.service.open_substream(peer) {
Ok(substream_id) => {
self.pending_substreams.insert(substream_id, peer);
self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action);

Ok(())
}
Err(err) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream. Dialing peer");

match self.service.dial(&peer) {
Ok(()) => {
self.pending_dials.entry(peer).or_default().push(action);
Ok(())
}

// Already connected is a recoverable error.
Err(Error::AlreadyConnected) => {
// Dial returned `Error::AlreadyConnected`, retry opening the substream.
match self.service.open_substream(peer) {
Ok(substream_id) => {
self.pending_substreams.insert(substream_id, peer);
self.peers
.entry(peer)
.or_default()
.pending_actions
.insert(substream_id, action);
Ok(())
}
Entry::Vacant(entry) => {
entry.insert(vec![PeerAction::SendFindNode(query)]);
Err(err) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
Err(err)
}
},
Err(error) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "failed to dial peer");
self.engine.register_response_failure(query, peer);
}
}

Ok(())
Err(error) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer");
Err(error)
}
}
Ok(substream_id) => {
tracing::trace!(
target: LOG_TARGET,
?query,
?peer,
?substream_id,
"open outbound substream for peer"
);

self.pending_substreams.insert(substream_id, peer);
self.peers
.entry(peer)
.or_default()
.pending_actions
.insert(substream_id, PeerAction::SendFindNode(query));
}
}
}

Ok(())
/// Handle next query action.
async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> {
match action {
QueryAction::SendMessage { query, peer, .. } => {
if self
.open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query))
.is_err()
{
// Announce the error to the query engine.
self.engine.register_response_failure(query, peer);
}
},
Ok(())
}
QueryAction::FindNodeQuerySucceeded {
target,
peers,
Expand Down Expand Up @@ -720,36 +741,18 @@ impl Kademlia {
let message = KademliaMessage::put_value(record);

for peer in peers {
match self.service.open_substream(peer.peer) {
Ok(substream_id) => {
self.pending_substreams.insert(substream_id, peer.peer);
self.peers
.entry(peer.peer)
.or_default()
.pending_actions
.insert(substream_id, PeerAction::SendPutValue(message.clone()));
}
Err(_) => match self.service.dial(&peer.peer) {
Ok(_) => match self.pending_dials.entry(peer.peer) {
Entry::Occupied(entry) => {
entry
.into_mut()
.push(PeerAction::SendPutValue(message.clone()));
}
Entry::Vacant(entry) => {
entry.insert(vec![PeerAction::SendPutValue(message.clone())]);
}
},
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?key,
?error,
"failed to dial peer",
);
}
},
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendPutValue(message.clone()),
None,
) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?key,
?error,
"failed to put record to peer",
);
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/protocol/libp2p/kademlia/query/find_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
// If we cannot make progress, return the final result.
// A query failed when we are not able to identify one single peer.
if self.is_done() {
tracing::trace!(
target: LOG_TARGET,
query = ?self.config.query,
pending = self.pending.len(),
candidates = self.candidates.len(),
"query finished"
);

return if self.responses.is_empty() {
Some(QueryAction::QueryFailed {
query: self.config.query,
Expand Down

0 comments on commit 86f2a30

Please sign in to comment.