Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Design utility type for tracking pending stream openings #4510

Closed
Tracked by #3268
thomaseizinger opened this issue Sep 18, 2023 · 1 comment
Closed
Tracked by #3268
Assignees
Labels
decision-pending Marks issues where a decision is pending before we can move forward. difficulty:hard

Comments

@thomaseizinger
Copy link
Contributor

thomaseizinger commented Sep 18, 2023

In #4275, we made a big effort towards #3268 but it became apparent that for more complex protocols like relay, having to re-code the state tracking for pending outbound streams is cumbersome and should be dealt with more proper.

I've captured some ideas in https://github.com/libp2p/rust-libp2p/pull/4275/files/f3d01ba56731b378a65778d2e2b3d0a2a3bad468#r1314359395 but they should be no means be taken literally. Very likely, this needs a few iterations before we know what this type should look like and offer.

A rough design could be:

struct OutboundStreams<TInfo> {
	info: VecDeque<TInfo>,
	open_stream_results: VecDeque<Result<Stream, StreamUpgradeError<()>>>,

	// possibly more state ...
}

impl<TInfo> OutboundStreams<TInfo> {
	pub fn request_new_stream(&mut self, info: TInfo);

	pub fn on_new_stream(&mut self, stream: Stream);
	pub fn on_stream_error(&mut self, error: StreamUpgradeError<()>);
	
	pub fn poll(&mut self, cx: Context<'_>) -> Poll<Event<TInfo>>>;
}

enum Event<TInfo> {
	
	Stream(Result<(Stream, TInfo), Error<TInfo>),
	/// Instruct the `ConnectionHandler` to request a new stream.
	RequestNewStream
}

enum Error<TInfo> {
	Timeout(TInfo),
    	Unsupported(TInfo),
	Io((TInfo, io::Error)),
}

The problem we want to solve is that users of this struct should have it easy in terms of correctly tracking the "info" state. The crucial bit here is that there should always be 1 TInfo instance per pending stream but a pending stream can result in a stream or an error!

Currently, those are two different events:

pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
/// Informs the handler about the output of a successful upgrade on a new inbound substream.
FullyNegotiatedInbound(FullyNegotiatedInbound<IP, IOI>),
/// Informs the handler about the output of a successful upgrade on a new outbound stream.
FullyNegotiatedOutbound(FullyNegotiatedOutbound<OP, OOI>),
/// Informs the handler about a change in the address of the remote.
AddressChange(AddressChange<'a>),
/// Informs the handler that upgrading an outbound substream to the given protocol has failed.
DialUpgradeError(DialUpgradeError<OOI, OP>),
/// Informs the handler that upgrading an inbound substream to the given protocol has failed.
ListenUpgradeError(ListenUpgradeError<IOI, IP>),
/// The local [`ConnectionHandler`] added or removed support for one or more protocols.
LocalProtocolsChange(ProtocolsChange<'a>),
/// The remote [`ConnectionHandler`] now supports a different set of protocols.
RemoteProtocolsChange(ProtocolsChange<'a>),
}

  • FullyNegotiatedOutbound
  • DialUpgradeError

Users must remember that they need to pop off an instance of their pending TInfos for either event. The above API design has two functions for this: on_new_stream and on_stream_error. This makes it a bit easier but I think overall, it would be better to redesign these events to be something like:

pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> {
    InboundStreamResult(Result<FullyNegotiatedInbound<IP, IOI>, ListenUpgradeError<IOI, IP>>),
    OutboundStreamResult(Result<FullyNegotiatedOutbound<OP, OOI>, DialUpgradeError<OOI, OP>>),
	
    /// Informs the handler about a change in the address of the remote.
    AddressChange(AddressChange<'a>),
    /// The local [`ConnectionHandler`] added or removed support for one or more protocols.
    LocalProtocolsChange(ProtocolsChange<'a>),
    /// The remote [`ConnectionHandler`] now supports a different set of protocols.
    RemoteProtocolsChange(ProtocolsChange<'a>),
}

Note how the above design only issues a single event for success and error cases. This would allow us to dispatch to a single callback on OutboundStreams and do the proper accounting of TInfos in there for the user.

@thomaseizinger thomaseizinger added this to the Simplify ConnectionHandler trait milestone Sep 18, 2023
@thomaseizinger thomaseizinger added the decision-pending Marks issues where a decision is pending before we can move forward. label Sep 18, 2023
@thomaseizinger thomaseizinger removed this from the Simplify ConnectionHandler trait milestone Sep 19, 2023
@thomaseizinger thomaseizinger self-assigned this Nov 10, 2023
@thomaseizinger
Copy link
Contributor Author

I am closing this in favor of the pattern to use oneshots and moving the Receiver directly into an async block that executes the rest of the protocol.

@thomaseizinger thomaseizinger closed this as not planned Won't fix, can't repro, duplicate, stale Nov 20, 2023
mergify bot pushed a commit that referenced this issue Nov 22, 2023
This refactoring addresses several aspects of the current handler implementation:

- Remove the manual state machine for outbound streams in favor of using `async-await`.
- Use `oneshot`s to track the number of requested outbound streams
- Use `futures_bounded::FuturesMap` to track the execution of a stream, thus applying a timeout to the entire request.

Resolves: #3130.
Related: #3268.
Related: #4510.

Pull-Request: #4901.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
decision-pending Marks issues where a decision is pending before we can move forward. difficulty:hard
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant