This repository has been archived by the owner on Oct 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 791
feat: add EventStream::select to combine multiple event streams #725
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
use crate::LogMeta; | ||
use ethers_core::types::{Log, U256}; | ||
use futures_util::stream::{Stream, StreamExt}; | ||
use futures_util::{ | ||
future::Either, | ||
stream::{Stream, StreamExt}, | ||
}; | ||
use pin_project::pin_project; | ||
use std::{ | ||
pin::Pin, | ||
|
@@ -23,6 +26,7 @@ pub struct EventStream<'a, T, R, E> { | |
} | ||
|
||
impl<'a, T, R, E> EventStream<'a, T, R, E> { | ||
/// Turns this stream of events into a stream that also yields the event's metadata | ||
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> { | ||
EventStreamMeta(self) | ||
} | ||
|
@@ -49,9 +53,140 @@ where | |
} | ||
} | ||
|
||
impl<'a, T, R, E> EventStream<'a, T, R, E> | ||
where | ||
T: Stream<Item = Log> + Unpin + 'a, | ||
R: 'a, | ||
E: 'a, | ||
{ | ||
/// This function will attempt to pull events from both event streams. Each | ||
/// stream will be polled in a round-robin fashion, and whenever a stream is | ||
/// ready to yield an event that event is yielded. | ||
/// | ||
/// After one of the two event streams completes, the remaining one will be | ||
/// polled exclusively. The returned stream completes when both input | ||
/// streams have completed. | ||
/// | ||
/// | ||
/// Note that this function consumes both streams and returns a wrapped | ||
/// version of them. | ||
/// The item of the wrapped stream is an `Either`, and the items that the `self` streams yields | ||
/// will be stored in the left-hand variant of that `Either` and the other stream's (`st`) items | ||
/// will be wrapped into the right-hand variant of that `Either`. | ||
/// | ||
/// # Example | ||
/// | ||
/// ``` | ||
/// # async fn test<M:ethers_providers::Middleware>(contract: ethers_contract::Contract<M>) { | ||
/// # use ethers_core::types::*; | ||
/// # use futures_util::stream::StreamExt; | ||
/// # use futures_util::future::Either; | ||
/// # use ethers_contract::{Contract, ContractFactory, EthEvent}; | ||
/// | ||
/// #[derive(Clone, Debug, EthEvent)] | ||
/// pub struct Approval { | ||
/// #[ethevent(indexed)] | ||
/// pub token_owner: Address, | ||
/// #[ethevent(indexed)] | ||
/// pub spender: Address, | ||
/// pub tokens: U256, | ||
/// } | ||
/// | ||
/// #[derive(Clone, Debug, EthEvent)] | ||
/// pub struct Transfer { | ||
/// #[ethevent(indexed)] | ||
/// pub from: Address, | ||
/// #[ethevent(indexed)] | ||
/// pub to: Address, | ||
/// pub tokens: U256, | ||
/// } | ||
/// | ||
/// | ||
/// let ev1 = contract.event::<Approval>().from_block(1337).to_block(2000); | ||
/// let ev2 = contract.event::<Transfer>(); | ||
/// | ||
/// let mut events = ev1.stream().await.unwrap().select(ev2.stream().await.unwrap()).ok(); | ||
/// | ||
/// while let Some(either) = events.next().await { | ||
/// match either { | ||
/// Either::Left(approval) => { let Approval{token_owner,spender,tokens} = approval; } | ||
/// Either::Right(transfer) => { let Transfer{from,to,tokens} = transfer; } | ||
/// } | ||
/// } | ||
/// | ||
/// # } | ||
/// ``` | ||
pub fn select<St>(self, st: St) -> SelectEvent<SelectEither<'a, Result<R, E>, St::Item>> | ||
where | ||
St: Stream + Unpin + 'a, | ||
{ | ||
SelectEvent(Box::pin(futures_util::stream::select( | ||
self.map(Either::Left), | ||
st.map(Either::Right), | ||
))) | ||
} | ||
} | ||
|
||
pub type SelectEither<'a, L, R> = Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>>; | ||
|
||
#[pin_project] | ||
pub struct SelectEvent<T>(#[pin] T); | ||
|
||
impl<'a, T, L, LE, R, RE> SelectEvent<T> | ||
where | ||
T: Stream<Item = Either<Result<L, LE>, Result<R, RE>>> + 'a, | ||
L: 'a, | ||
LE: 'a, | ||
R: 'a, | ||
RE: 'a, | ||
{ | ||
/// Turns a stream of Results to a stream of `Result::ok` for both arms | ||
pub fn ok(self) -> Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>> { | ||
Box::pin(self.filter_map(|e| async move { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes this skips all erroneous values, but is probably useful if you're only interested in the |
||
match e { | ||
Either::Left(res) => res.ok().map(Either::Left), | ||
Either::Right(res) => res.ok().map(Either::Right), | ||
} | ||
})) | ||
} | ||
} | ||
|
||
impl<T: Stream> Stream for SelectEvent<T> { | ||
type Item = T::Item; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let this = self.project(); | ||
this.0.poll_next(cx) | ||
} | ||
} | ||
|
||
/// Wrapper around a `EventStream`, that in addition to the deserialized Event type also yields the | ||
/// `LogMeta`. | ||
#[pin_project] | ||
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>); | ||
|
||
impl<'a, T, R, E> EventStreamMeta<'a, T, R, E> | ||
where | ||
T: Stream<Item = Log> + Unpin + 'a, | ||
R: 'a, | ||
E: 'a, | ||
{ | ||
/// See `EventStream::select` | ||
#[allow(clippy::type_complexity)] | ||
pub fn select<St>( | ||
self, | ||
st: St, | ||
) -> SelectEvent<SelectEither<'a, Result<(R, LogMeta), E>, St::Item>> | ||
where | ||
St: Stream + Unpin + 'a, | ||
{ | ||
SelectEvent(Box::pin(futures_util::stream::select( | ||
self.map(Either::Left), | ||
st.map(Either::Right), | ||
))) | ||
} | ||
} | ||
|
||
impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E> | ||
where | ||
T: Stream<Item = Log> + Unpin, | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if there's a world where we do this with a macro instead of a function (like the futures
select!
), so that we are not limited to only selecting 2 events?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
driving multiple streams to completion with
select!
is a bit verbose and the core problem is that we need to return a type that represents all the different events.you can do this with
EventStream::select
which will return an
Either<OtherEvent, Either<Approval, Transfer>>
But there'd be also the possibility to create your own event type and use the
Contract::event_with_filter
functionThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right! OK if that pattern works then should be fine.