Skip to content

Commit

Permalink
call gather_supported_protocols at 5 seconds
Browse files Browse the repository at this point in the history
Minimise the runtime impact of this function by calling
not more than once every 5 seconds.

Also fixes the case where an idle connection would not
update the local supported protocols after retunring
Poll::Pending.

Signed-off-by: alindima <[email protected]>
  • Loading branch information
alindima committed Aug 4, 2023
1 parent f95f39f commit bb0606c
Showing 1 changed file with 39 additions and 8 deletions.
47 changes: 39 additions & 8 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll};

static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);

/// Determines how often we should check for changes in the local supported protocols.
/// This serves two purposes:
/// 1. To not perform the check more often than 5 seconds (as it's time consuming and the protocols
/// should rarely change).
/// 2. To make sure that the `Connection` future is ever woken up to perform the check if all the
/// other higher-priority sub-futures are still Pending.
#[cfg(test)]
const LOCAL_SUPPORTED_PROTOCOLS_UPDATE_INTERVAL: Duration = Duration::from_millis(500);
#[cfg(not(test))]
const LOCAL_SUPPORTED_PROTOCOLS_UPDATE_INTERVAL: Duration = Duration::from_secs(5);

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);
Expand Down Expand Up @@ -149,6 +160,8 @@ where

local_supported_protocols: HashSet<StreamProtocol>,
remote_supported_protocols: HashSet<StreamProtocol>,
/// Timer for when we should check for changes in the local supported protocols.
update_local_supported_protocols_timer: Delay,
}

impl<THandler> fmt::Debug for Connection<THandler>
Expand Down Expand Up @@ -196,6 +209,9 @@ where
requested_substreams: Default::default(),
local_supported_protocols: initial_protocols,
remote_supported_protocols: Default::default(),
update_local_supported_protocols_timer: Delay::new(
LOCAL_SUPPORTED_PROTOCOLS_UPDATE_INTERVAL,
),
}
}

Expand Down Expand Up @@ -227,6 +243,7 @@ where
substream_upgrade_protocol_override,
local_supported_protocols: supported_protocols,
remote_supported_protocols,
update_local_supported_protocols_timer,
} = self.get_mut();

loop {
Expand Down Expand Up @@ -415,17 +432,26 @@ where
}
}

let new_protocols = gather_supported_protocols(handler);
let changes = ProtocolsChange::from_full_sets(supported_protocols, &new_protocols);
if update_local_supported_protocols_timer
.poll_unpin(cx)
.is_ready()
{
// Arm the timer again.
*update_local_supported_protocols_timer =
Delay::new(LOCAL_SUPPORTED_PROTOCOLS_UPDATE_INTERVAL);

if !changes.is_empty() {
for change in changes {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
}
let new_protocols = gather_supported_protocols(handler);
let changes = ProtocolsChange::from_full_sets(supported_protocols, &new_protocols);

*supported_protocols = new_protocols;
if !changes.is_empty() {
for change in changes {
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
}

*supported_protocols = new_protocols;

continue; // Go back to the top, handler can potentially make progress again.
continue; // Go back to the top, handler can potentially make progress again.
}
}

return Poll::Pending; // Nothing can make progress, return `Pending`.
Expand Down Expand Up @@ -760,15 +786,19 @@ mod tests {
0,
);

let update_timeout = LOCAL_SUPPORTED_PROTOCOLS_UPDATE_INTERVAL + Duration::from_millis(500);

// First, start listening on a single protocol.
connection.handler.listen_on(&["/foo"]);
std::thread::sleep(update_timeout);
let _ = connection.poll_noop_waker();

assert_eq!(connection.handler.local_added, vec![vec!["/foo"]]);
assert!(connection.handler.local_removed.is_empty());

// Second, listen on two protocols.
connection.handler.listen_on(&["/foo", "/bar"]);
std::thread::sleep(update_timeout);
let _ = connection.poll_noop_waker();

assert_eq!(
Expand All @@ -780,6 +810,7 @@ mod tests {

// Third, stop listening on the first protocol.
connection.handler.listen_on(&["/bar"]);
std::thread::sleep(update_timeout);
let _ = connection.poll_noop_waker();

assert_eq!(
Expand Down

0 comments on commit bb0606c

Please sign in to comment.