Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feat: add paginated logs
Browse files Browse the repository at this point in the history
  • Loading branch information
meetmangukiya committed May 19, 2022
1 parent 75ba3f4 commit 28ac213
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@

### Unreleased

- Stream of paginated logs that load logs in small pages
[1285](https://github.com/gakonst/ethers-rs/pull/1285)
- Load previous logs before subscribing to new logs in case fromBlock is set
[1264](https://github.com/gakonst/ethers-rs/pull/1264)
- Add retries to the pending transaction future
Expand Down
21 changes: 21 additions & 0 deletions ethers-core/src/types/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,27 @@ impl Filter {
self.topics[3] = Some(topic.into());
self
}

pub fn is_paginatable(&self) -> bool {
match self.block_option {
FilterBlockOption::AtBlockHash(_hash) => false,
FilterBlockOption::Range { from_block, to_block: _ } => {
if from_block.is_none() {
return false
}
from_block.unwrap().as_number().is_some()
}
}
}

pub fn get_from_block(&self) -> Option<U64> {
match self.block_option {
FilterBlockOption::AtBlockHash(_hash) => None,
FilterBlockOption::Range { from_block, to_block: _ } => {
from_block.map(|block| block.as_number()).unwrap_or(None)
}
}
}
}

/// Union type for representing a single value or a vector of values inside a filter
Expand Down
12 changes: 12 additions & 0 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub use pending_transaction::PendingTransaction;
mod pending_escalator;
pub use pending_escalator::EscalatingPending;

mod log_query;
pub use log_query::LogQuery;

mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
Expand Down Expand Up @@ -421,6 +424,15 @@ pub trait Middleware: Sync + Send + Debug {
self.inner().get_logs(filter).await.map_err(FromErr::from)
}

/// Returns a stream of logs are loaded in pages of given page size
fn get_logs_paginated<'a>(
&'a self,
filter: &Filter,
page_size: u64,
) -> LogQuery<'a, Self::Provider> {
self.inner().get_logs_paginated(filter, page_size)
}

async fn new_filter(&self, filter: FilterKind<'_>) -> Result<U256, Self::Error> {
self.inner().new_filter(filter).await.map_err(FromErr::from)
}
Expand Down
132 changes: 132 additions & 0 deletions ethers-providers/src/log_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use super::{JsonRpcClient, Middleware, PinBoxFut, Provider};
use ethers_core::types::{Filter, Log, U64};
use futures_core::stream::Stream;
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};

pub struct LogQuery<'a, P> {
provider: &'a Provider<P>,
filter: Filter,
from_block: Option<U64>,
page_size: u64,
current_logs: VecDeque<Log>,
last_block: Option<U64>,
state: LogQueryState<'a>,
}

enum LogQueryState<'a> {
Initial,
LoadLastBlock(PinBoxFut<'a, U64>),
LoadLogs(PinBoxFut<'a, Vec<Log>>),
Consume,
}

impl<'a, P> LogQuery<'a, P>
where
P: JsonRpcClient,
{
pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
Self {
provider,
filter: filter.clone(),
from_block: filter.get_from_block(),
page_size: 10000,
current_logs: VecDeque::new(),
last_block: None,
state: LogQueryState::Initial,
}
}

/// set page size for pagination
pub fn with_page_size(mut self, page_size: u64) -> Self {
self.page_size = page_size;
self
}
}

macro_rules! rewake_with_new_state {
($ctx:ident, $this:ident, $new_state:expr) => {
$this.state = $new_state;
$ctx.waker().wake_by_ref();
return Poll::Pending
};
}

impl<'a, P> Unpin for LogQuery<'a, P> {}

impl<'a, P> Stream for LogQuery<'a, P>
where
P: JsonRpcClient,
{
type Item = Log;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut self.state {
LogQueryState::Initial => {
if !self.filter.is_paginatable() {
// if not paginatable, load logs and consume
let filter = self.filter.clone();
let provider = self.provider;
let fut = Box::pin(async move { provider.get_logs(&filter).await });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
} else {
// if paginatable, load last block
let fut = self.provider.get_block_number();
rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
}
}
LogQueryState::LoadLastBlock(fut) => {
self.last_block = Some(
futures_util::ready!(fut.as_mut().poll(ctx))
.expect("error occurred loading last block"),
);

let from_block = self.filter.get_from_block().unwrap();
let to_block = from_block + self.page_size;
self.from_block = Some(to_block);

let filter = self.filter.clone().from_block(from_block).to_block(to_block);
let provider = self.provider;
// load first page of logs
let fut = Box::pin(async move { provider.get_logs(&filter).await });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
}
LogQueryState::LoadLogs(fut) => {
let logs = futures_util::ready!(fut.as_mut().poll(ctx))
.expect("error occurred loading logs");
self.current_logs = VecDeque::from(logs);
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
}
LogQueryState::Consume => {
let log = self.current_logs.pop_front();
if log.is_none() {
// consumed all the logs
if !self.filter.is_paginatable() {
Poll::Ready(None)
} else {
// load new logs if there are still more pages to go through
let from_block = self.from_block.unwrap();
let to_block = from_block + self.page_size;

// no more pages to load, and everything is consumed
if from_block > self.last_block.unwrap() {
return Poll::Ready(None)
}
// load next page
self.from_block = Some(to_block);

let filter = self.filter.clone().from_block(from_block).to_block(to_block);
let provider = self.provider;
let fut = Box::pin(async move { provider.get_logs(&filter).await });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
}
} else {
Poll::Ready(log)
}
}
}
}
}
6 changes: 5 additions & 1 deletion ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
ens, erc, maybe,
pubsub::{PubsubClient, SubscriptionStream},
stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, MockProvider,
FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, LogQuery, MockProvider,
PendingTransaction, QuorumProvider, RwClient, SyncingStatus,
};

Expand Down Expand Up @@ -647,6 +647,10 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
self.request("eth_getLogs", [filter]).await
}

fn get_logs_paginated<'a>(&'a self, filter: &Filter, page_size: u64) -> LogQuery<'a, P> {
LogQuery::new(self, filter).with_page_size(page_size)
}

/// Streams matching filter logs
async fn watch<'a>(
&'a self,
Expand Down

0 comments on commit 28ac213

Please sign in to comment.