diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 6c55fac73421..f3a1ba660820 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -74,6 +74,7 @@ where eth_cache.clone(), DEFAULT_MAX_LOGS_PER_RESPONSE, Box::new(executor.clone()), + EthConfig::default().stale_filter_ttl, ); launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 55684ecd280d..e3b9d4dcc735 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -39,8 +39,14 @@ pub struct EthConfig { /// /// Defaults to [RPC_DEFAULT_GAS_CAP] pub rpc_gas_cap: u64, + /// + /// Sets TTL for stale filters + pub stale_filter_ttl: std::time::Duration, } +/// Default value for stale filter ttl +const DEFAULT_STALE_FILTER_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60); + impl Default for EthConfig { fn default() -> Self { Self { @@ -49,6 +55,7 @@ impl Default for EthConfig { max_tracing_requests: DEFAULT_MAX_TRACING_REQUESTS, max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE, rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(), + stale_filter_ttl: DEFAULT_STALE_FILTER_TTL, } } } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 1395286cfef7..3e3ce098023e 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1044,6 +1044,7 @@ where cache.clone(), self.config.eth.max_logs_per_response, executor.clone(), + self.config.eth.stale_filter_ttl, ); let pubsub = EthPubSub::with_spawner( diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 8facee2ead48..5b0cc631ba98 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -17,8 +17,17 @@ use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; use reth_tasks::TaskSpawner; use reth_transaction_pool::TransactionPool; -use std::{collections::HashMap, iter::StepBy, ops::RangeInclusive, sync::Arc, time::Instant}; -use tokio::sync::{mpsc::Receiver, Mutex}; +use std::{ + collections::HashMap, + iter::StepBy, + ops::RangeInclusive, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::{ + sync::{mpsc::Receiver, Mutex}, + time::MissedTickBehavior, +}; use tracing::trace; /// The maximum number of headers we read at once when handling a range filter. @@ -30,19 +39,26 @@ pub struct EthFilter { inner: Arc>, } -impl EthFilter { +impl EthFilter +where + Provider: Send + Sync + 'static, + Pool: Send + Sync + 'static, +{ /// Creates a new, shareable instance. /// /// This uses the given pool to get notified about new transactions, the provider to interact /// with the blockchain, the cache to fetch cacheable data, like the logs and the /// max_logs_per_response to limit the amount of logs returned in a single response /// `eth_getLogs` + /// + /// This also spawns a task that periodically clears stale filters. pub fn new( provider: Provider, pool: Pool, eth_cache: EthStateCache, max_logs_per_response: usize, task_spawner: Box, + stale_filter_ttl: Duration, ) -> Self { let inner = EthFilterInner { provider, @@ -53,14 +69,51 @@ impl EthFilter { eth_cache, max_headers_range: MAX_HEADERS_RANGE, task_spawner, + stale_filter_ttl, }; - Self { inner: Arc::new(inner) } + + let eth_filter = Self { inner: Arc::new(inner) }; + + let this = eth_filter.clone(); + eth_filter.inner.task_spawner.clone().spawn_critical( + "eth-filters_stale-filters-clean", + Box::pin(async move { + this.watch_and_clear_stale_filters().await; + }), + ); + + eth_filter } /// Returns all currently active filters pub fn active_filters(&self) -> &ActiveFilters { &self.inner.active_filters } + + /// Endless future that [Self::clear_stale_filters] every `stale_filter_ttl` interval. + async fn watch_and_clear_stale_filters(&self) { + let mut interval = tokio::time::interval(self.inner.stale_filter_ttl); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + loop { + interval.tick().await; + self.clear_stale_filters(Instant::now()).await; + } + } + + /// Clears all filters that have not been polled for longer than the configured + /// `stale_filter_ttl` at the given instant. + pub async fn clear_stale_filters(&self, now: Instant) { + trace!(target: "rpc::eth", "clear stale filters"); + self.active_filters().inner.lock().await.retain(|id, filter| { + let is_valid = (now - filter.last_poll_timestamp) < self.inner.stale_filter_ttl; + + if !is_valid { + trace!(target: "rpc::eth", "evict filter with id: {:?}", id); + } + + is_valid + }) + } } impl EthFilter @@ -244,7 +297,6 @@ impl Clone for EthFilter { #[derive(Debug)] struct EthFilterInner { /// The transaction pool. - #[allow(unused)] // we need this for non standard full transactions eventually pool: Pool, /// The provider that can interact with the chain. provider: Provider, @@ -259,8 +311,9 @@ struct EthFilterInner { /// maximum number of headers to read at once for range filter max_headers_range: u64, /// The type that can spawn tasks. - #[allow(unused)] task_spawner: Box, + /// Duration since the last filter poll, after which the filter is considered stale + stale_filter_ttl: Duration, } impl EthFilterInner