diff --git a/Cargo.lock b/Cargo.lock index 617d54e9b2b..ab4db881dfa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2632,6 +2632,7 @@ dependencies = [ name = "libp2p-gossipsub" version = "0.46.1" dependencies = [ + "async-channel", "async-std", "asynchronous-codec", "base64 0.21.5", diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 5ff4cfa27d6..10b709cb46f 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,7 +1,11 @@ -## 0.46.1 +## 0.46.1 - unreleased + +- Implement backpressure by diferentiating between priority and non priority messages. + Drop `Publish` and `Forward` messages when the queue becomes full. + See [PR 4914](https://github.com/libp2p/rust-libp2p/pull/4914) - Deprecate `Rpc` in preparation for removing it from the public API because it is an internal type. - See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833). + See [PR 4833](https://github.com/libp2p/rust-libp2p/pull/4833). ## 0.46.0 diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 37873de39f9..2a0510038f1 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -37,6 +37,7 @@ sha2 = "0.10.8" smallvec = "1.11.2" tracing = "0.1.37" void = "1.0.2" +async-channel = "1.9.0" # Metrics dependencies prometheus-client = { workspace = true } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 24a32de4cc7..c822e02c2c5 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -45,7 +45,6 @@ use libp2p_swarm::{ THandlerOutEvent, ToSwarm, }; -use crate::backoff::BackoffStorage; use crate::config::{Config, ValidationMode}; use crate::gossip_promises::GossipPromises; use crate::handler::{Handler, HandlerEvent, HandlerIn}; @@ -61,7 +60,8 @@ use crate::types::{ ControlAction, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage, Subscription, SubscriptionAction, }; -use crate::types::{PeerConnections, PeerKind, RpcOut}; +use crate::types::{PeerConnections, PeerKind}; +use crate::{backoff::BackoffStorage, types::RpcSender}; use crate::{rpc_proto::proto, TopicScoreParams}; use crate::{PublishError, SubscriptionError, ValidationError}; use instant::SystemTime; @@ -332,6 +332,9 @@ pub struct Behaviour { /// Keep track of a set of internal metrics relating to gossipsub. metrics: Option, + + /// Connection handler message queue channels. + handler_send_queues: HashMap, } impl Behaviour @@ -471,6 +474,7 @@ where config, subscription_filter, data_transform, + handler_send_queues: Default::default(), }) } } @@ -534,10 +538,14 @@ where } // send subscription request to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending SUBSCRIBE to peer"); - let event = RpcOut::Subscribe(topic_hash.clone()); - self.send_message(peer, event); + let sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist"); + + sender.subscribe(topic_hash.clone()); } // call JOIN(topic) @@ -561,10 +569,14 @@ where } // announce to all peers - for peer in self.peer_topics.keys().copied().collect::>() { + for peer in self.peer_topics.keys() { tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer"); - let event = RpcOut::Unsubscribe(topic_hash.clone()); - self.send_message(peer, event); + let sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist"); + + sender.unsubscribe(topic_hash.clone()); } // call LEAVE(topic) @@ -711,9 +723,23 @@ where } // Send to peers we know are subscribed to the topic. + let mut errors = 0; for peer_id in recipient_peers.iter() { tracing::trace!(peer=%peer_id, "Sending message to peer"); - self.send_message(*peer_id, RpcOut::Publish(raw_message.clone())); + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + if sender + .publish(raw_message.clone(), self.metrics.as_mut()) + .is_err() + { + errors += 1; + } + } + if errors == recipient_peers.len() { + return Err(PublishError::InsufficientPeers); } tracing::debug!(message=%msg_id, "Published message"); @@ -1311,7 +1337,12 @@ where ); } else { tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer"); - self.send_message(*peer_id, RpcOut::Forward(msg)); + let sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist"); + + sender.forward(msg, self.metrics.as_mut()); } } } @@ -1464,12 +1495,17 @@ where if !to_prune_topics.is_empty() { // build the prune messages to send let on_unsubscribe = false; + let mut sender = self + .handler_send_queues + .get_mut(peer_id) + .expect("Peerid should exist") + .clone(); + for action in to_prune_topics .iter() .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe)) - .collect::>() { - self.send_message(*peer_id, RpcOut::Control(action)); + sender.control(action); } // Send the prune messages to the peer tracing::debug!( @@ -1964,12 +2000,16 @@ where // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. + let sender = self + .handler_send_queues + .get_mut(propagation_source) + .expect("Peerid should exist"); + for action in topics_to_graft .into_iter() .map(|topic_hash| ControlAction::Graft { topic_hash }) - .collect::>() { - self.send_message(*propagation_source, RpcOut::Control(action)) + sender.control(action); } // Notify the application of the subscriptions @@ -2504,6 +2544,13 @@ where // It therefore must be in at least one mesh and we do not need to inform the handler // of its removal from another. + // send the control messages + let mut sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist") + .clone(); + // The following prunes are not due to unsubscribing. let prunes = to_prune .remove(&peer) @@ -2518,9 +2565,8 @@ where ) }); - // send the control messages - for msg in control_msgs.chain(prunes).collect::>() { - self.send_message(peer, RpcOut::Control(msg)); + for msg in control_msgs.chain(prunes) { + sender.control(msg); } } @@ -2534,7 +2580,13 @@ where self.config.do_px() && !no_px.contains(peer), false, ); - self.send_message(*peer, RpcOut::Control(prune)); + let mut sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist") + .clone(); + + sender.control(prune); // inform the handler peer_removed_from_mesh( @@ -2602,11 +2654,13 @@ where // forward the message to peers if !recipient_peers.is_empty() { - let event = RpcOut::Forward(message.clone()); - for peer in recipient_peers.iter() { tracing::debug!(%peer, message=%msg_id, "Sending message to peer"); - self.send_message(*peer, event.clone()); + let sender = self + .handler_send_queues + .get_mut(peer) + .expect("Peerid should exist"); + sender.forward(message.clone(), self.metrics.as_mut()); } tracing::debug!("Completed forwarding message"); Ok(true) @@ -2720,7 +2774,12 @@ where fn flush_control_pool(&mut self) { for (peer, controls) in self.control_pool.drain().collect::>() { for msg in controls { - self.send_message(peer, RpcOut::Control(msg)); + let sender = self + .handler_send_queues + .get_mut(&peer) + .expect("Peerid should exist"); + + sender.control(msg); } } @@ -2728,23 +2787,6 @@ where self.pending_iwant_msgs.clear(); } - /// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it - /// is not already an arc. - fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) { - if let Some(m) = self.metrics.as_mut() { - if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc { - // register bytes sent on the internal metrics. - m.msg_sent(&message.topic, message.raw_protobuf_len()); - } - } - - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(rpc), - handler: NotifyHandler::Any, - }); - } - fn on_connection_established( &mut self, ConnectionEstablished { @@ -2810,8 +2852,14 @@ where tracing::debug!(peer=%peer_id, "New peer connected"); // We need to send our subscriptions to the newly-connected node. + let mut sender = self + .handler_send_queues + .get_mut(&peer_id) + .expect("Peerid should exist") + .clone(); + for topic_hash in self.mesh.clone().into_keys() { - self.send_message(peer_id, RpcOut::Subscribe(topic_hash)); + sender.subscribe(topic_hash); } } @@ -2939,6 +2987,7 @@ where } self.connected_peers.remove(&peer_id); + self.handler_send_queues.remove(&peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.remove_peer(&peer_id); @@ -2998,21 +3047,35 @@ where fn handle_established_inbound_connection( &mut self, _: ConnectionId, - _: PeerId, + peer_id: PeerId, _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + let sender = self + .handler_send_queues + .entry(peer_id) + .or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len())); + Ok(Handler::new( + self.config.protocol_config(), + sender.new_receiver(), + )) } fn handle_established_outbound_connection( &mut self, _: ConnectionId, - _: PeerId, + peer_id: PeerId, _: &Multiaddr, _: Endpoint, ) -> Result, ConnectionDenied> { - Ok(Handler::new(self.config.protocol_config())) + let sender = self + .handler_send_queues + .entry(peer_id) + .or_insert_with(|| RpcSender::new(peer_id, self.config.connection_handler_queue_len())); + Ok(Handler::new( + self.config.protocol_config(), + sender.new_receiver(), + )) } fn on_connection_handler_event( @@ -3380,7 +3443,7 @@ impl fmt::Debug for PublishConfig { #[cfg(test)] mod local_test { use super::*; - use crate::IdentTopic; + use crate::{types::RpcOut, IdentTopic}; use quickcheck::*; fn test_message() -> RawMessage { diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 570cdf43f90..e6c52526f63 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -23,6 +23,7 @@ use super::*; use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; +use crate::types::{RpcOut, RpcReceiver}; use crate::ValidationError; use crate::{ config::Config, config::ConfigBuilder, types::Rpc, IdentTopic as Topic, TopicScoreParams, @@ -58,7 +59,14 @@ where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, { - pub(crate) fn create_network(self) -> (Behaviour, Vec, Vec) { + pub(crate) fn create_network( + self, + ) -> ( + Behaviour, + Vec, + HashMap, + Vec, + ) { let keypair = libp2p_identity::Keypair::generate_ed25519(); // create a gossipsub struct let mut gs: Behaviour = Behaviour::new_with_subscription_filter_and_transform( @@ -86,10 +94,11 @@ where // build and connect peer_no random peers let mut peers = vec![]; + let mut receiver_queues = HashMap::new(); let empty = vec![]; for i in 0..self.peer_no { - peers.push(add_peer( + let (peer, receiver) = add_peer( &mut gs, if self.to_subscribe { &topic_hashes @@ -98,10 +107,12 @@ where }, i < self.outbound, i < self.explicit, - )); + ); + peers.push(peer); + receiver_queues.insert(peer, receiver); } - (gs, peers, topic_hashes) + (gs, peers, receiver_queues, topic_hashes) } fn peer_no(mut self, peer_no: usize) -> Self { @@ -165,7 +176,7 @@ fn add_peer( topic_hashes: &Vec, outbound: bool, explicit: bool, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -179,7 +190,7 @@ fn add_peer_with_addr( outbound: bool, explicit: bool, address: Multiaddr, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -201,7 +212,7 @@ fn add_peer_with_addr_and_kind( explicit: bool, address: Multiaddr, kind: Option, -) -> PeerId +) -> (PeerId, RpcReceiver) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -219,6 +230,10 @@ where } }; + let sender = RpcSender::new(peer, 100); + let receiver = sender.new_receiver(); + gs.handler_send_queues.insert(peer, sender); + gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: peer, connection_id: ConnectionId::new_unchecked(0), @@ -249,7 +264,7 @@ where &peer, ); } - peer + (peer, receiver) } fn disconnect_peer(gs: &mut Behaviour, peer_id: &PeerId) @@ -389,7 +404,7 @@ fn test_subscribe() { // - run JOIN(topic) let subscribe_topic = vec![String::from("test_subscribe")]; - let (gs, _, topic_hashes) = inject_nodes1() + let (gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(subscribe_topic) .to_subscribe(true) @@ -401,19 +416,16 @@ fn test_subscribe() { ); // collect all the subscriptions - let subscriptions = gs - .events - .iter() - .filter(|e| { - matches!( - e, - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(_)), - .. + let subscriptions = queues + .into_values() + .fold(0, |mut collected_subscriptions, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = c.priority.try_recv() { + collected_subscriptions += 1 } - ) - }) - .count(); + } + collected_subscriptions + }); // we sent a subscribe to all known peers assert_eq!(subscriptions, 20); @@ -434,7 +446,7 @@ fn test_unsubscribe() { .collect::>(); // subscribe to topic_strings - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -462,15 +474,15 @@ fn test_unsubscribe() { ); // collect all the subscriptions - let subscriptions = gs - .events - .iter() - .fold(0, |collected_subscriptions, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(_)), - .. - } => collected_subscriptions + 1, - _ => collected_subscriptions, + let subscriptions = queues + .into_values() + .fold(0, |mut collected_subscriptions, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = c.priority.try_recv() { + collected_subscriptions += 1 + } + } + collected_subscriptions }); // we sent a unsubscribe to all known peers, for two topics @@ -503,7 +515,7 @@ fn test_join() { .map(|t| Topic::new(t.clone())) .collect::>(); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, _receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -557,14 +569,27 @@ fn test_join() { gs.fanout .insert(topic_hashes[1].clone(), Default::default()); let mut new_peers: Vec = vec![]; + + let mut peers = vec![]; for _ in 0..3 { let random_peer = PeerId::random(); // inform the behaviour of a new peer + let address = "/ip4/127.0.0.1".parse::().unwrap(); + let peer = gs + .handle_established_inbound_connection( + ConnectionId::new_unchecked(0), + random_peer, + &address, + &address, + ) + .unwrap(); + peers.push(peer); + gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, connection_id: ConnectionId::new_unchecked(0), endpoint: &ConnectedPoint::Dialer { - address: "/ip4/127.0.0.1".parse::().unwrap(), + address, role_override: Endpoint::Dialer, }, failed_addresses: &[], @@ -616,7 +641,7 @@ fn test_publish_without_flood_publishing() { .unwrap(); let publish_topic = String::from("test_publish"); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![publish_topic.clone()]) .to_subscribe(true) @@ -640,18 +665,15 @@ fn test_publish_without_flood_publishing() { gs.publish(Topic::new(publish_topic), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Publish(message)), - .. - } => { - collected_publish.push(message); - collected_publish + let publishes = queues + .into_values() + .fold(vec![], |mut collected_publish, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + collected_publish.push(message); + } } - _ => collected_publish, + collected_publish }); // Transform the inbound message @@ -695,7 +717,7 @@ fn test_fanout() { .unwrap(); let fanout_topic = String::from("test_fanout"); - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![fanout_topic.clone()]) .to_subscribe(true) @@ -727,18 +749,15 @@ fn test_fanout() { ); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Publish(message)), - .. - } => { - collected_publish.push(message); - collected_publish + let publishes = queues + .into_values() + .fold(vec![], |mut collected_publish, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + collected_publish.push(message); + } } - _ => collected_publish, + collected_publish }); // Transform the inbound message @@ -769,7 +788,7 @@ fn test_fanout() { #[test] /// Test the gossipsub NetworkBehaviour peer connection logic. fn test_inject_connected() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -777,26 +796,19 @@ fn test_inject_connected() { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let subscriptions = gs - .events - .into_iter() - .filter_map(|e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Subscribe(topic)), - peer_id, - .. - } => Some((peer_id, topic)), - _ => None, - }) - .fold( - HashMap::>::new(), - |mut subs, (peer, sub)| { - let mut peer_subs = subs.remove(&peer).unwrap_or_default(); - peer_subs.push(sub.into_string()); - subs.insert(peer, peer_subs); - subs - }, - ); + let subscriptions = queues.into_iter().fold( + HashMap::>::new(), + |mut collected_subscriptions, (peer, c)| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Subscribe(topic)) = c.priority.try_recv() { + let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); + peer_subs.push(topic.into_string()); + collected_subscriptions.insert(peer, peer_subs); + } + } + collected_subscriptions + }, + ); // check that there are two subscriptions sent to each peer for peer_subs in subscriptions.values() { @@ -831,7 +843,7 @@ fn test_handle_received_subscriptions() { .iter() .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(false) @@ -991,7 +1003,7 @@ fn test_get_random_peers() { /// Tests that the correct message is sent when a peer asks for a message in our cache. #[test] fn test_handle_iwant_msg_cached() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1019,18 +1031,16 @@ fn test_handle_iwant_msg_cached() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs.events.into_iter().fold( - Vec::::new(), - |mut collected_messages, e| match e { - ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(RpcOut::Forward(message)) = event { - collected_messages.push(message); + let sent_messages = queues + .into_values() + .fold(vec![], |mut collected_messages, c| { + while !c.non_priority.is_empty() { + if let Ok(RpcOut::Forward(msg)) = c.non_priority.try_recv() { + collected_messages.push(msg) } - collected_messages } - _ => collected_messages, - }, - ); + collected_messages + }); assert!( sent_messages @@ -1044,7 +1054,7 @@ fn test_handle_iwant_msg_cached() { /// Tests that messages are sent correctly depending on the shifting of the message cache. #[test] fn test_handle_iwant_msg_cached_shifted() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1077,18 +1087,20 @@ fn test_handle_iwant_msg_cached_shifted() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // is the message is being sent? - let message_exists = gs.events.iter().any(|e| match e { - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Forward(message)), - .. - } => { - gs.config.message_id( - &gs.data_transform - .inbound_transform(message.clone()) - .unwrap(), - ) == msg_id + let message_exists = queues.values().any(|c| { + let mut out = false; + while !c.non_priority.is_empty() { + if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(message)) if + gs.config.message_id( + &gs.data_transform + .inbound_transform(message.clone()) + .unwrap(), + ) == msg_id) + { + out = true; + } } - _ => false, + out }); // default history_length is 5, expect no messages after shift > 5 if shift < 5 { @@ -1108,7 +1120,7 @@ fn test_handle_iwant_msg_cached_shifted() { #[test] // tests that an event is not created when a peers asks for a message not in our cache fn test_handle_iwant_msg_not_cached() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1127,7 +1139,7 @@ fn test_handle_iwant_msg_not_cached() { #[test] // tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1159,7 +1171,7 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // tests that an event is not created when a peer shares that it has a message that // we already have fn test_handle_ihave_subscribed_and_msg_cached() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1181,7 +1193,7 @@ fn test_handle_ihave_subscribed_and_msg_cached() { // test that an event is not created when a peer shares that it has a message in // a topic that we are not subscribed to fn test_handle_ihave_not_subscribed() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) .topics(vec![]) .to_subscribe(true) @@ -1207,7 +1219,7 @@ fn test_handle_ihave_not_subscribed() { // tests that a peer is added to our mesh when we are both subscribed // to the same topic fn test_handle_graft_is_subscribed() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1225,7 +1237,7 @@ fn test_handle_graft_is_subscribed() { // tests that a peer is not added to our mesh when they are subscribed to // a topic that we are not fn test_handle_graft_is_not_subscribed() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1250,7 +1262,7 @@ fn test_handle_graft_multiple_topics() { .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(true) @@ -1280,7 +1292,7 @@ fn test_handle_graft_multiple_topics() { #[test] // tests that a peer is removed from our mesh fn test_handle_prune_peer_in_mesh() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1309,34 +1321,50 @@ fn test_handle_prune_peer_in_mesh() { fn count_control_msgs( gs: &Behaviour, + queues: &HashMap, mut filter: impl FnMut(&PeerId, &ControlAction) -> bool, ) -> usize { gs.control_pool .iter() .map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count()) .sum::() - + gs.events + + queues .iter() - .filter(|e| match e { - ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(RpcOut::Control(action)), - .. - } => filter(peer_id, action), - _ => false, + .fold(0, |mut collected_messages, (peer_id, c)| { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + if let Ok(RpcOut::Control(action)) = c.priority.try_recv() { + if filter(peer_id, &action) { + collected_messages += 1; + } + } + if let Ok(RpcOut::Control(action)) = c.non_priority.try_recv() { + if filter(peer_id, &action) { + collected_messages += 1; + } + } + } + collected_messages }) - .count() } -fn flush_events(gs: &mut Behaviour) { +fn flush_events( + gs: &mut Behaviour, + receiver_queues: &mut HashMap, +) { gs.control_pool.clear(); gs.events.clear(); + for c in receiver_queues.values_mut() { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + let _ = c.priority.try_recv(); + let _ = c.non_priority.try_recv(); + } + } } #[test] // tests that a peer added as explicit peer gets connected to fn test_explicit_peer_gets_connected() { - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(Vec::new()) .to_subscribe(true) @@ -1369,7 +1397,7 @@ fn test_explicit_peer_reconnects() { .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, _) = inject_nodes1() + let (mut gs, others, mut queues, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1381,7 +1409,7 @@ fn test_explicit_peer_reconnects() { //add peer as explicit peer gs.add_explicit_peer(peer); - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); //disconnect peer disconnect_peer(&mut gs, peer); @@ -1419,7 +1447,7 @@ fn test_explicit_peer_reconnects() { #[test] fn test_handle_graft_explicit_peer() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1437,7 +1465,7 @@ fn test_handle_graft_explicit_peer() { //check prunes assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == peer + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer && match m { ControlAction::Prune { topic_hash, .. } => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], @@ -1450,7 +1478,7 @@ fn test_handle_graft_explicit_peer() { #[test] fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1466,7 +1494,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1474,7 +1502,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1483,7 +1511,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { #[test] fn do_not_graft_explicit_peer() { - let (mut gs, others, topic_hashes) = inject_nodes1() + let (mut gs, others, queues, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic")]) .to_subscribe(true) @@ -1498,7 +1526,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &others[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1507,7 +1535,7 @@ fn do_not_graft_explicit_peer() { #[test] fn do_forward_messages_to_explicit_peers() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1527,21 +1555,15 @@ fn do_forward_messages_to_explicit_peers() { validated: true, }; gs.handle_received_message(message.clone(), &local_id); - assert_eq!( - gs.events - .iter() - .filter(|e| match e { - ToSwarm::NotifyHandler { - peer_id, - event: HandlerIn::Message(RpcOut::Forward(m)), - .. - } => { - peer_id == &peers[0] && m.data == message.data + queues.into_iter().fold(0, |mut fwds, (peer_id, c)| { + while !c.non_priority.is_empty() { + if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(m)) if peer_id == peers[0] && m.data == message.data) { + fwds +=1; + } } - _ => false, - }) - .count(), + fwds + }), 1, "The message did not get forwarded to the explicit peer" ); @@ -1549,7 +1571,7 @@ fn do_forward_messages_to_explicit_peers() { #[test] fn explicit_peers_not_added_to_mesh_on_subscribe() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1578,7 +1600,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) > 0, "No graft message got created to non-explicit peer" @@ -1586,7 +1608,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1595,7 +1617,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { #[test] fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1627,7 +1649,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && matches!(m, ControlAction::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1635,7 +1657,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && matches!(m, ControlAction::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1644,7 +1666,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1691,7 +1713,7 @@ fn test_mesh_addition() { let config: Config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(config.mesh_n() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1727,7 +1749,7 @@ fn test_mesh_subtraction() { // Adds mesh_low peers and PRUNE 2 giving us a deficit. let n = config.mesh_n_high() + 10; //make all outbound connections so that we allow grafting to all - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1751,7 +1773,7 @@ fn test_mesh_subtraction() { fn test_connect_to_px_peers_on_handle_prune() { let config: Config = Config::default(); - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1807,7 +1829,7 @@ fn test_send_px_and_backoff_in_prune() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1824,7 +1846,7 @@ fn test_send_px_and_backoff_in_prune() { //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -1848,7 +1870,7 @@ fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1865,14 +1887,14 @@ fn test_prune_backoffed_peer_on_graft() { ); //ignore all messages until now - gs.events.clear(); + flush_events(&mut gs, &mut queues); //handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -1897,7 +1919,7 @@ fn test_do_not_graft_within_backoff_period() { .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1908,7 +1930,7 @@ fn test_do_not_graft_within_backoff_period() { gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); //forget all events until now - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); //call heartbeat gs.heartbeat(); @@ -1922,7 +1944,10 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )), 0, "Graft message created too early within backoff period" ); @@ -1933,7 +1958,10 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) > 0, "No graft message was created after backoff period" ); } @@ -1948,7 +1976,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1959,7 +1987,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); //call heartbeat gs.heartbeat(); @@ -1971,7 +1999,10 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )), 0, "Graft message created too early within backoff period" ); @@ -1982,7 +2013,10 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) > 0, "No graft message was created after backoff period" ); } @@ -2001,7 +2035,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2011,7 +2045,7 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &queues, |_, m| match m { ControlAction::Prune { backoff, .. } => backoff == &Some(1), _ => false, }), @@ -2022,7 +2056,7 @@ fn test_unsubscribe_backoff() { let _ = gs.subscribe(&Topic::new(topics[0].to_string())); // forget all events until now - flush_events(&mut gs); + flush_events(&mut gs, &mut queues); // call heartbeat gs.heartbeat(); @@ -2036,7 +2070,10 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })), + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )), 0, "Graft message created too early within backoff period" ); @@ -2047,7 +2084,10 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, |_, m| matches!(m, ControlAction::Graft { .. })) > 0, + count_control_msgs(&gs, &queues, |_, m| matches!( + m, + ControlAction::Graft { .. } + )) > 0, "No graft message was created after backoff period" ); } @@ -2058,7 +2098,7 @@ fn test_flood_publish() { let topic = "test"; // Adds more peers than mesh can hold to test flood publishing - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, queues, _) = inject_nodes1() .peer_no(config.mesh_n_high() + 10) .topics(vec![topic.into()]) .to_subscribe(true) @@ -2069,17 +2109,15 @@ fn test_flood_publish() { gs.publish(Topic::new(topic), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events - .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { + let publishes = queues + .into_values() + .fold(vec![], |mut collected_publish, c| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { collected_publish.push(message); } - collected_publish } - _ => collected_publish, + collected_publish }); // Transform the inbound message @@ -2114,7 +2152,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //add more peers than in mesh to test gossipping //by default only mesh_n_low peers will get added to mesh - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(config.mesh_n_low() + config.gossip_lazy() + 1) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2142,7 +2180,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, |_, action| match action { + count_control_msgs(&gs, &queues, |_, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2159,7 +2197,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //add a lot of peers let m = config.mesh_n_low() + config.gossip_lazy() * (2.0 / config.gossip_factor()) as usize; - let (mut gs, _, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(m) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2186,7 +2224,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, |_, action| match action { + count_control_msgs(&gs, &queues, |_, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2202,7 +2240,7 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { let config: Config = Config::default(); //enough peers to fill the mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2217,8 +2255,8 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); //create an outbound and an inbound peer - let inbound = add_peer(&mut gs, &topics, false, false); - let outbound = add_peer(&mut gs, &topics, true, false); + let (inbound, _in_reciver) = add_peer(&mut gs, &topics, false, false); + let (outbound, _out_receiver) = add_peer(&mut gs, &topics, true, false); //send grafts gs.handle_graft(&inbound, vec![topics[0].clone()]); @@ -2248,7 +2286,7 @@ fn test_do_not_remove_too_many_outbound_peers() { .unwrap(); //fill the mesh with inbound connections - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2263,7 +2301,7 @@ fn test_do_not_remove_too_many_outbound_peers() { //create m outbound connections and graft (we will accept the graft) let mut outbound = HashSet::new(); for _ in 0..m { - let peer = add_peer(&mut gs, &topics, true, false); + let (peer, _) = add_peer(&mut gs, &topics, true, false); outbound.insert(peer); gs.handle_graft(&peer, topics.clone()); } @@ -2286,7 +2324,7 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { let config: Config = Config::default(); // Fill full mesh with inbound peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2298,8 +2336,9 @@ fn test_add_outbound_peers_if_min_is_not_satisfied() { } //create config.mesh_outbound_min() many outbound connections without grafting + let mut peers = vec![]; for _ in 0..config.mesh_outbound_min() { - add_peer(&mut gs, &topics, true, false); + peers.push(add_peer(&mut gs, &topics, true, false)); } // Nothing changed in the mesh yet @@ -2320,7 +2359,7 @@ fn test_prune_negative_scored_peers() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2344,7 +2383,7 @@ fn test_prune_negative_scored_peers() { //check prune message assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] && match m { ControlAction::Prune { topic_hash, @@ -2365,7 +2404,7 @@ fn test_prune_negative_scored_peers() { fn test_dont_graft_to_negative_scored_peers() { let config = Config::default(); //init full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2377,8 +2416,8 @@ fn test_dont_graft_to_negative_scored_peers() { .create_network(); //add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 to negative gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 1); @@ -2404,7 +2443,7 @@ fn test_ignore_px_from_negative_scored_peer() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2451,7 +2490,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { .unwrap(); // Build mesh with three peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(3) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2478,7 +2517,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { // Check that px in prune message only contains third peer assert_eq!( - count_control_msgs(&gs, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] && match m { ControlAction::Prune { topic_hash, @@ -2504,7 +2543,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2518,8 +2557,8 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2551,7 +2590,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, |peer, action| match action { + count_control_msgs(&gs, &queues, |peer, action| match action { ControlAction::IHave { topic_hash, message_ids, @@ -2579,7 +2618,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { }; // Build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2595,8 +2634,10 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { } // Add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2627,17 +2668,15 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.handle_iwant(&p2, vec![msg_id.clone()]); // the messages we are sending - let sent_messages = gs - .events + let sent_messages = queues .into_iter() - .fold(vec![], |mut collected_messages, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Forward(message)) = event { + .fold(vec![], |mut collected_messages, (peer_id, c)| { + while !c.non_priority.is_empty() { + if let Ok(RpcOut::Forward(message)) = c.non_priority.try_recv() { collected_messages.push((peer_id, message)); } - collected_messages } - _ => collected_messages, + collected_messages }); //the message got sent to p2 @@ -2667,7 +2706,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2683,8 +2722,8 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } //add two additional peers that will not be part of the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 below peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -2715,7 +2754,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( - count_control_msgs(&gs, |peer, c| match c { + count_control_msgs(&gs, &queues, |peer, c| match c { ControlAction::IWant { message_ids } => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); @@ -2743,7 +2782,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { }; //build mesh with no peers and no subscribed topics - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut queues, _) = inject_nodes1() .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); @@ -2753,8 +2792,10 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { let topics = vec![topic.hash()]; //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2772,17 +2813,15 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { gs.publish(topic, publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events + let publishes = queues .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { + .fold(vec![], |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { collected_publish.push((peer_id, message)); } - collected_publish } - _ => collected_publish, + collected_publish }); //assert only published to p2 @@ -2800,15 +2839,17 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { ..PeerScoreThresholds::default() }; //build mesh with no peers - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, receiver1); + let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2826,17 +2867,15 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect all publish messages - let publishes = gs - .events + let publishes = queues .into_iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { event, peer_id, .. } => { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push((peer_id, message)); + .fold(vec![], |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() { + collected_publish.push((peer_id, message)) } - collected_publish } - _ => collected_publish, + collected_publish }); //assert only published to p2 @@ -2856,15 +2895,15 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { }; //build mesh with no peers - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, _, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config.clone()) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); //add two additional peers that will be added to the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); + let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); //reduce score of p1 below peer_score_thresholds.graylist_threshold //note that penalties get squared so two penalties means a score of @@ -2986,7 +3025,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { ..PeerScoreThresholds::default() }; // Build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3057,7 +3096,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { //build mesh with more peers than mesh can hold let n = config.mesh_n_high() + 1; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3117,7 +3156,7 @@ fn test_scoring_p1() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3199,7 +3238,7 @@ fn test_scoring_p2() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3299,7 +3338,7 @@ fn test_scoring_p3() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3400,7 +3439,7 @@ fn test_scoring_p3b() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3492,7 +3531,7 @@ fn test_scoring_p4_valid_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3551,7 +3590,7 @@ fn test_scoring_p4_invalid_signature() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3609,7 +3648,7 @@ fn test_scoring_p4_message_from_self() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3659,7 +3698,7 @@ fn test_scoring_p4_ignored_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3718,7 +3757,7 @@ fn test_scoring_p4_application_invalidated_message() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3780,7 +3819,7 @@ fn test_scoring_p4_application_invalid_message_from_two_peers() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with two peers - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3850,7 +3889,7 @@ fn test_scoring_p4_three_application_invalid_messages() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3934,7 +3973,7 @@ fn test_scoring_p4_decay() { let peer_score_thresholds = PeerScoreThresholds::default(); //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -3988,7 +4027,7 @@ fn test_scoring_p5() { }; //build mesh with one peer - let (mut gs, peers, _) = inject_nodes1() + let (mut gs, peers, _, _) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4014,7 +4053,7 @@ fn test_scoring_p6() { ..Default::default() }; - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(vec![]) .to_subscribe(false) @@ -4027,20 +4066,20 @@ fn test_scoring_p6() { //create 5 peers with the same ip let addr = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 3)); let peers = vec![ - add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()), - add_peer_with_addr(&mut gs, &vec![], true, true, addr.clone()), + add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], false, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, true, addr.clone()).0, ]; //create 4 other peers with other ip let addr2 = Multiaddr::from(Ipv4Addr::new(10, 1, 2, 4)); let others = vec![ - add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()), - add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()), - add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()), + add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &vec![], false, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()).0, + add_peer_with_addr(&mut gs, &vec![], true, false, addr2.clone()).0, ]; //no penalties yet @@ -4144,7 +4183,7 @@ fn test_scoring_p7_grafts_before_backoff() { ..Default::default() }; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4221,7 +4260,7 @@ fn test_opportunistic_grafting() { ..Default::default() }; - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(5) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4250,7 +4289,7 @@ fn test_opportunistic_grafting() { } //set scores for peers in the mesh - for (i, peer) in others.iter().enumerate().take(5) { + for (i, (peer, _receiver)) in others.iter().enumerate().take(5) { gs.set_application_score(peer, 0.0 + i as f64); } @@ -4290,7 +4329,7 @@ fn test_opportunistic_grafting() { ); assert!( - gs.mesh[&topics[0]].is_disjoint(&others.iter().cloned().take(2).collect()), + gs.mesh[&topics[0]].is_disjoint(&others.iter().map(|(p, _)| p).cloned().take(2).collect()), "peers below or equal to median should not be added in opportunistic grafting" ); } @@ -4298,7 +4337,7 @@ fn test_opportunistic_grafting() { #[test] fn test_ignore_graft_from_unknown_topic() { //build gossipsub without subscribing to any topics - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, queues, _) = inject_nodes1() .peer_no(0) .topics(vec![]) .to_subscribe(false) @@ -4309,7 +4348,10 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, |_, a| matches!(a, ControlAction::Prune { .. })), + count_control_msgs(&gs, &queues, |_, a| matches!( + a, + ControlAction::Prune { .. } + )), 0, "we should not prune after graft in unknown topic" ); @@ -4319,14 +4361,15 @@ fn test_ignore_graft_from_unknown_topic() { fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { let config = Config::default(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, receiver); //receive a message let mut seq = 0; @@ -4340,7 +4383,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); //clear events - gs.events.clear(); + flush_events(&mut gs, &mut queues); //the first gossip_retransimission many iwants return the valid message, all others are // ignored. @@ -4349,16 +4392,14 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { } assert_eq!( - gs.events - .iter() - .filter(|e| matches!( - e, - ToSwarm::NotifyHandler { - event: HandlerIn::Message(RpcOut::Forward(_)), - .. + queues.into_values().fold(0, |mut fwds, c| { + while !c.non_priority.is_empty() { + if let Ok(RpcOut::Forward(_)) = c.non_priority.try_recv() { + fwds += 1; } - )) - .count(), + } + fwds + }), config.gossip_retransimission() as usize, "not more then gossip_retransmission many messages get sent back" ); @@ -4371,7 +4412,7 @@ fn test_ignore_too_many_ihaves() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4379,7 +4420,8 @@ fn test_ignore_too_many_ihaves() { .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4408,7 +4450,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, |p, action| p == &peer + count_control_msgs(&gs, &queues, |p, action| p == &peer && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" @@ -4431,7 +4473,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 20 messages assert_eq!( - count_control_msgs(&gs, |p, action| p == &peer + count_control_msgs(&gs, &queues, |p, action| p == &peer && matches!(action, ControlAction::IWant { message_ids } if message_ids.len() == 1)), 20, "all 20 should get sent" @@ -4446,7 +4488,7 @@ fn test_ignore_too_many_messages_in_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4454,7 +4496,8 @@ fn test_ignore_too_many_messages_in_ihave() { .create_network(); //add another peer not in the mesh - let peer = add_peer(&mut gs, &topics, false, false); + let (peer, receiver) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4480,7 +4523,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &queues, |p, action| match action { ControlAction::IWant { message_ids } => p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); @@ -4505,7 +4548,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we sent 20 iwant messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &queues, |p, action| match action { ControlAction::IWant { message_ids } => p == &peer && { sum += message_ids.len(); @@ -4526,7 +4569,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4539,8 +4582,8 @@ fn test_limit_number_of_message_ids_inside_ihave() { } //add two other peers not in the mesh - let p1 = add_peer(&mut gs, &topics, false, false); - let p2 = add_peer(&mut gs, &topics, false, false); + let (p1, _) = add_peer(&mut gs, &topics, false, false); + let (p2, _) = add_peer(&mut gs, &topics, false, false); //receive 200 messages from another peer let mut seq = 0; @@ -4559,7 +4602,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, |p, action| match action { + count_control_msgs(&gs, &queues, |p, action| match action { ControlAction::IHave { message_ids, .. } => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); @@ -4616,7 +4659,7 @@ fn test_iwant_penalties() { }; // fill the mesh - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, _, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4640,7 +4683,7 @@ fn test_iwant_penalties() { let mut first_messages = Vec::new(); let mut second_messages = Vec::new(); let mut seq = 0; - for peer in &other_peers { + for (peer, _receiver) in &other_peers { let msg1 = random_message(&mut seq, &topics); let msg2 = random_message(&mut seq, &topics); @@ -4663,19 +4706,19 @@ fn test_iwant_penalties() { } // the peers send us all the first message ids in time - for (index, peer) in other_peers.iter().enumerate() { + for (index, (peer, _receiver)) in other_peers.iter().enumerate() { gs.handle_received_message(first_messages[index].clone(), peer); } // now we do a heartbeat no penalization should have been applied yet gs.heartbeat(); - for peer in &other_peers { + for (peer, _receiver) in &other_peers { assert_eq!(gs.peer_score.as_ref().unwrap().0.score(peer), 0.0); } // receive the first twenty of the other peers then send their response - for (index, peer) in other_peers.iter().enumerate().take(20) { + for (index, (peer, _receiver)) in other_peers.iter().enumerate().take(20) { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4686,7 +4729,7 @@ fn test_iwant_penalties() { gs.heartbeat(); // now we get the second messages from the last 80 peers. - for (index, peer) in other_peers.iter().enumerate() { + for (index, (peer, _receiver)) in other_peers.iter().enumerate() { if index > 19 { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4700,7 +4743,7 @@ fn test_iwant_penalties() { let mut single_penalized = 0; let mut double_penalized = 0; - for (i, peer) in other_peers.iter().enumerate() { + for (i, (peer, _receiver)) in other_peers.iter().enumerate() { let score = gs.peer_score.as_ref().unwrap().0.score(peer); if score == 0.0 { not_penalized += 1; @@ -4728,7 +4771,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4736,7 +4779,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .create_network(); //add two floodsub peer, one explicit, one implicit - let p1 = add_peer_with_addr_and_kind( + let (p1, receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4744,7 +4787,11 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + queues.insert(p1, receiver1); + + let (p2, receiver2) = + add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + queues.insert(p2, receiver2); //p1 and p2 are not in the mesh assert!(!gs.mesh[&topics[0]].contains(&p1) && !gs.mesh[&topics[0]].contains(&p2)); @@ -4754,24 +4801,21 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = gs - .events + let publishes = queues .iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push(message); - } + .fold(0, |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if matches!(c.priority.try_recv(), + Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2) + { + collected_publish += 1; } - collected_publish } - _ => collected_publish, + collected_publish }); assert_eq!( - publishes.len(), - 2, + publishes, 2, "Should send a publish message to all floodsub peers" ); } @@ -4782,7 +4826,7 @@ fn test_do_not_use_floodsub_in_fanout() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, mut queues, _) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(Vec::new()) .to_subscribe(false) @@ -4793,7 +4837,7 @@ fn test_do_not_use_floodsub_in_fanout() { let topics = vec![topic.hash()]; //add two floodsub peer, one explicit, one implicit - let p1 = add_peer_with_addr_and_kind( + let (p1, receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4801,31 +4845,32 @@ fn test_do_not_use_floodsub_in_fanout() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + queues.insert(p1, receiver1); + let (p2, receiver2) = + add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); + + queues.insert(p2, receiver2); //publish a message let publish_data = vec![0; 42]; gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = gs - .events + let publishes = queues .iter() - .fold(vec![], |mut collected_publish, e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == &p1 || peer_id == &p2 { - if let HandlerIn::Message(RpcOut::Publish(message)) = event { - collected_publish.push(message); - } + .fold(0, |mut collected_publish, (peer_id, c)| { + while !c.priority.is_empty() { + if matches!(c.priority.try_recv(), + Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2) + { + collected_publish += 1; } - collected_publish } - _ => collected_publish, + collected_publish }); assert_eq!( - publishes.len(), - 2, + publishes, 2, "Should send a publish message to all floodsub peers" ); @@ -4837,7 +4882,7 @@ fn test_do_not_use_floodsub_in_fanout() { #[test] fn test_dont_add_floodsub_peers_to_mesh_on_join() { - let (mut gs, _, _) = inject_nodes1() + let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) .topics(Vec::new()) .to_subscribe(false) @@ -4867,14 +4912,14 @@ fn test_dont_add_floodsub_peers_to_mesh_on_join() { #[test] fn test_dont_send_px_to_old_gossipsub_peers() { - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, queues, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); //add an old gossipsub peer - let p1 = add_peer_with_addr_and_kind( + let (p1, _receiver1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4892,7 +4937,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &queues, |_, m| match m { ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, }), @@ -4904,7 +4949,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { #[test] fn test_dont_send_floodsub_peers_in_px() { //build mesh with one peer - let (mut gs, peers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4930,7 +4975,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( - count_control_msgs(&gs, |_, m| match m { + count_control_msgs(&gs, &queues, |_, m| match m { ControlAction::Prune { peers: px, .. } => !px.is_empty(), _ => false, }), @@ -4941,7 +4986,7 @@ fn test_dont_send_floodsub_peers_in_px() { #[test] fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { - let (mut gs, _, topics) = inject_nodes1() + let (mut gs, _, _, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4969,7 +5014,7 @@ fn test_dont_add_floodsub_peers_to_mesh_in_heartbeat() { // Some very basic test of public api methods. #[test] fn test_public_api() { - let (gs, peers, topic_hashes) = inject_nodes1() + let (gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(4) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5001,7 +5046,7 @@ fn test_public_api() { fn test_subscribe_to_invalid_topic() { let t1 = Topic::new("t1"); let t2 = Topic::new("t2"); - let (mut gs, _, _) = inject_nodes::() + let (mut gs, _, _, _) = inject_nodes::() .subscription_filter(WhitelistSubscriptionFilter( vec![t1.hash()].into_iter().collect(), )) @@ -5015,7 +5060,7 @@ fn test_subscribe_to_invalid_topic() { #[test] fn test_subscribe_and_graft_with_negative_score() { //simulate a communication between two gossipsub instances - let (mut gs1, _, topic_hashes) = inject_nodes1() + let (mut gs1, _, _, topic_hashes) = inject_nodes1() .topics(vec!["test".into()]) .scoring(Some(( PeerScoreParams::default(), @@ -5023,14 +5068,14 @@ fn test_subscribe_and_graft_with_negative_score() { ))) .create_network(); - let (mut gs2, _, _) = inject_nodes1().create_network(); + let (mut gs2, _, queues, _) = inject_nodes1().create_network(); let connection_id = ConnectionId::new_unchecked(0); let topic = Topic::new("test"); - let p2 = add_peer(&mut gs1, &Vec::new(), true, false); - let p1 = add_peer(&mut gs2, &topic_hashes, false, false); + let (p2, _receiver1) = add_peer(&mut gs1, &Vec::new(), true, false); + let (p1, _receiver2) = add_peer(&mut gs2, &topic_hashes, false, false); //add penalty to peer p2 gs1.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); @@ -5040,22 +5085,16 @@ fn test_subscribe_and_graft_with_negative_score() { //subscribe to topic in gs2 gs2.subscribe(&topic).unwrap(); - let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| { + let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, _gs2: &mut Behaviour<_, _>| { //collect messages to p1 - let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { - ToSwarm::NotifyHandler { peer_id, event, .. } => { - if peer_id == p1 { - if let HandlerIn::Message(m) = event { - Some(m) - } else { - None - } - } else { - None - } - } - _ => None, - }); + let messages_to_p1 = + queues + .iter() + .filter_map(|(peer_id, c)| match c.non_priority.try_recv() { + Ok(rpc) if peer_id == &p1 => Some(rpc), + _ => None, + }); + for message in messages_to_p1 { gs1.on_connection_handler_event( p2, @@ -5093,7 +5132,7 @@ fn test_graft_without_subscribe() { let topic = String::from("test_subscribe"); let subscribe_topic = vec![topic.clone()]; let subscribe_topic_hash = vec![Topic::new(topic.clone()).hash()]; - let (mut gs, peers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(1) .topics(subscribe_topic) .to_subscribe(false) diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 7e79912cc4a..b99f8548067 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -95,6 +95,7 @@ pub struct Config { max_ihave_messages: usize, iwant_followup_time: Duration, published_message_ids_cache_time: Duration, + connection_handler_queue_len: usize, } impl Config { @@ -350,6 +351,11 @@ impl Config { pub fn published_message_ids_cache_time(&self) -> Duration { self.published_message_ids_cache_time } + + /// The max number of messages a `ConnectionHandler` can buffer. + pub fn connection_handler_queue_len(&self) -> usize { + self.connection_handler_queue_len + } } impl Default for Config { @@ -417,6 +423,7 @@ impl Default for ConfigBuilder { max_ihave_messages: 10, iwant_followup_time: Duration::from_secs(3), published_message_ids_cache_time: Duration::from_secs(10), + connection_handler_queue_len: 100, }, invalid_protocol: false, } @@ -782,6 +789,10 @@ impl ConfigBuilder { self } + pub fn connection_handler_queue_len(&mut self, len: usize) { + self.config.connection_handler_queue_len = len; + } + /// Constructs a [`Config`] from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index e91f81776e7..dd048c7e2fd 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -20,7 +20,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; -use crate::types::{PeerKind, RawMessage, Rpc, RpcOut}; +use crate::types::{PeerKind, RawMessage, Rpc, RpcReceiver}; use crate::ValidationError; use asynchronous_codec::Framed; use futures::future::Either; @@ -33,7 +33,6 @@ use libp2p_swarm::handler::{ FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; use libp2p_swarm::Stream; -use smallvec::SmallVec; use std::{ pin::Pin, task::{Context, Poll}, @@ -61,8 +60,6 @@ pub enum HandlerEvent { #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum HandlerIn { - /// A gossipsub message to send. - Message(RpcOut), /// The peer has joined the mesh. JoinedMesh, /// The peer has left the mesh. @@ -94,8 +91,8 @@ pub struct EnabledHandler { /// The single long-lived inbound substream. inbound_substream: Option, - /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[proto::RPC; 16]>, + /// Queue of values that we want to send to the remote + send_queue: RpcReceiver, /// Flag indicating that an outbound substream is being established to prevent duplicate /// requests. @@ -159,7 +156,7 @@ enum OutboundSubstreamState { impl Handler { /// Builds a new [`Handler`]. - pub fn new(protocol_config: ProtocolConfig) -> Self { + pub fn new(protocol_config: ProtocolConfig, message_queue: RpcReceiver) -> Self { Handler::Enabled(EnabledHandler { listen_protocol: protocol_config, inbound_substream: None, @@ -167,11 +164,11 @@ impl Handler { outbound_substream_establishing: false, outbound_substream_attempts: 0, inbound_substream_attempts: 0, - send_queue: SmallVec::new(), peer_kind: None, peer_kind_sent: false, last_io_activity: Instant::now(), in_mesh: false, + send_queue: message_queue, }) } } @@ -250,10 +247,11 @@ impl EnabledHandler { ) { // outbound idle state Some(OutboundSubstreamState::WaitingOutput(substream)) => { - if let Some(message) = self.send_queue.pop() { - self.send_queue.shrink_to_fit(); - self.outbound_substream = - Some(OutboundSubstreamState::PendingSend(substream, message)); + if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) { + self.outbound_substream = Some(OutboundSubstreamState::PendingSend( + substream, + message.into_protobuf(), + )); continue; } @@ -409,7 +407,6 @@ impl ConnectionHandler for Handler { fn on_behaviour_event(&mut self, message: HandlerIn) { match self { Handler::Enabled(handler) => match message { - HandlerIn::Message(m) => handler.send_queue.push(m.into_protobuf()), HandlerIn::JoinedMesh => { handler.in_mesh = true; } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index d1b92ff0ba8..23ee4a8ed1e 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -19,13 +19,19 @@ // DEALINGS IN THE SOFTWARE. //! A collection of types using the Gossipsub system. +use crate::metrics::Metrics; use crate::TopicHash; +use async_channel::{Receiver, Sender}; +use futures::Stream; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; use quick_protobuf::MessageWrite; -use std::fmt; use std::fmt::Debug; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::Poll; +use std::{fmt, pin::Pin}; use crate::rpc_proto::proto; #[cfg(feature = "serde")] @@ -512,3 +518,139 @@ impl fmt::Display for PeerKind { f.write_str(self.as_ref()) } } + +/// `RpcOut` sender that is priority aware. +#[derive(Debug, Clone)] +pub(crate) struct RpcSender { + peer_id: PeerId, + cap: usize, + len: Arc, + priority: Sender, + non_priority: Sender, + receiver: RpcReceiver, +} + +impl RpcSender { + /// Create a RpcSender. + pub(crate) fn new(peer_id: PeerId, cap: usize) -> RpcSender { + let (priority_sender, priority_receiver) = async_channel::unbounded(); + let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2); + let len = Arc::new(AtomicUsize::new(0)); + let receiver = RpcReceiver { + len: len.clone(), + priority: priority_receiver, + non_priority: non_priority_receiver, + }; + RpcSender { + peer_id, + cap: cap / 2, + len, + priority: priority_sender, + non_priority: non_priority_sender, + receiver: receiver.clone(), + } + } + + /// Create a new Receiver to the sender. + pub(crate) fn new_receiver(&self) -> RpcReceiver { + self.receiver.clone() + } + + /// Send a `RpcOut::Control` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn control(&mut self, control: ControlAction) { + self.priority + .try_send(RpcOut::Control(control)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn subscribe(&mut self, topic: TopicHash) { + self.priority + .try_send(RpcOut::Subscribe(topic)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver` + /// this is high priority. + pub(crate) fn unsubscribe(&mut self, topic: TopicHash) { + self.priority + .try_send(RpcOut::Unsubscribe(topic)) + .expect("Channel is unbounded and should always be open"); + } + + /// Send a `RpcOut::Publish` message to the `RpcReceiver` + /// this is high priority. If message sending fails, an `Err` is returned. + pub(crate) fn publish( + &mut self, + message: RawMessage, + metrics: Option<&mut Metrics>, + ) -> Result<(), ()> { + if self.len.load(Ordering::Relaxed) >= self.cap { + return Err(()); + } + self.priority + .try_send(RpcOut::Publish(message.clone())) + .expect("Channel is unbounded and should always be open"); + self.len.fetch_add(1, Ordering::Relaxed); + + if let Some(m) = metrics { + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + + Ok(()) + } + + /// Send a `RpcOut::Forward` message to the `RpcReceiver` + /// this is high priority. If the queue is full the message is discarded. + pub(crate) fn forward(&mut self, message: RawMessage, metrics: Option<&mut Metrics>) { + if let Err(err) = self.non_priority.try_send(RpcOut::Forward(message.clone())) { + let rpc = err.into_inner(); + tracing::trace!( + "{:?} message to peer {} dropped, queue is full", + rpc, + self.peer_id + ); + return; + } + + if let Some(m) = metrics { + m.msg_sent(&message.topic, message.raw_protobuf_len()); + } + } +} + +/// `RpcOut` sender that is priority aware. +#[derive(Debug, Clone)] +pub struct RpcReceiver { + len: Arc, + pub(crate) priority: Receiver, + pub(crate) non_priority: Receiver, +} + +impl RpcReceiver { + /// Check if both queues are empty. + pub(crate) fn is_empty(&self) -> bool { + self.priority.is_empty() && self.non_priority.is_empty() + } +} + +impl Stream for RpcReceiver { + type Item = RpcOut; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // The priority queue is first polled. + if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) { + if let Some(RpcOut::Publish(_)) = rpc { + self.len.fetch_sub(1, Ordering::Relaxed); + } + return Poll::Ready(rpc); + } + // Then we poll the non priority. + Pin::new(&mut self.non_priority).poll_next(cx) + } +}