diff --git a/Cargo.lock b/Cargo.lock index ab4db881dfa..2f252eecba0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2642,6 +2642,7 @@ dependencies = [ "fnv", "futures", "futures-ticker", + "futures-timer", "getrandom 0.2.11", "hex", "hex_fmt", diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 10b709cb46f..b8c16942b86 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,5 @@ ## 0.46.1 - unreleased +- Implement publish and forward message dropping. - Implement backpressure by diferentiating between priority and non priority messages. Drop `Publish` and `Forward` messages when the queue becomes full. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 2a0510038f1..f913b030b5c 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -22,6 +22,7 @@ either = "1.9" fnv = "1.0.7" futures = "0.3.29" futures-ticker = "0.0.3" +futures-timer = "3.0.2" getrandom = "0.2.11" hex_fmt = "0.3.0" instant = "0.1.12" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c822e02c2c5..8f72acb63bf 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -45,7 +45,6 @@ use libp2p_swarm::{ THandlerOutEvent, ToSwarm, }; -use crate::config::{Config, ValidationMode}; use crate::gossip_promises::GossipPromises; use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; @@ -62,6 +61,10 @@ use crate::types::{ }; use crate::types::{PeerConnections, PeerKind}; use crate::{backoff::BackoffStorage, types::RpcSender}; +use crate::{ + config::{Config, ValidationMode}, + types::RpcOut, +}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use instant::SystemTime; @@ -732,7 +735,11 @@ where .expect("Peerid should exist"); if sender - .publish(raw_message.clone(), self.metrics.as_mut()) + .publish( + raw_message.clone(), + self.config.publish_queue_duration(), + self.metrics.as_mut(), + ) .is_err() { errors += 1; @@ -1342,7 +1349,11 @@ where .get_mut(peer_id) .expect("Peerid should exist"); - sender.forward(msg, self.metrics.as_mut()); + sender.forward( + msg, + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ); } } } @@ -2660,7 +2671,11 @@ where .handler_send_queues .get_mut(peer) .expect("Peerid should exist"); - sender.forward(message.clone(), self.metrics.as_mut()); + sender.forward( + message.clone(), + self.config.forward_queue_duration(), + self.metrics.as_mut(), + ); } tracing::debug!("Completed forwarding message"); Ok(true) @@ -3115,6 +3130,21 @@ where } } } + HandlerEvent::MessageDropped(rpc) => { + // TODO: + // * Build scoring logic to handle peers that are dropping messages + if let Some(metrics) = self.metrics.as_mut() { + match rpc { + RpcOut::Publish { message, .. } => { + metrics.publish_msg_dropped(&message.topic); + } + RpcOut::Forward { message, .. } => { + metrics.forward_msg_dropped(&message.topic); + } + _ => {} + } + } + } HandlerEvent::Message { rpc, invalid_messages, @@ -3439,42 +3469,3 @@ impl fmt::Debug for PublishConfig { } } } - -#[cfg(test)] -mod local_test { - use super::*; - use crate::{types::RpcOut, IdentTopic}; - use quickcheck::*; - - fn test_message() -> RawMessage { - RawMessage { - source: Some(PeerId::random()), - data: vec![0; 100], - sequence_number: None, - topic: TopicHash::from_raw("test_topic"), - signature: None, - key: None, - validated: false, - } - } - - fn test_control() -> ControlAction { - ControlAction::IHave { - topic_hash: IdentTopic::new("TestTopic").hash(), - message_ids: vec![MessageId(vec![12u8]); 5], - } - } - - impl Arbitrary for RpcOut { - fn arbitrary(g: &mut Gen) -> Self { - match u8::arbitrary(g) % 5 { - 0 => RpcOut::Subscribe(IdentTopic::new("TestTopic").hash()), - 1 => RpcOut::Unsubscribe(IdentTopic::new("TestTopic").hash()), - 2 => RpcOut::Publish(test_message()), - 3 => RpcOut::Forward(test_message()), - 4 => RpcOut::Control(test_control()), - _ => panic!("outside range"), - } - } - } -} diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index e6c52526f63..8482240fe81 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -230,7 +230,7 @@ where } }; - let sender = RpcSender::new(peer, 100); + let sender = RpcSender::new(peer, gs.config.connection_handler_queue_len()); let receiver = sender.new_receiver(); gs.handler_send_queues.insert(peer, sender); @@ -669,7 +669,7 @@ fn test_publish_without_flood_publishing() { .into_values() .fold(vec![], |mut collected_publish, c| { while !c.priority.is_empty() { - if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { collected_publish.push(message); } } @@ -753,7 +753,7 @@ fn test_fanout() { .into_values() .fold(vec![], |mut collected_publish, c| { while !c.priority.is_empty() { - if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { collected_publish.push(message); } } @@ -1035,8 +1035,8 @@ fn test_handle_iwant_msg_cached() { .into_values() .fold(vec![], |mut collected_messages, c| { while !c.non_priority.is_empty() { - if let Ok(RpcOut::Forward(msg)) = c.non_priority.try_recv() { - collected_messages.push(msg) + if let Ok(RpcOut::Forward { message, .. }) = c.non_priority.try_recv() { + collected_messages.push(message) } } collected_messages @@ -1090,7 +1090,7 @@ fn test_handle_iwant_msg_cached_shifted() { let message_exists = queues.values().any(|c| { let mut out = false; while !c.non_priority.is_empty() { - if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(message)) if + if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward{message, timeout: _ }) if gs.config.message_id( &gs.data_transform .inbound_transform(message.clone()) @@ -1558,7 +1558,7 @@ fn do_forward_messages_to_explicit_peers() { assert_eq!( queues.into_iter().fold(0, |mut fwds, (peer_id, c)| { while !c.non_priority.is_empty() { - if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(m)) if peer_id == peers[0] && m.data == message.data) { + if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward{message: m, timeout: _}) if peer_id == peers[0] && m.data == message.data) { fwds +=1; } } @@ -2113,7 +2113,7 @@ fn test_flood_publish() { .into_values() .fold(vec![], |mut collected_publish, c| { while !c.priority.is_empty() { - if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { collected_publish.push(message); } } @@ -2672,7 +2672,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { .into_iter() .fold(vec![], |mut collected_messages, (peer_id, c)| { while !c.non_priority.is_empty() { - if let Ok(RpcOut::Forward(message)) = c.non_priority.try_recv() { + if let Ok(RpcOut::Forward { message, .. }) = c.non_priority.try_recv() { collected_messages.push((peer_id, message)); } } @@ -2817,7 +2817,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { .into_iter() .fold(vec![], |mut collected_publish, (peer_id, c)| { while !c.priority.is_empty() { - if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { collected_publish.push((peer_id, message)); } } @@ -2871,7 +2871,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { .into_iter() .fold(vec![], |mut collected_publish, (peer_id, c)| { while !c.priority.is_empty() { - if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { collected_publish.push((peer_id, message)) } } @@ -4394,7 +4394,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { assert_eq!( queues.into_values().fold(0, |mut fwds, c| { while !c.non_priority.is_empty() { - if let Ok(RpcOut::Forward(_)) = c.non_priority.try_recv() { + if let Ok(RpcOut::Forward { .. }) = c.non_priority.try_recv() { fwds += 1; } } @@ -4806,7 +4806,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .fold(0, |mut collected_publish, (peer_id, c)| { while !c.priority.is_empty() { if matches!(c.priority.try_recv(), - Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2) + Ok(RpcOut::Publish{..}) if peer_id == &p1 || peer_id == &p2) { collected_publish += 1; } @@ -4861,7 +4861,7 @@ fn test_do_not_use_floodsub_in_fanout() { .fold(0, |mut collected_publish, (peer_id, c)| { while !c.priority.is_empty() { if matches!(c.priority.try_recv(), - Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2) + Ok(RpcOut::Publish{..}) if peer_id == &p1 || peer_id == &p2) { collected_publish += 1; } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index b99f8548067..3e1c1ce3807 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -96,6 +96,8 @@ pub struct Config { iwant_followup_time: Duration, published_message_ids_cache_time: Duration, connection_handler_queue_len: usize, + connection_handler_publish_duration: Duration, + connection_handler_forward_duration: Duration, } impl Config { @@ -352,10 +354,22 @@ impl Config { self.published_message_ids_cache_time } - /// The max number of messages a `ConnectionHandler` can buffer. + /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000. pub fn connection_handler_queue_len(&self) -> usize { self.connection_handler_queue_len } + + /// The duration a message to be published can wait to be sent before it is abandoned. The + /// default is 5 seconds. + pub fn publish_queue_duration(&self) -> Duration { + self.connection_handler_publish_duration + } + + /// The duration a message to be forwarded can wait to be sent before it is abandoned. The + /// default is 500ms. + pub fn forward_queue_duration(&self) -> Duration { + self.connection_handler_forward_duration + } } impl Default for Config { @@ -423,7 +437,9 @@ impl Default for ConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), published_message_ids_cache_time: Duration::from_secs(10), - connection_handler_queue_len: 100, + connection_handler_queue_len: 5000, + connection_handler_publish_duration: Duration::from_secs(5), + connection_handler_forward_duration: Duration::from_millis(500), }, invalid_protocol: false, } @@ -789,10 +805,23 @@ impl ConfigBuilder { self } + /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000. pub fn connection_handler_queue_len(&mut self, len: usize) { self.config.connection_handler_queue_len = len; } + /// The duration a message to be published can wait to be sent before it is abandoned. The + /// default is 5 seconds. + pub fn publish_queue_duration(&mut self, duration: Duration) { + self.config.connection_handler_publish_duration = duration; + } + + /// The duration a message to be forwarded can wait to be sent before it is abandoned. The + /// default is 500ms. + pub fn forward_queue_duration(&mut self, duration: Duration) { + self.config.connection_handler_forward_duration = duration; + } + /// Constructs a [`Config`] from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index dd048c7e2fd..a7564307c61 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -20,7 +20,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; -use crate::types::{PeerKind, RawMessage, Rpc, RpcReceiver}; +use crate::types::{PeerKind, RawMessage, Rpc, RpcOut, RpcReceiver}; use crate::ValidationError; use asynchronous_codec::Framed; use futures::future::Either; @@ -54,6 +54,8 @@ pub enum HandlerEvent { /// An inbound or outbound substream has been established with the peer and this informs over /// which protocol. This message only occurs once per connection. PeerKind(PeerKind), + /// A message to be published was dropped because it could not be sent in time. + MessageDropped(RpcOut), } /// A message sent from the behaviour to the handler. @@ -239,6 +241,10 @@ impl EnabledHandler { }); } + // We may need to inform the behviour if we have a dropped a message. This gets set if that + // is the case. + let mut dropped_message = None; + // process outbound stream loop { match std::mem::replace( @@ -247,7 +253,26 @@ impl EnabledHandler { ) { // outbound idle state Some(OutboundSubstreamState::WaitingOutput(substream)) => { - if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) { + if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) { + match message { + RpcOut::Publish { + message: _, + ref mut timeout, + } + | RpcOut::Forward { + message: _, + ref mut timeout, + } => { + if Pin::new(timeout).poll(cx).is_ready() { + // Inform the behaviour and end the poll. + dropped_message = Some(HandlerEvent::MessageDropped(message)); + self.outbound_substream = + Some(OutboundSubstreamState::WaitingOutput(substream)); + break; + } + } + _ => {} // All other messages are not time-bound. + } self.outbound_substream = Some(OutboundSubstreamState::PendingSend( substream, message.into_protobuf(), @@ -317,6 +342,13 @@ impl EnabledHandler { } } + // If there was a timeout in sending a message, inform the behaviour before restarting the + // poll + if let Some(handler_event) = dropped_message { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event)); + } + + // Handle inbound messages loop { match std::mem::replace( &mut self.inbound_substream, diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index e044ca67e71..6b21b0685f8 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -127,6 +127,10 @@ pub(crate) struct Metrics { ignored_messages: Family, /// The number of messages rejected by the application (validation result). rejected_messages: Family, + /// The number of publish messages dropped by the sender. + publish_messages_dropped: Family, + /// The number of forward messages dropped by the sender. + forward_messages_dropped: Family, /* Metrics regarding mesh state */ /// Number of peers in our mesh. This metric should be updated with the count of peers for a @@ -222,6 +226,16 @@ impl Metrics { "Number of rejected messages received for each topic" ); + let publish_messages_dropped = register_family!( + "publish_messages_dropped_per_topic", + "Number of publish messages dropped per topic" + ); + + let forward_messages_dropped = register_family!( + "forward_messages_dropped_per_topic", + "Number of forward messages dropped per topic" + ); + let mesh_peer_counts = register_family!( "mesh_peer_counts", "Number of peers in each topic in our mesh" @@ -312,6 +326,8 @@ impl Metrics { accepted_messages, ignored_messages, rejected_messages, + publish_messages_dropped, + forward_messages_dropped, mesh_peer_counts, mesh_peer_inclusion_events, mesh_peer_churn_events, @@ -452,6 +468,20 @@ impl Metrics { } } + /// Register sending a message over a topic. + pub(crate) fn publish_msg_dropped(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.publish_messages_dropped.get_or_create(topic).inc(); + } + } + + /// Register dropping a message over a topic. + pub(crate) fn forward_msg_dropped(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.forward_messages_dropped.get_or_create(topic).inc(); + } + } + /// Register that a message was received (and was not a duplicate). pub(crate) fn msg_recvd(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 23ee4a8ed1e..f6438687960 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -23,6 +23,8 @@ use crate::metrics::Metrics; use crate::TopicHash; use async_channel::{Receiver, Sender}; use futures::Stream; +use futures_timer::Delay; +use instant::Duration; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; @@ -240,12 +242,14 @@ pub enum ControlAction { } /// A Gossipsub RPC message sent. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug)] pub enum RpcOut { - /// Publish a Gossipsub message on network. - Publish(RawMessage), - /// Forward a Gossipsub message to the network. - Forward(RawMessage), + /// Publish a Gossipsub message on network. The [`Delay`] tags the time we attempted to + /// send it. + Publish { message: RawMessage, timeout: Delay }, + /// Forward a Gossipsub message to the network. The [`Delay`] tags the time we attempted to + /// send it. + Forward { message: RawMessage, timeout: Delay }, /// Subscribe a topic. Subscribe(TopicHash), /// Unsubscribe a topic. @@ -266,12 +270,18 @@ impl From for proto::RPC { /// Converts the RPC into protobuf format. fn from(rpc: RpcOut) -> Self { match rpc { - RpcOut::Publish(message) => proto::RPC { + RpcOut::Publish { + message, + timeout: _, + } => proto::RPC { subscriptions: Vec::new(), publish: vec![message.into()], control: None, }, - RpcOut::Forward(message) => proto::RPC { + RpcOut::Forward { + message, + timeout: _, + } => proto::RPC { publish: vec![message.into()], subscriptions: Vec::new(), control: None, @@ -537,7 +547,7 @@ impl RpcSender { let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2); let len = Arc::new(AtomicUsize::new(0)); let receiver = RpcReceiver { - len: len.clone(), + priority_len: len.clone(), priority: priority_receiver, non_priority: non_priority_receiver, }; @@ -585,13 +595,17 @@ impl RpcSender { pub(crate) fn publish( &mut self, message: RawMessage, + timeout: Duration, metrics: Option<&mut Metrics>, ) -> Result<(), ()> { if self.len.load(Ordering::Relaxed) >= self.cap { return Err(()); } self.priority - .try_send(RpcOut::Publish(message.clone())) + .try_send(RpcOut::Publish { + message: message.clone(), + timeout: Delay::new(timeout), + }) .expect("Channel is unbounded and should always be open"); self.len.fetch_add(1, Ordering::Relaxed); @@ -604,8 +618,16 @@ impl RpcSender { /// Send a `RpcOut::Forward` message to the `RpcReceiver` /// this is high priority. If the queue is full the message is discarded. - pub(crate) fn forward(&mut self, message: RawMessage, metrics: Option<&mut Metrics>) { - if let Err(err) = self.non_priority.try_send(RpcOut::Forward(message.clone())) { + pub(crate) fn forward( + &mut self, + message: RawMessage, + timeout: Duration, + metrics: Option<&mut Metrics>, + ) { + if let Err(err) = self.non_priority.try_send(RpcOut::Forward { + message: message.clone(), + timeout: Delay::new(timeout), + }) { let rpc = err.into_inner(); tracing::trace!( "{:?} message to peer {} dropped, queue is full", @@ -624,8 +646,11 @@ impl RpcSender { /// `RpcOut` sender that is priority aware. #[derive(Debug, Clone)] pub struct RpcReceiver { - len: Arc, + /// The maximum length of the priority queue. + priority_len: Arc, + /// The priority queue receiver. pub(crate) priority: Receiver, + /// The non priority queue receiver. pub(crate) non_priority: Receiver, } @@ -645,8 +670,8 @@ impl Stream for RpcReceiver { ) -> std::task::Poll> { // The priority queue is first polled. if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) { - if let Some(RpcOut::Publish(_)) = rpc { - self.len.fetch_sub(1, Ordering::Relaxed); + if let Some(RpcOut::Publish { .. }) = rpc { + self.priority_len.fetch_sub(1, Ordering::Relaxed); } return Poll::Ready(rpc); }