Skip to content

Commit

Permalink
feat: stale filters cleaner (#5080)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
allnil and mattsse authored Oct 19, 2023
1 parent a136c33 commit 5dd5555
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 6 deletions.
1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ where
eth_cache.clone(),
DEFAULT_MAX_LOGS_PER_RESPONSE,
Box::new(executor.clone()),
EthConfig::default().stale_filter_ttl,
);
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
}
Expand Down
7 changes: 7 additions & 0 deletions crates/rpc/rpc-builder/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ pub struct EthConfig {
///
/// Defaults to [RPC_DEFAULT_GAS_CAP]
pub rpc_gas_cap: u64,
///
/// Sets TTL for stale filters
pub stale_filter_ttl: std::time::Duration,
}

/// Default value for stale filter ttl
const DEFAULT_STALE_FILTER_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60);

impl Default for EthConfig {
fn default() -> Self {
Self {
Expand All @@ -49,6 +55,7 @@ impl Default for EthConfig {
max_tracing_requests: DEFAULT_MAX_TRACING_REQUESTS,
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(),
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ where
cache.clone(),
self.config.eth.max_logs_per_response,
executor.clone(),
self.config.eth.stale_filter_ttl,
);

let pubsub = EthPubSub::with_spawner(
Expand Down
65 changes: 59 additions & 6 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@ use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant};
use tokio::sync::{mpsc::Receiver, Mutex};
use std::{
collections::HashMap,
iter::StepBy,
ops::RangeInclusive,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
sync::{mpsc::Receiver, Mutex},
time::MissedTickBehavior,
};
use tracing::trace;

/// The maximum number of headers we read at once when handling a range filter.
Expand All @@ -30,19 +39,26 @@ pub struct EthFilter<Provider, Pool> {
inner: Arc<EthFilterInner<Provider, Pool>>,
}

impl<Provider, Pool> EthFilter<Provider, Pool> {
impl<Provider, Pool> EthFilter<Provider, Pool>
where
Provider: Send + Sync + 'static,
Pool: Send + Sync + 'static,
{
/// Creates a new, shareable instance.
///
/// This uses the given pool to get notified about new transactions, the provider to interact
/// with the blockchain, the cache to fetch cacheable data, like the logs and the
/// max_logs_per_response to limit the amount of logs returned in a single response
/// `eth_getLogs`
///
/// This also spawns a task that periodically clears stale filters.
pub fn new(
provider: Provider,
pool: Pool,
eth_cache: EthStateCache,
max_logs_per_response: usize,
task_spawner: Box<dyn TaskSpawner>,
stale_filter_ttl: Duration,
) -> Self {
let inner = EthFilterInner {
provider,
Expand All @@ -53,14 +69,51 @@ impl<Provider, Pool> EthFilter<Provider, Pool> {
eth_cache,
max_headers_range: MAX_HEADERS_RANGE,
task_spawner,
stale_filter_ttl,
};
Self { inner: Arc::new(inner) }

let eth_filter = Self { inner: Arc::new(inner) };

let this = eth_filter.clone();
eth_filter.inner.task_spawner.clone().spawn_critical(
"eth-filters_stale-filters-clean",
Box::pin(async move {
this.watch_and_clear_stale_filters().await;
}),
);

eth_filter
}

/// Returns all currently active filters
pub fn active_filters(&self) -> &ActiveFilters {
&self.inner.active_filters
}

/// Endless future that [Self::clear_stale_filters] every `stale_filter_ttl` interval.
async fn watch_and_clear_stale_filters(&self) {
let mut interval = tokio::time::interval(self.inner.stale_filter_ttl);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
interval.tick().await;
self.clear_stale_filters(Instant::now()).await;
}
}

/// Clears all filters that have not been polled for longer than the configured
/// `stale_filter_ttl` at the given instant.
pub async fn clear_stale_filters(&self, now: Instant) {
trace!(target: "rpc::eth", "clear stale filters");
self.active_filters().inner.lock().await.retain(|id, filter| {
let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl;

if !is_valid {
trace!(target: "rpc::eth", "evict filter with id: {:?}", id);
}

is_valid
})
}
}

impl<Provider, Pool> EthFilter<Provider, Pool>
Expand Down Expand Up @@ -244,7 +297,6 @@ impl<Provider, Pool> Clone for EthFilter<Provider, Pool> {
#[derive(Debug)]
struct EthFilterInner<Provider, Pool> {
/// The transaction pool.
#[allow(unused)] // we need this for non standard full transactions eventually
pool: Pool,
/// The provider that can interact with the chain.
provider: Provider,
Expand All @@ -259,8 +311,9 @@ struct EthFilterInner<Provider, Pool> {
/// maximum number of headers to read at once for range filter
max_headers_range: u64,
/// The type that can spawn tasks.
#[allow(unused)]
task_spawner: Box<dyn TaskSpawner>,
/// Duration since the last filter poll, after which the filter is considered stale
stale_filter_ttl: Duration,
}

impl<Provider, Pool> EthFilterInner<Provider, Pool>
Expand Down

0 comments on commit 5dd5555

Please sign in to comment.