diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index b59a1fcf..dcdfe43d 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -642,50 +642,71 @@ impl Kademlia { } } - /// Handle next query action. - async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> { - match action { - QueryAction::SendMessage { query, peer, .. } => match self.service.open_substream(peer) - { - Err(_) => { - tracing::trace!(target: LOG_TARGET, ?query, ?peer, "dial peer"); - - match self.service.dial(&peer) { - Ok(_) => match self.pending_dials.entry(peer) { - Entry::Occupied(entry) => { - entry.into_mut().push(PeerAction::SendFindNode(query)); + /// Open a substream with a peer or dial the peer. + fn open_substream_or_dial( + &mut self, + peer: PeerId, + action: PeerAction, + query: Option, + ) -> Result<(), Error> { + match self.service.open_substream(peer) { + Ok(substream_id) => { + self.pending_substreams.insert(substream_id, peer); + self.peers.entry(peer).or_default().pending_actions.insert(substream_id, action); + + Ok(()) + } + Err(err) => { + tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream. Dialing peer"); + + match self.service.dial(&peer) { + Ok(()) => { + self.pending_dials.entry(peer).or_default().push(action); + Ok(()) + } + + // Already connected is a recoverable error. + Err(Error::AlreadyConnected) => { + // Dial returned `Error::AlreadyConnected`, retry opening the substream. + match self.service.open_substream(peer) { + Ok(substream_id) => { + self.pending_substreams.insert(substream_id, peer); + self.peers + .entry(peer) + .or_default() + .pending_actions + .insert(substream_id, action); + Ok(()) } - Entry::Vacant(entry) => { - entry.insert(vec![PeerAction::SendFindNode(query)]); + Err(err) => { + tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time"); + Err(err) } - }, - Err(error) => { - tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "failed to dial peer"); - self.engine.register_response_failure(query, peer); } } - Ok(()) + Err(error) => { + tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?error, "Failed to dial peer"); + Err(error) + } } - Ok(substream_id) => { - tracing::trace!( - target: LOG_TARGET, - ?query, - ?peer, - ?substream_id, - "open outbound substream for peer" - ); - - self.pending_substreams.insert(substream_id, peer); - self.peers - .entry(peer) - .or_default() - .pending_actions - .insert(substream_id, PeerAction::SendFindNode(query)); + } + } + } - Ok(()) + /// Handle next query action. + async fn on_query_action(&mut self, action: QueryAction) -> Result<(), (QueryId, PeerId)> { + match action { + QueryAction::SendMessage { query, peer, .. } => { + if self + .open_substream_or_dial(peer, PeerAction::SendFindNode(query), Some(query)) + .is_err() + { + // Announce the error to the query engine. + self.engine.register_response_failure(query, peer); } - }, + Ok(()) + } QueryAction::FindNodeQuerySucceeded { target, peers, @@ -720,36 +741,18 @@ impl Kademlia { let message = KademliaMessage::put_value(record); for peer in peers { - match self.service.open_substream(peer.peer) { - Ok(substream_id) => { - self.pending_substreams.insert(substream_id, peer.peer); - self.peers - .entry(peer.peer) - .or_default() - .pending_actions - .insert(substream_id, PeerAction::SendPutValue(message.clone())); - } - Err(_) => match self.service.dial(&peer.peer) { - Ok(_) => match self.pending_dials.entry(peer.peer) { - Entry::Occupied(entry) => { - entry - .into_mut() - .push(PeerAction::SendPutValue(message.clone())); - } - Entry::Vacant(entry) => { - entry.insert(vec![PeerAction::SendPutValue(message.clone())]); - } - }, - Err(error) => { - tracing::debug!( - target: LOG_TARGET, - ?peer, - ?key, - ?error, - "failed to dial peer", - ); - } - }, + if let Err(error) = self.open_substream_or_dial( + peer.peer, + PeerAction::SendPutValue(message.clone()), + None, + ) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?key, + ?error, + "failed to put record to peer", + ); } } diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs index 2d168e94..ce63f95a 100644 --- a/src/protocol/libp2p/kademlia/query/find_node.rs +++ b/src/protocol/libp2p/kademlia/query/find_node.rs @@ -233,6 +233,14 @@ impl>> FindNodeContext { // If we cannot make progress, return the final result. // A query failed when we are not able to identify one single peer. if self.is_done() { + tracing::trace!( + target: LOG_TARGET, + query = ?self.config.query, + pending = self.pending.len(), + candidates = self.candidates.len(), + "query finished" + ); + return if self.responses.is_empty() { Some(QueryAction::QueryFailed { query: self.config.query,