Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feat: add EventStream::select to combine multiple event streams (#725)
Browse files Browse the repository at this point in the history
* feat: add EventStream::select to combine multiple event streams

* chore: update changelog
  • Loading branch information
mattsse authored Dec 23, 2021
1 parent ade8cd9 commit 11406ee
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@

### Unreleased

- Add `EventStream::select` to combine streams with different event types
[#725](https://github.com/gakonst/ethers-rs/pull/725)
- Substitute output tuples with rust struct types for function calls
[#664](https://github.com/gakonst/ethers-rs/pull/664)
- Add AbiType implementation during EthAbiType expansion
Expand Down
137 changes: 136 additions & 1 deletion ethers-contract/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::LogMeta;
use ethers_core::types::{Log, U256};
use futures_util::stream::{Stream, StreamExt};
use futures_util::{
future::Either,
stream::{Stream, StreamExt},
};
use pin_project::pin_project;
use std::{
pin::Pin,
Expand All @@ -23,6 +26,7 @@ pub struct EventStream<'a, T, R, E> {
}

impl<'a, T, R, E> EventStream<'a, T, R, E> {
/// Turns this stream of events into a stream that also yields the event's metadata
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> {
EventStreamMeta(self)
}
Expand All @@ -49,9 +53,140 @@ where
}
}

impl<'a, T, R, E> EventStream<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin + 'a,
R: 'a,
E: 'a,
{
/// This function will attempt to pull events from both event streams. Each
/// stream will be polled in a round-robin fashion, and whenever a stream is
/// ready to yield an event that event is yielded.
///
/// After one of the two event streams completes, the remaining one will be
/// polled exclusively. The returned stream completes when both input
/// streams have completed.
///
///
/// Note that this function consumes both streams and returns a wrapped
/// version of them.
/// The item of the wrapped stream is an `Either`, and the items that the `self` streams yields
/// will be stored in the left-hand variant of that `Either` and the other stream's (`st`) items
/// will be wrapped into the right-hand variant of that `Either`.
///
/// # Example
///
/// ```
/// # async fn test<M:ethers_providers::Middleware>(contract: ethers_contract::Contract<M>) {
/// # use ethers_core::types::*;
/// # use futures_util::stream::StreamExt;
/// # use futures_util::future::Either;
/// # use ethers_contract::{Contract, ContractFactory, EthEvent};
///
/// #[derive(Clone, Debug, EthEvent)]
/// pub struct Approval {
/// #[ethevent(indexed)]
/// pub token_owner: Address,
/// #[ethevent(indexed)]
/// pub spender: Address,
/// pub tokens: U256,
/// }
///
/// #[derive(Clone, Debug, EthEvent)]
/// pub struct Transfer {
/// #[ethevent(indexed)]
/// pub from: Address,
/// #[ethevent(indexed)]
/// pub to: Address,
/// pub tokens: U256,
/// }
///
///
/// let ev1 = contract.event::<Approval>().from_block(1337).to_block(2000);
/// let ev2 = contract.event::<Transfer>();
///
/// let mut events = ev1.stream().await.unwrap().select(ev2.stream().await.unwrap()).ok();
///
/// while let Some(either) = events.next().await {
/// match either {
/// Either::Left(approval) => { let Approval{token_owner,spender,tokens} = approval; }
/// Either::Right(transfer) => { let Transfer{from,to,tokens} = transfer; }
/// }
/// }
///
/// # }
/// ```
pub fn select<St>(self, st: St) -> SelectEvent<SelectEither<'a, Result<R, E>, St::Item>>
where
St: Stream + Unpin + 'a,
{
SelectEvent(Box::pin(futures_util::stream::select(
self.map(Either::Left),
st.map(Either::Right),
)))
}
}

pub type SelectEither<'a, L, R> = Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>>;

#[pin_project]
pub struct SelectEvent<T>(#[pin] T);

impl<'a, T, L, LE, R, RE> SelectEvent<T>
where
T: Stream<Item = Either<Result<L, LE>, Result<R, RE>>> + 'a,
L: 'a,
LE: 'a,
R: 'a,
RE: 'a,
{
/// Turns a stream of Results to a stream of `Result::ok` for both arms
pub fn ok(self) -> Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>> {
Box::pin(self.filter_map(|e| async move {
match e {
Either::Left(res) => res.ok().map(Either::Left),
Either::Right(res) => res.ok().map(Either::Right),
}
}))
}
}

impl<T: Stream> Stream for SelectEvent<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.0.poll_next(cx)
}
}

/// Wrapper around a `EventStream`, that in addition to the deserialized Event type also yields the
/// `LogMeta`.
#[pin_project]
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>);

impl<'a, T, R, E> EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin + 'a,
R: 'a,
E: 'a,
{
/// See `EventStream::select`
#[allow(clippy::type_complexity)]
pub fn select<St>(
self,
st: St,
) -> SelectEvent<SelectEither<'a, Result<(R, LogMeta), E>, St::Item>>
where
St: Stream + Unpin + 'a,
{
SelectEvent(Box::pin(futures_util::stream::select(
self.map(Either::Left),
st.map(Either::Right),
)))
}
}

impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin,
Expand Down

0 comments on commit 11406ee

Please sign in to comment.