From 5465590ec6a45591f31922b7e8848b0ea4fb830a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 2 Apr 2020 11:10:28 +0200 Subject: [PATCH 1/3] Better logging for notifications and buffer size increase --- .../src/protocol/generic_proto/handler/notif_out.rs | 13 +++++++++---- .../protocol/generic_proto/upgrade/notifications.rs | 5 ++--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index 62d2409be8141..aa0ee640b47f4 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -75,12 +75,13 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { DeniedUpgrade } - fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler { + fn into_handler(self, peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler { NotifsOutHandler { protocol_name: self.protocol_name, when_connection_open: Instant::now(), state: State::Disabled, events_queue: SmallVec::new(), + peer_id: peer_id.clone(), } } } @@ -108,6 +109,9 @@ pub struct NotifsOutHandler { /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. events_queue: SmallVec<[ProtocolsHandlerEvent; 16]>, + + /// Who we are connected to. + peer_id: PeerId, } /// Our relationship with the node we're connected to. @@ -300,11 +304,12 @@ impl ProtocolsHandler for NotifsOutHandler { NotifsOutHandlerIn::Send(msg) => if let State::Open { substream, .. } = &mut self.state { - if let Some(Ok(_)) = substream.send(msg).now_or_never() { - } else { + if !matches!(substream.send(msg).now_or_never(), Some(Ok(_))) { log::warn!( target: "sub-libp2p", - "📞 Failed to push message to queue, dropped it" + "📞 Queue of notifications with {} is full, dropped message (protocol: {:?})", + self.peer_id, + self.protocol_name, ); } } else { diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index b6ae1425f1161..54d575a443961 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -43,9 +43,8 @@ use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. const MAX_HANDSHAKE_SIZE: usize = 1024; -/// Maximum number of buffered messages before we consider the remote unresponsive and kill the -/// substream. -const MAX_PENDING_MESSAGES: usize = 256; +/// Maximum number of buffered messages before we refuse to accept more. +const MAX_PENDING_MESSAGES: usize = 1024; /// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional /// stream of messages. From 8cc37a6a0adfbad93e26c20ceb7f56b0901867da Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 2 Apr 2020 15:32:13 +0200 Subject: [PATCH 2/3] Address review --- client/network/src/protocol/generic_proto/handler/notif_out.rs | 2 +- .../network/src/protocol/generic_proto/upgrade/notifications.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index aa0ee640b47f4..a21b45c6d74a6 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -307,7 +307,7 @@ impl ProtocolsHandler for NotifsOutHandler { if !matches!(substream.send(msg).now_or_never(), Some(Ok(_))) { log::warn!( target: "sub-libp2p", - "📞 Queue of notifications with {} is full, dropped message (protocol: {:?})", + "📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})", self.peer_id, self.protocol_name, ); diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index 54d575a443961..fa2d50952a333 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -44,7 +44,7 @@ use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. const MAX_HANDSHAKE_SIZE: usize = 1024; /// Maximum number of buffered messages before we refuse to accept more. -const MAX_PENDING_MESSAGES: usize = 1024; +const MAX_PENDING_MESSAGES: usize = 256; /// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional /// stream of messages. From 597fb242ef0e4bd391861e5420acdd94481d6f9c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 3 Apr 2020 11:30:19 +0200 Subject: [PATCH 3/3] Improve warning about notifications queue and remove spurious triggers --- .../generic_proto/handler/notif_out.rs | 9 ++++----- .../generic_proto/upgrade/notifications.rs | 19 +++++++++++++------ 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/notif_out.rs b/client/network/src/protocol/generic_proto/handler/notif_out.rs index e2a86e96fd102..dd38826496e7a 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -312,11 +312,7 @@ impl ProtocolsHandler for NotifsOutHandler { NotifsOutHandlerIn::Send(msg) => if let State::Open { substream, .. } = &mut self.state { - if let Some(Ok(_)) = substream.send(msg).now_or_never() { - if let Some(metric) = &self.queue_size_report { - metric.observe(substream.queue_len() as f64); - } - } else { + if substream.push_message(msg).is_err() { log::warn!( target: "sub-libp2p", "📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})", @@ -324,6 +320,9 @@ impl ProtocolsHandler for NotifsOutHandler { self.protocol_name, ); } + if let Some(metric) = &self.queue_size_report { + metric.observe(substream.queue_len() as f64); + } } else { // This is an API misuse. log::warn!( diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index f7c7e3ce97e9b..f626110a3346c 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -284,6 +284,18 @@ impl NotificationsOutSubstream { pub fn queue_len(&self) -> u32 { u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value()) } + + /// Push a message to the queue of messages. + /// + /// This has the same effect as the `Sink::start_send` implementation. + pub fn push_message(&mut self, item: Vec) -> Result<(), NotificationsOutError> { + if self.messages_queue.len() >= MAX_PENDING_MESSAGES { + return Err(NotificationsOutError::Clogged); + } + + self.messages_queue.push_back(item); + Ok(()) + } } impl Sink> for NotificationsOutSubstream @@ -296,12 +308,7 @@ impl Sink> for NotificationsOutSubstream } fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - if self.messages_queue.len() >= MAX_PENDING_MESSAGES { - return Err(NotificationsOutError::Clogged); - } - - self.messages_queue.push_back(item); - Ok(()) + self.push_message(item) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> {