Skip to content

Commit

Permalink
Implement message time-bound dropping (libp2p#555)
Browse files Browse the repository at this point in the history
* Implement message time-bound dropping

* Fix wasm and tests

* remove unrequired derives from RpcOut and NOOP Arbitraty test

* replace comparing Instant's with futures_timer::Delay

* make handler ignorant wrt the messages dropped

* add metrics for dropped messages

* Fix warnings

* Fix intradoc links

* Add changelog

---------

Co-authored-by: João Oliveira <[email protected]>
  • Loading branch information
AgeManning and jxs authored Dec 4, 2023
1 parent a4a3f39 commit 73ae996
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 75 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## 0.46.1 - unreleased
- Implement publish and forward message dropping.

- Implement backpressure by diferentiating between priority and non priority messages.
Drop `Publish` and `Forward` messages when the queue becomes full.
Expand Down
1 change: 1 addition & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ either = "1.9"
fnv = "1.0.7"
futures = "0.3.29"
futures-ticker = "0.0.3"
futures-timer = "3.0.2"
getrandom = "0.2.11"
hex_fmt = "0.3.0"
instant = "0.1.12"
Expand Down
77 changes: 34 additions & 43 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::config::{Config, ValidationMode};
use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
Expand All @@ -62,6 +61,10 @@ use crate::types::{
};
use crate::types::{PeerConnections, PeerKind};
use crate::{backoff::BackoffStorage, types::RpcSender};
use crate::{
config::{Config, ValidationMode},
types::RpcOut,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
Expand Down Expand Up @@ -732,7 +735,11 @@ where
.expect("Peerid should exist");

if sender
.publish(raw_message.clone(), self.metrics.as_mut())
.publish(
raw_message.clone(),
self.config.publish_queue_duration(),
self.metrics.as_mut(),
)
.is_err()
{
errors += 1;
Expand Down Expand Up @@ -1342,7 +1349,11 @@ where
.get_mut(peer_id)
.expect("Peerid should exist");

sender.forward(msg, self.metrics.as_mut());
sender.forward(
msg,
self.config.forward_queue_duration(),
self.metrics.as_mut(),
);
}
}
}
Expand Down Expand Up @@ -2660,7 +2671,11 @@ where
.handler_send_queues
.get_mut(peer)
.expect("Peerid should exist");
sender.forward(message.clone(), self.metrics.as_mut());
sender.forward(
message.clone(),
self.config.forward_queue_duration(),
self.metrics.as_mut(),
);
}
tracing::debug!("Completed forwarding message");
Ok(true)
Expand Down Expand Up @@ -3115,6 +3130,21 @@ where
}
}
}
HandlerEvent::MessageDropped(rpc) => {
// TODO:
// * Build scoring logic to handle peers that are dropping messages
if let Some(metrics) = self.metrics.as_mut() {
match rpc {
RpcOut::Publish { message, .. } => {
metrics.publish_msg_dropped(&message.topic);
}
RpcOut::Forward { message, .. } => {
metrics.forward_msg_dropped(&message.topic);
}
_ => {}
}
}
}
HandlerEvent::Message {
rpc,
invalid_messages,
Expand Down Expand Up @@ -3439,42 +3469,3 @@ impl fmt::Debug for PublishConfig {
}
}
}

#[cfg(test)]
mod local_test {
use super::*;
use crate::{types::RpcOut, IdentTopic};
use quickcheck::*;

fn test_message() -> RawMessage {
RawMessage {
source: Some(PeerId::random()),
data: vec![0; 100],
sequence_number: None,
topic: TopicHash::from_raw("test_topic"),
signature: None,
key: None,
validated: false,
}
}

fn test_control() -> ControlAction {
ControlAction::IHave {
topic_hash: IdentTopic::new("TestTopic").hash(),
message_ids: vec![MessageId(vec![12u8]); 5],
}
}

impl Arbitrary for RpcOut {
fn arbitrary(g: &mut Gen) -> Self {
match u8::arbitrary(g) % 5 {
0 => RpcOut::Subscribe(IdentTopic::new("TestTopic").hash()),
1 => RpcOut::Unsubscribe(IdentTopic::new("TestTopic").hash()),
2 => RpcOut::Publish(test_message()),
3 => RpcOut::Forward(test_message()),
4 => RpcOut::Control(test_control()),
_ => panic!("outside range"),
}
}
}
}
28 changes: 14 additions & 14 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ where
}
};

let sender = RpcSender::new(peer, 100);
let sender = RpcSender::new(peer, gs.config.connection_handler_queue_len());
let receiver = sender.new_receiver();
gs.handler_send_queues.insert(peer, sender);

Expand Down Expand Up @@ -669,7 +669,7 @@ fn test_publish_without_flood_publishing() {
.into_values()
.fold(vec![], |mut collected_publish, c| {
while !c.priority.is_empty() {
if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() {
if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() {
collected_publish.push(message);
}
}
Expand Down Expand Up @@ -753,7 +753,7 @@ fn test_fanout() {
.into_values()
.fold(vec![], |mut collected_publish, c| {
while !c.priority.is_empty() {
if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() {
if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() {
collected_publish.push(message);
}
}
Expand Down Expand Up @@ -1035,8 +1035,8 @@ fn test_handle_iwant_msg_cached() {
.into_values()
.fold(vec![], |mut collected_messages, c| {
while !c.non_priority.is_empty() {
if let Ok(RpcOut::Forward(msg)) = c.non_priority.try_recv() {
collected_messages.push(msg)
if let Ok(RpcOut::Forward { message, .. }) = c.non_priority.try_recv() {
collected_messages.push(message)
}
}
collected_messages
Expand Down Expand Up @@ -1090,7 +1090,7 @@ fn test_handle_iwant_msg_cached_shifted() {
let message_exists = queues.values().any(|c| {
let mut out = false;
while !c.non_priority.is_empty() {
if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(message)) if
if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward{message, timeout: _ }) if
gs.config.message_id(
&gs.data_transform
.inbound_transform(message.clone())
Expand Down Expand Up @@ -1558,7 +1558,7 @@ fn do_forward_messages_to_explicit_peers() {
assert_eq!(
queues.into_iter().fold(0, |mut fwds, (peer_id, c)| {
while !c.non_priority.is_empty() {
if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward(m)) if peer_id == peers[0] && m.data == message.data) {
if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward{message: m, timeout: _}) if peer_id == peers[0] && m.data == message.data) {
fwds +=1;
}
}
Expand Down Expand Up @@ -2113,7 +2113,7 @@ fn test_flood_publish() {
.into_values()
.fold(vec![], |mut collected_publish, c| {
while !c.priority.is_empty() {
if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() {
if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() {
collected_publish.push(message);
}
}
Expand Down Expand Up @@ -2672,7 +2672,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
.into_iter()
.fold(vec![], |mut collected_messages, (peer_id, c)| {
while !c.non_priority.is_empty() {
if let Ok(RpcOut::Forward(message)) = c.non_priority.try_recv() {
if let Ok(RpcOut::Forward { message, .. }) = c.non_priority.try_recv() {
collected_messages.push((peer_id, message));
}
}
Expand Down Expand Up @@ -2817,7 +2817,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() {
.into_iter()
.fold(vec![], |mut collected_publish, (peer_id, c)| {
while !c.priority.is_empty() {
if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() {
if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() {
collected_publish.push((peer_id, message));
}
}
Expand Down Expand Up @@ -2871,7 +2871,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() {
.into_iter()
.fold(vec![], |mut collected_publish, (peer_id, c)| {
while !c.priority.is_empty() {
if let Ok(RpcOut::Publish(message)) = c.priority.try_recv() {
if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() {
collected_publish.push((peer_id, message))
}
}
Expand Down Expand Up @@ -4394,7 +4394,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() {
assert_eq!(
queues.into_values().fold(0, |mut fwds, c| {
while !c.non_priority.is_empty() {
if let Ok(RpcOut::Forward(_)) = c.non_priority.try_recv() {
if let Ok(RpcOut::Forward { .. }) = c.non_priority.try_recv() {
fwds += 1;
}
}
Expand Down Expand Up @@ -4806,7 +4806,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() {
.fold(0, |mut collected_publish, (peer_id, c)| {
while !c.priority.is_empty() {
if matches!(c.priority.try_recv(),
Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2)
Ok(RpcOut::Publish{..}) if peer_id == &p1 || peer_id == &p2)
{
collected_publish += 1;
}
Expand Down Expand Up @@ -4861,7 +4861,7 @@ fn test_do_not_use_floodsub_in_fanout() {
.fold(0, |mut collected_publish, (peer_id, c)| {
while !c.priority.is_empty() {
if matches!(c.priority.try_recv(),
Ok(RpcOut::Publish(_)) if peer_id == &p1 || peer_id == &p2)
Ok(RpcOut::Publish{..}) if peer_id == &p1 || peer_id == &p2)
{
collected_publish += 1;
}
Expand Down
33 changes: 31 additions & 2 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct Config {
iwant_followup_time: Duration,
published_message_ids_cache_time: Duration,
connection_handler_queue_len: usize,
connection_handler_publish_duration: Duration,
connection_handler_forward_duration: Duration,
}

impl Config {
Expand Down Expand Up @@ -352,10 +354,22 @@ impl Config {
self.published_message_ids_cache_time
}

/// The max number of messages a `ConnectionHandler` can buffer.
/// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
pub fn connection_handler_queue_len(&self) -> usize {
self.connection_handler_queue_len
}

/// The duration a message to be published can wait to be sent before it is abandoned. The
/// default is 5 seconds.
pub fn publish_queue_duration(&self) -> Duration {
self.connection_handler_publish_duration
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
/// default is 500ms.
pub fn forward_queue_duration(&self) -> Duration {
self.connection_handler_forward_duration
}
}

impl Default for Config {
Expand Down Expand Up @@ -423,7 +437,9 @@ impl Default for ConfigBuilder {
max_ihave_messages: 10,
iwant_followup_time: Duration::from_secs(3),
published_message_ids_cache_time: Duration::from_secs(10),
connection_handler_queue_len: 100,
connection_handler_queue_len: 5000,
connection_handler_publish_duration: Duration::from_secs(5),
connection_handler_forward_duration: Duration::from_millis(500),
},
invalid_protocol: false,
}
Expand Down Expand Up @@ -789,10 +805,23 @@ impl ConfigBuilder {
self
}

/// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
pub fn connection_handler_queue_len(&mut self, len: usize) {
self.config.connection_handler_queue_len = len;
}

/// The duration a message to be published can wait to be sent before it is abandoned. The
/// default is 5 seconds.
pub fn publish_queue_duration(&mut self, duration: Duration) {
self.config.connection_handler_publish_duration = duration;
}

/// The duration a message to be forwarded can wait to be sent before it is abandoned. The
/// default is 500ms.
pub fn forward_queue_duration(&mut self, duration: Duration) {
self.config.connection_handler_forward_duration = duration;
}

/// Constructs a [`Config`] from the given configuration and validates the settings.
pub fn build(&self) -> Result<Config, ConfigBuilderError> {
// check all constraints on config
Expand Down
Loading

0 comments on commit 73ae996

Please sign in to comment.