From e25f1d0dc7e4168bea474483e4e3f4f58c4347b5 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 23 Mar 2023 17:03:25 +1100 Subject: [PATCH 1/5] Add less aggressive flood publishing --- protocols/gossipsub/src/behaviour.rs | 329 +++++++++++++++------ protocols/gossipsub/src/behaviour/tests.rs | 45 +-- protocols/gossipsub/src/config.rs | 28 +- protocols/gossipsub/src/mcache.rs | 1 - 4 files changed, 279 insertions(+), 124 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 13ef81b321f..8b055a3b85c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -32,7 +32,7 @@ use std::{ use futures::StreamExt; use log::{debug, error, trace, warn}; use prometheus_client::registry::Registry; -use rand::{seq::SliceRandom, thread_rng}; +use rand::{seq::SliceRandom, thread_rng, Rng}; use libp2p_core::{multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, Endpoint, Multiaddr}; use libp2p_identity::Keypair; @@ -46,7 +46,7 @@ use libp2p_swarm::{ use wasm_timer::Instant; use crate::backoff::BackoffStorage; -use crate::config::{Config, ValidationMode}; +use crate::config::{Config, FloodPublish, ValidationMode}; use crate::gossip_promises::GossipPromises; use crate::handler::{Handler, HandlerEvent, HandlerIn}; use crate::mcache::MessageCache; @@ -301,6 +301,10 @@ pub struct Behaviour { /// Short term cache for fast message ids mapping them to the real message ids fast_message_id_cache: TimeCache, + /// When flood publishing, we delay publishing to some peers until the next heartbeat. + /// This stores the messages and the peers we are required to publish to. + messages_to_flood_publish: Vec<(MessageId, HashSet)>, + /// The filter used to handle message subscriptions. subscription_filter: F, @@ -421,7 +425,6 @@ where control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), - fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), peer_topics: HashMap::new(), explicit_peers: HashSet::new(), @@ -448,6 +451,8 @@ where pending_iwant_msgs: HashSet::new(), connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), + fast_message_id_cache: TimeCache::new(config.duplicate_cache_time()), + messages_to_flood_publish: Vec::with_capacity(16), config, subscription_filter, data_transform, @@ -604,18 +609,6 @@ where topic: raw_message.topic.clone(), }); - let event = Rpc { - subscriptions: Vec::new(), - messages: vec![raw_message.clone()], - control_msgs: Vec::new(), - } - .into_protobuf(); - - // check that the size doesn't exceed the max transmission size - if event.get_size() > self.config.max_transmit_size() { - return Err(PublishError::MessageTooLarge); - } - // Check the if the message has been published before if self.duplicate_cache.contains(&msg_id) { // This message has already been seen. We don't re-publish messages that have already @@ -631,89 +624,176 @@ where let topic_hash = raw_message.topic.clone(); - // If we are not flood publishing forward the message to mesh peers. - let mesh_peers_sent = !self.config.flood_publish() - && self.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new())?; + // Forward the message to all mesh peers + let non_zero_mesh_peers = + self.forward_msg(&msg_id, raw_message.clone(), None, HashSet::new())?; + + if !non_zero_mesh_peers { + warn!( + "Insufficient mesh peers for publishing on mesh. 
Msg-id: {}", + msg_id + ); + } + // Build a list of auxiliary peers we may need to send to. + // The recipient peers are peers we need to send to immediately and are in addition to the + // mesh peers we have already sent to. We may optionally need to later send to some + // optional peers for flood publishing. let mut recipient_peers = HashSet::new(); + let mut flood_publish_peers = HashSet::new(); + + // If there are peers we know about in this topic if let Some(set) = self.topic_peers.get(&topic_hash) { - if self.config.flood_publish() { - // Forward to all peers above score and all explicit peers - recipient_peers.extend( - set.iter() - .filter(|p| { - self.explicit_peers.contains(*p) - || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 - }) - .cloned(), - ); - } else { - // Explicit peers - for peer in &self.explicit_peers { - if set.contains(peer) { - recipient_peers.insert(*peer); - } + // If we are flood publishing to all peers in a rapid manner, we just add all peers + // (including explicit peers) that are above the publish threshold. + match self.config.flood_publish() { + FloodPublish::Rapid => { + recipient_peers.extend( + set.iter() + .filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + }) + .cloned(), + ); } - - // Floodsub peers - for (peer, connections) in &self.connected_peers { - if connections.kind == PeerKind::Floodsub - && !self - .score_below_threshold(peer, |ts| ts.publish_threshold) - .0 - { - recipient_peers.insert(*peer); + FloodPublish::Heartbeat(_) | FloodPublish::Disabled => { + // We need to build the list of recipient peers according to the spec and optionally + // add in peers to flood publish to after. + + // Send to any floodsub peers, if flood sub is supported. + if self.config.support_floodsub() { + for (peer, connections) in &self.connected_peers { + if connections.kind == PeerKind::Floodsub + && !self + .score_below_threshold(peer, |ts| ts.publish_threshold) + .0 + && set.contains(peer) + { + recipient_peers.insert(*peer); + } + } } - } - // Gossipsub peers - if self.mesh.get(&topic_hash).is_none() { - debug!("Topic: {:?} not in the mesh", topic_hash); - // If we have fanout peers add them to the map. - if self.fanout.contains_key(&topic_hash) { - for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { - recipient_peers.insert(*peer); + // Send to any non-mesh gossipsub peers (i.e fanout peers) + if self.mesh.get(&topic_hash).is_none() { + debug!("Topic: {:?} not in the mesh", topic_hash); + // If we have fanout peers add them to the map. 
+ if self.fanout.contains_key(&topic_hash) { + for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { + recipient_peers.insert(*peer); + } + } else { + // We have no fanout peers, select mesh_n of them and add them to the fanout + let mesh_n = self.config.mesh_n(); + let new_peers = get_random_peers( + &self.topic_peers, + &self.connected_peers, + &topic_hash, + mesh_n, + { + |p| { + !self.explicit_peers.contains(p) + && !self + .score_below_threshold(p, |pst| { + pst.publish_threshold + }) + .0 + } + }, + ); + // Add the new peers to the fanout and recipient peers + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + for peer in new_peers { + debug!("Peer added to fanout: {:?}", peer); + recipient_peers.insert(peer); + } } - } else { - // We have no fanout peers, select mesh_n of them and add them to the fanout - let mesh_n = self.config.mesh_n(); - let new_peers = get_random_peers( - &self.topic_peers, - &self.connected_peers, - &topic_hash, - mesh_n, - { - |p| { - !self.explicit_peers.contains(p) - && !self - .score_below_threshold(p, |pst| pst.publish_threshold) - .0 - } - }, - ); - // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - debug!("Peer added to fanout: {:?}", peer); - recipient_peers.insert(peer); + // We are publishing to fanout peers - update the time we published + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); + } + + // May need to queue messages to flood publish too. + if let FloodPublish::Heartbeat(max_peers_to_send_to) = + self.config.flood_publish() + { + // We collect up to max_peers_to_send additional peers from our known peers + // list + let mut peer_list = set + .iter() + .filter(|p| { + // Do not include peers we are already sending too + !self.mesh.get(&topic_hash).map(|peers| peers.contains(*p)).unwrap_or(false) && + // Do not include peers we have already added to the recipient peers + !recipient_peers.contains(*p) && + // We have already included explicit peers, so just make sure + // these peers are above threshold + !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + }) + .collect::>(); + + // If we flood publish to everyone, add them all to the list. + if *max_peers_to_send_to == 0 || *max_peers_to_send_to >= peer_list.len() { + flood_publish_peers.extend(peer_list); + } else { + // We have to randomly sample peers from the list. + // The amount we are sampling must be less than the eligible + // peers + let mut sampled = 0; + let mut rng = rand::thread_rng(); + while sampled < *max_peers_to_send_to { + let index = rng.gen_range(0..peer_list.len()); + flood_publish_peers.insert(peer_list.swap_remove(index).clone()); + sampled += 1; + } } } - // We are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); } } } - if recipient_peers.is_empty() && !mesh_peers_sent { + // If there are no peers to send to, report an error. + if !non_zero_mesh_peers && recipient_peers.is_empty() && flood_publish_peers.is_empty() { return Err(PublishError::InsufficientPeers); } - // If the message isn't a duplicate and we have sent it to some peers add it to the - // duplicate cache and memcache. - self.duplicate_cache.insert(msg_id.clone()); + // There are additional peers to send to. Some are immediate, others are queued for the + // next heartbeat. 
+ + let recipient_peers_non_empty = !recipient_peers.is_empty(); + + // Publish to the immediate peers + // NOTE: A message always exists as it is constructed above. + self.publish_message(&msg_id, recipient_peers, raw_message.clone())?; + + if non_zero_mesh_peers || recipient_peers_non_empty { + // We have sent this message to at least one peer. + debug!("Published message: {:?}", &msg_id); + + // If the message isn't a duplicate and we have sent it to some peers add it to the + // duplicate cache and memcache. + self.duplicate_cache.insert(msg_id.clone()); + } + + // We intend to send this to at least one peer so we add it to the cache. self.mcache.put(&msg_id, raw_message); + // Queue the message for flood publishing peers (if there are any) + if !flood_publish_peers.is_empty() { + self.messages_to_flood_publish + .push((msg_id.clone(), flood_publish_peers)); + } + Ok(msg_id) + } + + /// Helper function to publish a message to a set of peers. + fn publish_message( + &mut self, + msg_id: &MessageId, + recipient_peers: HashSet, + raw_message: RawMessage, + ) -> Result<(), PublishError> { // If the message is anonymous or has a random author add it to the published message ids // cache. if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { @@ -722,24 +802,52 @@ where } } - // Send to peers we know are subscribed to the topic. + // Send to the recipient list + let topic_hash = raw_message.topic.clone(); + let event = Rpc { + subscriptions: Vec::new(), + messages: vec![raw_message], + control_msgs: Vec::new(), + } + .into_protobuf(); let msg_bytes = event.get_size(); - for peer_id in recipient_peers.iter() { - trace!("Sending message to peer: {:?}", peer_id); - self.send_message(*peer_id, event.clone())?; - - if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&topic_hash, msg_bytes); + let mut sent_to_a_peer = false; + for peer_id in recipient_peers.into_iter() { + // NOTE: There is a delay between calculating which peers to send to and actually sending + // the message. This creates a race condition in which peers can unsubscribe or + // disconnect. We need to check the peers are still connected before sending the + // message. + if self + .peer_topics + .get(&peer_id) + .map(|t| t.contains(&topic_hash)) + .unwrap_or(false) + { + trace!("Sending message to peer: {:?}", peer_id); + self.send_message(peer_id, event.clone())?; + sent_to_a_peer = true; + if let Some(m) = self.metrics.as_mut() { + m.msg_sent(&topic_hash, msg_bytes); + } } } - debug!("Published message: {:?}", &msg_id); - if let Some(metrics) = self.metrics.as_mut() { metrics.register_published_message(&topic_hash); } - Ok(msg_id) + // There is a case where we have no mesh peers, but attempt to send to flood publish peers, + // which disconnect before we are able to send to them. In this case we don't want to + // prevent future publishing, so emit adding the message id to the duplicate cache. + // + // If we did manage to send to a peer, we add the message to the duplicate cache. + // This can double up, but duplicates don't exist in this cache, so there shouldn't be an + // issue here. + if sent_to_a_peer { + self.duplicate_cache.insert(msg_id.clone()); + } + + Ok(()) } /// This function should be called when [`Config::validate_messages()`] is `true` after @@ -2483,6 +2591,22 @@ where // piggyback pooled control messages self.flush_control_pool(); + // Before shifting the memcache, attempt to publish any messages that are required for + // flood publishing. 
+ while !self.messages_to_flood_publish.is_empty() { + let (msg_id, peer_list) = self + .messages_to_flood_publish + .pop() + .expect("Vector is not empty"); + + // Get the actual message from the memcache and send it to the peer list. + if let Some(raw_message) = self.mcache.get(&msg_id) { + // We ignore publish errors here as flood publishing is an optional feature. + let _ = self.publish_message(&msg_id, peer_list, raw_message.clone()); + } + } + self.messages_to_flood_publish.shrink_to(16); + // shift the memcache self.mcache.shift(); @@ -2726,21 +2850,30 @@ where } } + // NOTE: This provides the check for publishing messages. It is located here to avoid + // double protobuf encoding. When forwarding messages, this cannot occur as an oversized + // message will not be docoded to forward. + let topic_hash = message.topic.clone(); + let event = Rpc { + subscriptions: Vec::new(), + messages: vec![message], + control_msgs: Vec::new(), + } + .into_protobuf(); + + // check that the size doesn't exceed the max transmission size + if event.get_size() > self.config.max_transmit_size() { + return Err(PublishError::MessageTooLarge); + } + // forward the message to peers if !recipient_peers.is_empty() { - let event = Rpc { - subscriptions: Vec::new(), - messages: vec![message.clone()], - control_msgs: Vec::new(), - } - .into_protobuf(); - let msg_bytes = event.get_size(); for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); self.send_message(*peer, event.clone())?; if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&message.topic, msg_bytes); + m.msg_sent(&topic_hash, msg_bytes); } } debug!("Completed forwarding message"); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 5413f6dbb89..af4b2cf2dd5 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -631,9 +631,9 @@ fn test_publish_without_flood_publishing() { // - Send publish message to all peers // - Insert message into gs.mcache and gs.received - //turn off flood publish to test old behaviour + // Turn off flood publish to test old behaviour let config = ConfigBuilder::default() - .flood_publish(false) + .flood_publish(FloodPublish::Disabled) .build() .unwrap(); @@ -715,7 +715,7 @@ fn test_fanout() { //turn off flood publish to test fanout behaviour let config = ConfigBuilder::default() - .flood_publish(false) + .flood_publish(FloodPublish::Disabled) .build() .unwrap(); @@ -2104,6 +2104,7 @@ fn test_unsubscribe_backoff() { #[test] fn test_flood_publish() { + let _ = env_logger::try_init(); let config: Config = Config::default(); let topic = "test"; @@ -2114,10 +2115,13 @@ fn test_flood_publish() { .to_subscribe(true) .create_network(); - //publish message + // publish message let publish_data = vec![0; 42]; gs.publish(Topic::new(topic), publish_data).unwrap(); + // Wait a heartbeat + gs.heartbeat(); + // Collect all publish messages let publishes = gs .events @@ -2148,7 +2152,6 @@ fn test_flood_publish() { let msg_id = gs.config.message_id(message); - let config: Config = Config::default(); assert_eq!( publishes.len(), config.mesh_n_high() + 10, @@ -2788,7 +2791,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { #[test] fn test_do_not_publish_to_peer_below_publish_threshold() { let config = ConfigBuilder::default() - .flood_publish(false) + .flood_publish(FloodPublish::Disabled) .build() .unwrap(); let peer_score_params = PeerScoreParams::default(); @@ -2851,33 
+2854,37 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { #[test] fn test_do_not_flood_publish_to_peer_below_publish_threshold() { - let config = Config::default(); + let _ = env_logger::try_init(); + let config = ConfigBuilder::default() + .flood_publish(FloodPublish::Rapid) + .build() + .unwrap(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { gossip_threshold: 0.5 * peer_score_params.behaviour_penalty_weight, publish_threshold: 3.0 * peer_score_params.behaviour_penalty_weight, ..PeerScoreThresholds::default() }; - //build mesh with no peers + // Build mesh with no peers let (mut gs, _, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); - //add two additional peers that will be added to the mesh + // add two additional peers that will be added to the mesh let p1 = add_peer(&mut gs, &topics, false, false); let p2 = add_peer(&mut gs, &topics, false, false); - //reduce score of p1 below peer_score_thresholds.publish_threshold - //note that penalties get squared so two penalties means a score of + // reduce score of p1 below peer_score_thresholds.publish_threshold + // note that penalties get squared so two penalties means a score of // 4 * peer_score_params.behaviour_penalty_weight. gs.peer_score.as_mut().unwrap().0.add_penalty(&p1, 2); - //reduce score of p2 below 0 but not below peer_score_thresholds.publish_threshold + // Reduce score of p2 below 0 but not below peer_score_thresholds.publish_threshold gs.peer_score.as_mut().unwrap().0.add_penalty(&p2, 1); - //a heartbeat will remove the peers from the mesh + // a heartbeat will remove the peers from the mesh gs.heartbeat(); // publish on topic @@ -2901,7 +2908,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { _ => collected_publish, }); - //assert only published to p2 + // assert only published to p2 assert_eq!(publishes.len(), 1); assert!(publishes[0].0 == p2); } @@ -4793,7 +4800,8 @@ fn test_iwant_penalties() { #[test] fn test_publish_to_floodsub_peers_without_flood_publish() { let config = ConfigBuilder::default() - .flood_publish(false) + .flood_publish(FloodPublish::Disabled) + .support_floodsub() .build() .unwrap(); let (mut gs, _, topics) = inject_nodes1() @@ -4850,7 +4858,8 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { #[test] fn test_do_not_use_floodsub_in_fanout() { let config = ConfigBuilder::default() - .flood_publish(false) + .flood_publish(FloodPublish::Disabled) + .support_floodsub() .build() .unwrap(); let (mut gs, _, _) = inject_nodes1() @@ -4863,7 +4872,7 @@ fn test_do_not_use_floodsub_in_fanout() { let topic = Topic::new("test"); let topics = vec![topic.hash()]; - //add two floodsub peer, one explicit, one implicit + // Add two floodsub peer, one explicit, one implicit let p1 = add_peer_with_addr_and_kind( &mut gs, &topics, @@ -4874,7 +4883,7 @@ fn test_do_not_use_floodsub_in_fanout() { ); let p2 = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); - //publish a message + // Publish a message let publish_data = vec![0; 42]; gs.publish(Topic::new("test"), publish_data).unwrap(); diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 1f0b23848d6..b5b9e326bc2 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -56,6 +56,19 @@ pub enum Version { V1_1, } +//// Selector for types of flood publishing. 
+#[derive(Clone, Debug, PartialEq, Eq)] +pub enum FloodPublish { + /// Flood publishing is disabled. + Disabled, + /// Flood publishing is enabled. Messages are sent immediately to all peers in the mesh and + /// flood messages are queued to be published in the next heartbeat up to a maximum number of + /// peers. If 0 is set, this publish to all known peers in the next heartbeat. + Heartbeat(usize), + /// Flood publishing is enabled and messages are immediately published to all peers on the topic. + Rapid, +} + /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] pub struct Config { @@ -86,7 +99,7 @@ pub struct Config { prune_backoff: Duration, unsubscribe_backoff: Duration, backoff_slack: u32, - flood_publish: bool, + flood_publish: FloodPublish, graft_flood_threshold: Duration, mesh_outbound_min: usize, opportunistic_graft_ticks: u64, @@ -310,11 +323,12 @@ impl Config { self.backoff_slack } - /// Whether to do flood publishing or not. If enabled newly created messages will always be + /// Whether to enable flood publishing and the method for flood publishing. If enabled newly created messages will always be /// sent to all peers that are subscribed to the topic and have a good enough score. - /// The default is true. - pub fn flood_publish(&self) -> bool { - self.flood_publish + /// The default is FloodPublish::Heartbeat(0), which will flood publish to all known peers in the + /// next heartbeat. + pub fn flood_publish(&self) -> &FloodPublish { + &self.flood_publish } /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE, @@ -448,7 +462,7 @@ impl Default for ConfigBuilder { prune_backoff: Duration::from_secs(60), unsubscribe_backoff: Duration::from_secs(10), backoff_slack: 1, - flood_publish: true, + flood_publish: FloodPublish::Heartbeat(0), graft_flood_threshold: Duration::from_secs(10), mesh_outbound_min: 2, opportunistic_graft_ticks: 60, @@ -701,7 +715,7 @@ impl ConfigBuilder { /// Whether to do flood publishing or not. If enabled newly created messages will always be /// sent to all peers that are subscribed to the topic and have a good enough score. /// The default is true. - pub fn flood_publish(&mut self, flood_publish: bool) -> &mut Self { + pub fn flood_publish(&mut self, flood_publish: FloodPublish) -> &mut Self { self.config.flood_publish = flood_publish; self } diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index 7fa08a6ac6a..085ef3b16f9 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -107,7 +107,6 @@ impl MessageCache { } /// Get a message with `message_id` - #[cfg(test)] pub fn get(&self, message_id: &MessageId) -> Option<&RawMessage> { self.msgs.get(message_id).map(|(message, _)| message) } From 7a8ffcfa83eac0b6cf86b04e496f4628e92f81fb Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 23 Mar 2023 17:19:20 +1100 Subject: [PATCH 2/5] Add changelog --- protocols/gossipsub/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b3accf6ef18..7b1184b19fb 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -2,7 +2,10 @@ - Signed messages now use sequential integers in the sequence number field. See [PR 3551]. +- Modify the flood publishing behaviour to publish to extra peers on the next + heartbeat. See [PR 3665]. 
+[PR 3665]: https://github.com/libp2p/rust-libp2p/pull/3665 [PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551 # 0.44.1 From d9a33e4b53798464bcdbb530700051106216d976 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 23 Mar 2023 17:23:52 +1100 Subject: [PATCH 3/5] Avoid mesh peer doubling --- protocols/gossipsub/src/behaviour.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 8b055a3b85c..adafc109433 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -651,7 +651,11 @@ where recipient_peers.extend( set.iter() .filter(|p| { - self.explicit_peers.contains(*p) + self.mesh + .get(&topic_hash) + .map(|peers| !peers.contains(*p)) + .unwrap_or(true) + && self.explicit_peers.contains(*p) || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 }) .cloned(), @@ -724,12 +728,12 @@ where .iter() .filter(|p| { // Do not include peers we are already sending too - !self.mesh.get(&topic_hash).map(|peers| peers.contains(*p)).unwrap_or(false) && + self.mesh.get(&topic_hash).map(|peers| !peers.contains(*p)).unwrap_or(true) && // Do not include peers we have already added to the recipient peers !recipient_peers.contains(*p) && // We have already included explicit peers, so just make sure // these peers are above threshold - !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + !self.score_below_threshold(p, |ts| ts.publish_threshold).0 }) .collect::>(); From 4d0c950b92e9ef823089053a8e066a81c6cd0b02 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 23 Mar 2023 17:29:34 +1100 Subject: [PATCH 4/5] Fix a clippy lint --- protocols/gossipsub/src/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index adafc109433..ecab8b6d6fb 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -748,7 +748,7 @@ where let mut rng = rand::thread_rng(); while sampled < *max_peers_to_send_to { let index = rng.gen_range(0..peer_list.len()); - flood_publish_peers.insert(peer_list.swap_remove(index).clone()); + flood_publish_peers.insert(*peer_list.swap_remove(index)); sampled += 1; } } From a422d4c36abeac8732a4dc3dc0b0ebf373e25dbc Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Mon, 3 Apr 2023 09:41:22 +0900 Subject: [PATCH 5/5] Make FloodPublish public (#293) --- protocols/gossipsub/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 222a2f34f93..5518be67ca3 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -159,7 +159,7 @@ mod types; mod rpc_proto; pub use self::behaviour::{Behaviour, Event, MessageAuthenticity}; -pub use self::config::{Config, ConfigBuilder, ValidationMode, Version}; +pub use self::config::{Config, ConfigBuilder, FloodPublish, ValidationMode, Version}; pub use self::error_priv::{HandlerError, PublishError, SubscriptionError, ValidationError}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
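---

A minimal usage sketch for the new option introduced by this series (illustrative only: it assumes the `libp2p-gossipsub` build from this branch, uses the `ConfigBuilder` and `FloodPublish` names exported in `lib.rs` above, and the cap of 8 peers is an arbitrary example value):

```rust
use libp2p_gossipsub::{ConfigBuilder, FloodPublish};

fn main() {
    // Publish immediately to mesh/fanout peers, then flood publish to at most
    // 8 additional known peers on the next heartbeat.
    let config = ConfigBuilder::default()
        .flood_publish(FloodPublish::Heartbeat(8))
        .build()
        .expect("valid gossipsub config");

    assert_eq!(*config.flood_publish(), FloodPublish::Heartbeat(8));

    // FloodPublish::Rapid keeps the old behaviour (send to every eligible peer
    // immediately at publish time); FloodPublish::Disabled turns flood
    // publishing off entirely.
}
```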