diff --git a/Cargo.lock b/Cargo.lock index 73f47054c..7a3b15e73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6168,6 +6168,7 @@ dependencies = [ "bincode", "bitvec", "bloomfilter", + "cached", "const_format", "data-encoding", "fake", diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 3ccce5690..ae12614f8 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -13,6 +13,7 @@ base64 = { workspace = true } bincode = "2.0.0-rc.3" bitvec = { workspace = true } bloomfilter = "1.0.12" +cached = "0.44.0" const_format = { workspace = true } data-encoding = "2.4.0" fake = { workspace = true } diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs index 854f08ff9..8aba1a084 100644 --- a/crates/storage/src/bloom.rs +++ b/crates/storage/src/bloom.rs @@ -1,6 +1,7 @@ use bloomfilter::Bloom; use pathfinder_crypto::Felt; +#[derive(Clone)] pub(crate) struct BloomFilter(Bloom); impl BloomFilter { diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index 1de0b6cef..ca7dd6dff 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,6 +1,9 @@ +use std::sync::{Mutex, MutexGuard}; + use crate::bloom::BloomFilter; use crate::prelude::*; +use cached::{Cached, SizedCache}; use pathfinder_common::event::Event; use pathfinder_common::{ BlockHash, BlockNumber, ContractAddress, EventData, EventKey, TransactionHash, @@ -103,10 +106,6 @@ pub(super) fn get_events( let mut offset = filter.offset; let key_filter_is_empty = filter.keys.iter().flatten().count() == 0; - let mut bloom_stmt = tx - .inner() - .prepare_cached("SELECT bloom FROM starknet_events_filters WHERE block_number = ?")?; - let mut emitted_events = Vec::new(); for block_number in from_block..=to_block { @@ -118,12 +117,7 @@ pub(super) fn get_events( tracing::trace!(%block_number, %events_required, "Processing block"); if !key_filter_is_empty || filter.contract_address.is_some() { - let bloom = bloom_stmt - .query_row(params![&block_number], |row| { - let bytes: Vec = row.get(0)?; - Ok(BloomFilter::from_compressed_bytes(&bytes)) - }) - .optional()?; + let bloom = load_bloom(tx, block_number)?; let Some(bloom) = bloom else { break; }; @@ -214,6 +208,63 @@ pub(super) fn get_events( }) } +fn load_bloom( + tx: &Transaction<'_>, + block_number: u64, +) -> Result, EventFilterError> { + if let Some(bloom) = GLOBAL_CACHE.get(block_number)? { + return Ok(Some(bloom)); + } + + let mut stmt = tx + .inner() + .prepare_cached("SELECT bloom FROM starknet_events_filters WHERE block_number = ?")?; + + let bloom = stmt + .query_row(params![&block_number], |row| { + let bytes: Vec = row.get(0)?; + Ok(BloomFilter::from_compressed_bytes(&bytes)) + }) + .optional()?; + + if let Some(bloom) = &bloom { + GLOBAL_CACHE.set(block_number, bloom.clone())?; + } + + Ok(bloom) +} + +lazy_static::lazy_static! { + // FIXME: remove bloom filters when purging blocks + static ref GLOBAL_CACHE: BloomFilterCache = BloomFilterCache::new(); +} + +struct BloomFilterCache(Mutex>); + +impl BloomFilterCache { + fn new() -> Self { + Self(Mutex::new(SizedCache::with_size(512 * 1024))) + } + + fn locked_cache( + &self, + ) -> Result>, EventFilterError> { + self.0.lock().map_err(|err| { + tracing::warn!("Bloom filter cache lock is poisoned. Cause: {}.", err); + EventFilterError::Internal(anyhow::anyhow!("Poisoned lock")) + }) + } + + pub fn get(&self, block_number: u64) -> Result, EventFilterError> { + Ok(self.locked_cache()?.cache_get(&block_number).cloned()) + } + + pub fn set(&self, block_number: u64, bloom: BloomFilter) -> Result<(), EventFilterError> { + self.locked_cache()?.cache_set(block_number, bloom); + Ok(()) + } +} + fn keys_in_bloom(bloom: &BloomFilter, keys: &[Vec]) -> bool { keys.iter().enumerate().all(|(idx, keys)| { if keys.is_empty() {