Skip to content

Commit

Permalink
Gossipsub backpressure (libp2p#549)
Browse files Browse the repository at this point in the history
* feat(gossipsub): introduce backpressure

* remove duplicated poll from RpcReceiver

* make RpcReceiver part of RpcSender,

and clone it per ConnectionHandler, this will allow us to always have an open Receiver.

* split send_message into multiple RpcSender methods,

to allow for better handling of each message send.

* cargo clippy

* add CHANGELOG.md entry

* Update protocols/gossipsub/src/types.rs

---------

Co-authored-by: João Oliveira <[email protected]>
  • Loading branch information
AgeManning and jxs authored Nov 30, 2023
1 parent dfd66f9 commit a4a3f39
Show file tree
Hide file tree
Showing 8 changed files with 682 additions and 424 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
## 0.46.1
## 0.46.1 - unreleased

- Implement backpressure by diferentiating between priority and non priority messages.
Drop `Publish` and `Forward` messages when the queue becomes full.
See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914)

- Deprecate `Rpc` in preparation for removing it from the public API because it is an internal type.
See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833).
See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833).

## 0.46.0

Expand Down
1 change: 1 addition & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sha2 = "0.10.8"
smallvec = "1.11.2"
tracing = "0.1.37"
void = "1.0.2"
async-channel = "1.9.0"

# Metrics dependencies
prometheus-client = { workspace = true }
Expand Down
153 changes: 108 additions & 45 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::backoff::BackoffStorage;
use crate::config::{Config, ValidationMode};
use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
Expand All @@ -61,7 +60,8 @@ use crate::types::{
ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription,
SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, RpcOut};
use crate::types::{PeerConnections, PeerKind};
use crate::{backoff::BackoffStorage, types::RpcSender};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -332,6 +332,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

/// Keep track of a set of internal metrics relating to gossipsub.
metrics: Option<Metrics>,

/// Connection handler message queue channels.
handler_send_queues: HashMap<PeerId, RpcSender>,
}

impl<D, F> Behaviour<D, F>
Expand Down Expand Up @@ -471,6 +474,7 @@ where
config,
subscription_filter,
data_transform,
handler_send_queues: Default::default(),
})
}
}
Expand Down Expand Up @@ -534,10 +538,14 @@ where
}

// send subscription request to all peers
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
let event = RpcOut::Subscribe(topic_hash.clone());
self.send_message(peer, event);
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");

sender.subscribe(topic_hash.clone());
}

// call JOIN(topic)
Expand All @@ -561,10 +569,14 @@ where
}

// announce to all peers
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
for peer in self.peer_topics.keys() {
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
let event = RpcOut::Unsubscribe(topic_hash.clone());
self.send_message(peer, event);
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");

sender.unsubscribe(topic_hash.clone());
}

// call LEAVE(topic)
Expand Down Expand Up @@ -711,9 +723,23 @@ where
}

// Send to peers we know are subscribed to the topic.
let mut errors = 0;
for peer_id in recipient_peers.iter() {
tracing::trace!(peer=%peer_id, "Sending message to peer");
self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

if sender
.publish(raw_message.clone(), self.metrics.as_mut())
.is_err()
{
errors += 1;
}
}
if errors == recipient_peers.len() {
return Err(PublishError::InsufficientPeers);
}

tracing::debug!(message=%msg_id, "Published message");
Expand Down Expand Up @@ -1311,7 +1337,12 @@ where
);
} else {
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
self.send_message(*peer_id, RpcOut::Forward(msg));
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.forward(msg, self.metrics.as_mut());
}
}
}
Expand Down Expand Up @@ -1464,12 +1495,17 @@ where
if !to_prune_topics.is_empty() {
// build the prune messages to send
let on_unsubscribe = false;
let mut sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
.collect::<Vec<_>>()
{
self.send_message(*peer_id, RpcOut::Control(action));
sender.control(action);
}
// Send the prune messages to the peer
tracing::debug!(
Expand Down Expand Up @@ -1964,12 +2000,16 @@ where

// If we need to send grafts to peer, do so immediately, rather than waiting for the
// heartbeat.
let sender = self
.handler_send_queues
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect::<Vec<_>>()
{
self.send_message(*propagation_source, RpcOut::Control(action))
sender.control(action);
}

// Notify the application of the subscriptions
Expand Down Expand Up @@ -2504,6 +2544,13 @@ where
// It therefore must be in at least one mesh and we do not need to inform the handler
// of its removal from another.

// send the control messages
let mut sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist")
.clone();

// The following prunes are not due to unsubscribing.
let prunes = to_prune
.remove(&peer)
Expand All @@ -2518,9 +2565,8 @@ where
)
});

// send the control messages
for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
self.send_message(peer, RpcOut::Control(msg));
for msg in control_msgs.chain(prunes) {
sender.control(msg);
}
}

Expand All @@ -2534,7 +2580,13 @@ where
self.config.do_px() && !no_px.contains(peer),
false,
);
self.send_message(*peer, RpcOut::Control(prune));
let mut sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist")
.clone();

sender.control(prune);

// inform the handler
peer_removed_from_mesh(
Expand Down Expand Up @@ -2602,11 +2654,13 @@ where

// forward the message to peers
if !recipient_peers.is_empty() {
let event = RpcOut::Forward(message.clone());

for peer in recipient_peers.iter() {
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
self.send_message(*peer, event.clone());
let sender = self
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");
sender.forward(message.clone(), self.metrics.as_mut());
}
tracing::debug!("Completed forwarding message");
Ok(true)
Expand Down Expand Up @@ -2720,31 +2774,19 @@ where
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
self.send_message(peer, RpcOut::Control(msg));
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) {
if let Some(m) = self.metrics.as_mut() {
if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
// register bytes sent on the internal metrics.
m.msg_sent(&message.topic, message.raw_protobuf_len());
}
}

self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(rpc),
handler: NotifyHandler::Any,
});
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
Expand Down Expand Up @@ -2810,8 +2852,14 @@ where

tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
let mut sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist")
.clone();

for topic_hash in self.mesh.clone().into_keys() {
self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
sender.subscribe(topic_hash);
}
}

Expand Down Expand Up @@ -2939,6 +2987,7 @@ where
}

self.connected_peers.remove(&peer_id);
self.handler_send_queues.remove(&peer_id);

if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.remove_peer(&peer_id);
Expand Down Expand Up @@ -2998,21 +3047,35 @@ where
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
peer_id: PeerId,
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
))
}

fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
peer_id: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Handler::new(self.config.protocol_config()))
let sender = self
.handler_send_queues
.entry(peer_id)
.or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len()));
Ok(Handler::new(
self.config.protocol_config(),
sender.new_receiver(),
))
}

fn on_connection_handler_event(
Expand Down Expand Up @@ -3380,7 +3443,7 @@ impl fmt::Debug for PublishConfig {
#[cfg(test)]
mod local_test {
use super::*;
use crate::IdentTopic;
use crate::{types::RpcOut, IdentTopic};
use quickcheck::*;

fn test_message() -> RawMessage {
Expand Down
Loading

0 comments on commit a4a3f39

Please sign in to comment.