diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 53eb0ff0f2f..0edcf65e97c 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -27,6 +27,7 @@ snow = { version = "0.6.1", features = ["ring-resolver"], default-features = fal snow = { version = "0.6.1", features = ["default-resolver"], default-features = false } [dev-dependencies] +async-std = "~1.5" env_logger = "0.7.1" libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp", features = ["async-std"] } quickcheck = "0.9.0" diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 16e804f91c8..05a3e4ab510 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -101,11 +101,57 @@ pub trait NetworkBehaviour: Send + 'static { fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {} - /// Informs the behaviour about an event generated by the handler dedicated to the peer identified by `peer_id`. - /// for the behaviour. + /// Informs the behaviour that one or more connections to a peer are + /// currently busy and unable to receive the given event which was + /// previously emitted by `NetworkBehaviour::poll`. + /// + /// By calling this method, the `Swarm` tries to propagate backpressure + /// from the underlying `Network` to the `NetworkBehaviour`. If the + /// `NetworkBehaviour` is unable to re-schedule the given event to be + /// emitted on a later invocation of `NetworkBehaviour::poll`, it must + /// return an `Err` with the given event, in which case the `Swarm` will + /// buffer the event until it can be delivered or the connection(s) close. + /// Note that in this case, the `Swarm` will not call + /// `NetworkBehaviour::poll` again before the buffered event has been + /// consumed, resulting in backpressure on all network connections. + /// Implementing this method thus amounts to implementing peer and + /// connection-specific backpressure in a `NetworkBehaviour`. + /// + /// By default, this method always returns an `Err`, indicating that + /// the behaviour cannot handle backpressure and leaves it to the + /// `Swarm`. + /// + /// > **Note**: A given event should only be re-emitted by the behaviour + /// > after `NetworkBehaviour::poll` returned `Poll::Pending` at least once, + /// > to allow the underlying `Network` to make progress. Failure to do + /// > so is considered programmer error and may put the `Swarm` in a + /// > busy-loop or an unrecoverable infinite loop. Returning `Poll::Pending` + /// > at least once before trying to re-emit an event that could not be + /// > delivered to the `Network` previously is the essence of what it + /// > means to "try again later". + /// + /// > **Note**: If the given event has been previously emitted with + /// > `NotifyHandler::All` and there are multiple connections to the peer, + /// > it may already have been delivered to some of these connections, + /// > in which case emitting the given event later with `NotifyHandler::All` + /// > again will result in duplicate event delivery as well as potentially + /// > sending the event to new connections. If that is undesirable, + /// > the behaviour can make sure to only re-emit the event for the + /// > connection ID(s) given to this method. + fn inject_connections_busy( + &mut self, + _: &PeerId, + _: impl Iterator, + e: <::Handler as ProtocolsHandler>::InEvent + ) -> Result<(), <::Handler as ProtocolsHandler>::InEvent> + { + Err(e) + } + + /// Informs the behaviour about a network event on a connecton with a peer. /// /// The `peer_id` is guaranteed to be in a connected state. In other words, `inject_connected` - /// has previously been called with this `PeerId`. + /// has previously been called with the same `PeerId`. fn inject_event( &mut self, peer_id: PeerId, @@ -149,10 +195,27 @@ pub trait NetworkBehaviour: Send + 'static { fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) { } - /// Polls for things that swarm should do. + /// Polls for behaviour to ask for the next instruction for the `Swarm`. + /// + /// The behaviour is `poll`ed by the `Swarm` until it returns `Poll::Pending`, + /// which signals the `Swarm` that the `NetworkBehaviour` is waiting for + /// progress (and more events, see `inject_event`) from the underlying + /// `Network` before it may be `poll`ed again. + /// + /// > **Note**: A `NetworkBehaviour` must eventually and repeatedly emit + /// > `Poll::Pending` to allow the `Network` to make progress. A + /// > `NetworkBehaviour` that never again emits `Poll::Pending` after a + /// > certain point is analogous to a `NetworkBehaviour` that no longer + /// > needs any input from the `Network` from that point onwards and is + /// > thus typically a programmer error. /// - /// This API mimics the API of the `Stream` trait. The method may register the current task in - /// order to wake it up at a later point in time. + /// > **Note**: When `poll`ed, a `NetworkBehaviour` may register the + /// > current task to be woken up with the given `Context`, if it wishes + /// > to be `poll`ed again even in the absence of any network activity. + /// > When `poll` returns `Poll::Pending` without registering the current + /// > task to be woken up, the behaviour is nevertheless always `poll`ed + /// > again when there is activity on the underlying `Network`, but not + /// > earlier. fn poll(&mut self, cx: &mut Context, params: &mut impl PollParameters) -> Poll::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>; } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f848f64f9d3..866731fb8ad 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -285,7 +285,18 @@ where /// Pending event to be delivered to connection handlers /// (or dropped if the peer disconnected) before the `behaviour` /// can be polled again. - pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)> + pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>, + + /// The current lead of `NetworkBehaviour::inject_event` calls + /// as compared to `NetworkBehaviour::poll` calls. + event_lead: usize, + + /// The maximum lead of `NetworkBehaviour::inject_event` calls + /// as compared to `NetworkBehaviour::poll` calls. + max_event_lead: usize, + + /// Whether the behaviour is waiting for more network activity. + behaviour_pending: bool, } impl Deref for @@ -486,141 +497,169 @@ where TBehaviour: NetworkBehaviour, let this = &mut *self; loop { - let mut network_not_ready = false; - - // First let the network make progress. - match this.network.poll(cx) { - Poll::Pending => network_not_ready = true, - Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => { - let peer = connection.peer_id().clone(); - let connection = connection.id(); - this.behaviour.inject_event(peer, connection, event); - }, - Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => { - let peer_id = connection.peer_id().clone(); - let endpoint = connection.endpoint().clone(); - if this.banned_peers.contains(&peer_id) { - this.network.peer(peer_id.clone()) - .into_connected() - .expect("the Network just notified us that we were connected; QED") - .disconnect(); - return Poll::Ready(SwarmEvent::BannedPeer { - peer_id, + let mut network_pending = false; + + // First let the network make progress, if the behaviour is waiting + // for progress on the network and we are not too far ahead in terms + // of events given to the behaviour before it must be `poll`ed. + if this.event_lead < this.max_event_lead && this.behaviour_pending { + match this.network.poll(cx) { + Poll::Pending => network_pending = true, + Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => { + let peer = connection.peer_id().clone(); + let connection = connection.id(); + this.behaviour.inject_event(peer, connection, event); + this.event_lead += 1; + }, + Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => { + let peer_id = connection.peer_id().clone(); + let endpoint = connection.endpoint().clone(); + if this.banned_peers.contains(&peer_id) { + this.network.peer(peer_id.clone()) + .into_connected() + .expect("the Network just notified us that we were connected; QED") + .disconnect(); + return Poll::Ready(SwarmEvent::BannedPeer { + peer_id, + endpoint, + }); + } else { + log::debug!("Connection established: {:?}; Total (peer): {}.", + connection.connected(), num_established); + let endpoint = connection.endpoint().clone(); + this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint); + if num_established.get() == 1 { + this.behaviour.inject_connected(&peer_id); + } + return Poll::Ready(SwarmEvent::ConnectionEstablished { + peer_id, num_established, endpoint + }); + } + }, + Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => { + log::debug!("Connection {:?} closed: {:?}", connected, error); + let info = connected.info; + let endpoint = connected.endpoint; + this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint); + if num_established == 0 { + this.behaviour.inject_disconnected(info.peer_id()); + } + return Poll::Ready(SwarmEvent::ConnectionClosed { + peer_id: info.peer_id().clone(), endpoint, + cause: error, + num_established, }); - } else { - log::debug!("Connection established: {:?}; Total (peer): {}.", - connection.connected(), num_established); - let endpoint = connection.endpoint().clone(); - this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint); - if num_established.get() == 1 { - this.behaviour.inject_connected(&peer_id); + }, + Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => { + let handler = this.behaviour.new_handler(); + let local_addr = incoming.local_addr().clone(); + let send_back_addr = incoming.send_back_addr().clone(); + if let Err(e) = incoming.accept(handler.into_node_handler_builder()) { + log::warn!("Incoming connection rejected: {:?}", e); } - return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, num_established, endpoint + return Poll::Ready(SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, }); + }, + Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => { + log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); + if !this.listened_addrs.contains(&listen_addr) { + this.listened_addrs.push(listen_addr.clone()) + } + this.behaviour.inject_new_listen_addr(&listen_addr); + return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr)); } - }, - Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => { - log::debug!("Connection {:?} closed: {:?}", connected, error); - let info = connected.info; - let endpoint = connected.endpoint; - this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint); - if num_established == 0 { - this.behaviour.inject_disconnected(info.peer_id()); - } - return Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id: info.peer_id().clone(), - endpoint, - cause: error, - num_established, - }); - }, - Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => { - let handler = this.behaviour.new_handler(); - let local_addr = incoming.local_addr().clone(); - let send_back_addr = incoming.send_back_addr().clone(); - if let Err(e) = incoming.accept(handler.into_node_handler_builder()) { - log::warn!("Incoming connection rejected: {:?}", e); - } - return Poll::Ready(SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - }); - }, - Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => { - log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); - if !this.listened_addrs.contains(&listen_addr) { - this.listened_addrs.push(listen_addr.clone()) + Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => { + log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr); + this.listened_addrs.retain(|a| a != &listen_addr); + this.behaviour.inject_expired_listen_addr(&listen_addr); + return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr)); } - this.behaviour.inject_new_listen_addr(&listen_addr); - return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr)); - } - Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => { - log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr); - this.listened_addrs.retain(|a| a != &listen_addr); - this.behaviour.inject_expired_listen_addr(&listen_addr); - return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr)); - } - Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => { - log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); - for addr in addresses.iter() { - this.behaviour.inject_expired_listen_addr(addr); + Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => { + log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); + for addr in addresses.iter() { + this.behaviour.inject_expired_listen_addr(addr); + } + this.behaviour.inject_listener_closed(listener_id, match &reason { + Ok(()) => Ok(()), + Err(err) => Err(err), + }); + return Poll::Ready(SwarmEvent::ListenerClosed { + addresses, + reason, + }); } - this.behaviour.inject_listener_closed(listener_id, match &reason { - Ok(()) => Ok(()), - Err(err) => Err(err), - }); - return Poll::Ready(SwarmEvent::ListenerClosed { - addresses, - reason, - }); + Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => { + this.behaviour.inject_listener_error(listener_id, &error); + return Poll::Ready(SwarmEvent::ListenerError { + error, + }); + }, + Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => { + log::debug!("Incoming connection failed: {:?}", error); + return Poll::Ready(SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + }); + }, + Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => { + log::debug!( + "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.", + peer_id, multiaddr, error, attempts_remaining); + this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error); + if attempts_remaining == 0 { + this.behaviour.inject_dial_failure(&peer_id); + } + return Poll::Ready(SwarmEvent::UnreachableAddr { + peer_id, + address: multiaddr, + error, + attempts_remaining, + }); + }, + Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => { + log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}", + multiaddr, error); + this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error); + return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { + address: multiaddr, + error, + }); + }, } - Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => { - this.behaviour.inject_listener_error(listener_id, &error); - return Poll::Ready(SwarmEvent::ListenerError { - error, - }); - }, - Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => { - log::debug!("Incoming connection failed: {:?}", error); - return Poll::Ready(SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - }); - }, - Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => { - log::debug!( - "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.", - peer_id, multiaddr, error, attempts_remaining); - this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error); - if attempts_remaining == 0 { - this.behaviour.inject_dial_failure(&peer_id); - } - return Poll::Ready(SwarmEvent::UnreachableAddr { - peer_id, - address: multiaddr, - error, - attempts_remaining, - }); - }, - Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => { - log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}", - multiaddr, error); - this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error); - return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { - address: multiaddr, - error, - }); - }, } // After the network had a chance to make progress, try to deliver - // the pending event emitted by the behaviour in the previous iteration - // to the connection handler(s). The pending event must be delivered - // before polling the behaviour again. If the targeted peer - // meanwhie disconnected, the event is discarded. + // the pending event that could not be delivered previously. + // The pending event must be delivered before the behaviour can + // be polled again, as it may request delivery of yet another + // event, which cannot be buffered. + // + // If the targeted peer meanwhile disconnected, the event is discarded. + // + // If the pending event cannot be delivered because the connection + // is busy, we continue to poll the network until `max_event_lead` is + // reached, at which point we must wait for the wakeup that the + // connection can receive another event so that the pending event + // can be consumed. + // + // That results in the following behaviour if the pending event + // cannot be delivered: + // + // 1. All connections are temporarily unable to get new data to send + // from the behaviour (since we only buffer a single pending event, + // `NetworkBehaviour::poll` is not called again until the event is + // consumed). + // + // 2. New connections continue to get accepted and in general + // network I/O progresses until `max_event_lead` is reached. + // + // 3. All connections can continue to receive data and send data + // in the form of events to `NetworkBehaviour::inject_event` until + // `max_event_lead` is reached. if let Some((peer_id, handler, event)) = this.pending_event.take() { if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() { match handler { @@ -628,21 +667,33 @@ where TBehaviour: NetworkBehaviour, if let Some(mut conn) = peer.connection(conn_id) { if let Some(event) = notify_one(&mut conn, event, cx) { this.pending_event = Some((peer_id, handler, event)); - return Poll::Pending + if this.event_lead < this.max_event_lead { + continue + } else { + return Poll::Pending + } } }, PendingNotifyHandler::Any(ids) => { if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) { let handler = PendingNotifyHandler::Any(ids); this.pending_event = Some((peer_id, handler, event)); - return Poll::Pending + if this.event_lead < this.max_event_lead { + continue + } else { + return Poll::Pending + } } } PendingNotifyHandler::All(ids) => { if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) { let handler = PendingNotifyHandler::All(ids); this.pending_event = Some((peer_id, handler, event)); - return Poll::Pending + if this.event_lead < this.max_event_lead { + continue + } else { + return Poll::Pending + } } } } @@ -651,6 +702,9 @@ where TBehaviour: NetworkBehaviour, debug_assert!(this.pending_event.is_none()); + // Assume the behaviour can make progress, unless determined otherwise. + this.behaviour_pending = false; + let behaviour_poll = { let mut parameters = SwarmPollParameters { local_peer_id: &mut this.network.local_peer_id(), @@ -658,12 +712,28 @@ where TBehaviour: NetworkBehaviour, listened_addrs: &this.listened_addrs, external_addrs: &this.external_addrs }; + this.event_lead = this.event_lead.saturating_sub(1); this.behaviour.poll(cx, &mut parameters) }; match behaviour_poll { - Poll::Pending if network_not_ready => return Poll::Pending, - Poll::Pending => (), + Poll::Pending => { + // The behaviour has nothing to produce, thus it is + // waiting for more network activity and events. + this.behaviour_pending = true; + this.event_lead = 0; + if network_pending { + // The network was polled before polling the + // behaviour and is waiting for I/O, hence there + // is nothing further to do until the next wakeup. + return Poll::Pending + } else { + // The network was either not polled in this iteration + // or emitted a connection event, i.e. may be able to + // make progress, so continue. + continue + } + }, Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { return Poll::Ready(SwarmEvent::Behaviour(event)) }, @@ -707,26 +777,44 @@ where TBehaviour: NetworkBehaviour, NotifyHandler::One(connection) => { if let Some(mut conn) = peer.connection(connection) { if let Some(event) = notify_one(&mut conn, event, cx) { - let handler = PendingNotifyHandler::One(connection); - this.pending_event = Some((peer_id, handler, event)); - return Poll::Pending + if let Err(event) = this.behaviour.inject_connections_busy( + &peer_id, std::iter::once(connection), event + ) { + let handler = PendingNotifyHandler::One(connection); + this.pending_event = Some((peer_id, handler, event)); + // The behaviour must (possibly involuntarily) wait for + // the network to make progress and deliver the event. + this.behaviour_pending = true; + } } } } NotifyHandler::Any => { let ids = peer.connections().into_ids().collect(); if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) { - let handler = PendingNotifyHandler::Any(ids); - this.pending_event = Some((peer_id, handler, event)); - return Poll::Pending + if let Err(event) = this.behaviour.inject_connections_busy( + &peer_id, ids.iter().copied(), event + ) { + let handler = PendingNotifyHandler::Any(ids); + this.pending_event = Some((peer_id, handler, event)); + // The behaviour must (possibly involuntarily) wait for + // the network to make progress and deliver the event. + this.behaviour_pending = true; + } } } NotifyHandler::All => { let ids = peer.connections().into_ids().collect(); if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) { - let handler = PendingNotifyHandler::All(ids); - this.pending_event = Some((peer_id, handler, event)); - return Poll::Pending + if let Err(event) = this.behaviour.inject_connections_busy( + &peer_id, ids.iter().copied(), event + ) { + let handler = PendingNotifyHandler::All(ids); + this.pending_event = Some((peer_id, handler, event)); + // The behaviour must (possibly involuntarily) wait for + // the network to make progress and deliver the event. + this.behaviour_pending = true; + } } } } @@ -741,6 +829,7 @@ where TBehaviour: NetworkBehaviour, } }, } + } } } @@ -807,6 +896,14 @@ where TPeerId: Eq + Hash + Clone, TConnInfo: ConnectionInfo { + if ids.len() == 1 { + if let Some(mut conn) = peer.connection(ids[0]) { + return notify_one(&mut conn, event, cx).map(|e| (e, ids)) + } else { + return None + } + } + let mut pending = SmallVec::new(); let mut event = Some(event); // (1) for id in ids.into_iter() { @@ -860,6 +957,8 @@ where if ids.len() == 1 { if let Some(mut conn) = peer.connection(ids[0]) { return notify_one(&mut conn, event, cx).map(|e| (e, ids)) + } else { + return None } } @@ -959,6 +1058,7 @@ pub struct SwarmBuilder { transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>, behaviour: TBehaviour, network_config: NetworkConfig, + max_event_lead: usize, } impl SwarmBuilder @@ -990,6 +1090,7 @@ where TBehaviour: NetworkBehaviour, transport, behaviour, network_config: Default::default(), + max_event_lead: 1, } } @@ -1044,6 +1145,34 @@ where TBehaviour: NetworkBehaviour, self } + /// Configures the maximum lead in terms of number of connection events + /// that may be given to `NetworkBehaviour::inject_event` before a + /// call to `NetworkBehaviour::poll` must be made. Intuitively, + /// this setting determines how quickly back-pressure it exerted + /// on the underlying `Network` - the larger the permitted event + /// lead, the later back-pressure kicks in if a connection is busy. + /// + /// The `Swarm` always tries to `poll` after each call to `inject_event` + /// but may fail to do so if an event emitted by the behaviour cannot + /// currently be delivered to a connection and the behaviour does + /// not itself implement back-pressure via + /// `NetworkBehaviour::inject_connections_busy`, in which case the + /// `Swarm` will simply exert back-pressure on the entire `Network` + /// by waiting until the event can be delivered (or dropped). + /// + /// A configured event lead it > 1 allows the underlying `Network` + /// and the `NetworkBehaviour` to make a limited amount of progress + /// before the next call to `NetworkBehaviour::poll`, even in the + /// case where the behaviour itself does not implement back-pressure. + /// It thus indirectly limits the sizes of buffers in the `NetworkBehaviour` + /// that grow with calls to `NetworkBehaviour::inject_event`, if any. + /// + /// The default value is `1`, i.e. very conservative. + pub fn max_event_lead(mut self, n: usize) -> Self { + self.max_event_lead = n; + self + } + /// Configures a limit for the number of simultaneous incoming /// connection attempts. pub fn incoming_connection_limit(mut self, n: usize) -> Self { @@ -1105,7 +1234,10 @@ where TBehaviour: NetworkBehaviour, listened_addrs: SmallVec::new(), external_addrs: Addresses::default(), banned_peers: HashSet::new(), - pending_event: None + pending_event: None, + max_event_lead: self.max_event_lead, + event_lead: 0, + behaviour_pending: false, } } }