From 8445079622626d868c05d7e0f59237445d231950 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 28 Apr 2023 13:47:10 +0200 Subject: [PATCH] refactor(perf): don't use `OutboundOpenInfo` Instead of passing the command along, we store it in a buffer and retrieve it once the stream is upgraded. Related: https://github.com/libp2p/rust-libp2p/issues/3268. Pull-Request: #3763. --- protocols/perf/src/client/handler.rs | 52 +++++++++++++++++----------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs index f75a43f0a4e..d8e3854a91d 100644 --- a/protocols/perf/src/client/handler.rs +++ b/protocols/perf/src/client/handler.rs @@ -61,6 +61,8 @@ pub struct Handler { >, >, + requested_streams: VecDeque, + outbound: FuturesUnordered>>, keep_alive: KeepAlive, @@ -70,6 +72,7 @@ impl Handler { pub fn new() -> Self { Self { queued_events: Default::default(), + requested_streams: Default::default(), outbound: Default::default(), keep_alive: KeepAlive::Yes, } @@ -88,7 +91,7 @@ impl ConnectionHandler for Handler { type Error = Void; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = ReadyUpgrade<&'static [u8]>; - type OutboundOpenInfo = Command; + type OutboundOpenInfo = (); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -96,9 +99,10 @@ impl ConnectionHandler for Handler { } fn on_behaviour_event(&mut self, command: Self::InEvent) { + self.requested_streams.push_back(command); self.queued_events .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), command), + protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()), }) } @@ -117,26 +121,34 @@ impl ConnectionHandler for Handler { }) => void::unreachable(protocol), ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, - info: Command { params, id }, - }) => self.outbound.push( - crate::protocol::send_receive(params, protocol) - .map_ok(move |timers| Event { - id, - result: Ok(RunStats { params, timers }), - }) - .boxed(), - ), + info: (), + }) => { + let Command { id, params } = self + .requested_streams + .pop_front() + .expect("opened a stream without a pending command"); + self.outbound.push( + crate::protocol::send_receive(params, protocol) + .map_ok(move |timers| Event { + id, + result: Ok(RunStats { params, timers }), + }) + .boxed(), + ); + } ConnectionEvent::AddressChange(_) => {} - ConnectionEvent::DialUpgradeError(DialUpgradeError { - info: Command { id, .. }, - error, - }) => self - .queued_events - .push_back(ConnectionHandlerEvent::Custom(Event { - id, - result: Err(error), - })), + ConnectionEvent::DialUpgradeError(DialUpgradeError { info: (), error }) => { + let Command { id, .. } = self + .requested_streams + .pop_front() + .expect("requested stream without pending command"); + self.queued_events + .push_back(ConnectionHandlerEvent::Custom(Event { + id, + result: Err(error), + })); + } ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { match error { ConnectionHandlerUpgrErr::Timeout => {}