Skip to content

Commit

Permalink
feat(storage/bloom): use reorg_counter as part of Bloom filter cache key
Browse files Browse the repository at this point in the history
To avoid issues during reorgs we need to use the reorg counter as part
of the Bloom filter cache key. Otherwise, a transaction that is still
running while the reorged-away blocks are present could re-insert a
Bloom filter with stale contents into the cache.
  • Loading branch information
kkovaacs committed Jan 16, 2024
1 parent 15d25ba commit 5982362
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
32 changes: 21 additions & 11 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{Mutex, MutexGuard};

use bloomfilter::Bloom;
use cached::{Cached, SizedCache};
use pathfinder_common::BlockHash;
use pathfinder_common::{BlockNumber, ReorgCounter};
use pathfinder_crypto::Felt;

#[derive(Clone)]
Expand Down Expand Up @@ -67,7 +67,8 @@ impl BloomFilter {
}
}

pub(crate) struct Cache(Mutex<SizedCache<BlockHash, BloomFilter>>);
type CacheKey = (ReorgCounter, BlockNumber);
pub(crate) struct Cache(Mutex<SizedCache<CacheKey, BloomFilter>>);

#[derive(Debug, thiserror::Error)]
pub(crate) enum CacheError {
Expand All @@ -82,25 +83,34 @@ impl Cache {

fn locked_cache(
&self,
) -> Result<MutexGuard<'_, SizedCache<BlockHash, BloomFilter>>, CacheError> {
) -> Result<MutexGuard<'_, SizedCache<CacheKey, BloomFilter>>, CacheError> {
self.0.lock().map_err(|err| {
tracing::warn!("Bloom filter cache lock is poisoned. Cause: {}.", err);
CacheError::Poisoned
})
}

pub fn get(&self, block_hash: &BlockHash) -> Result<Option<BloomFilter>, CacheError> {
Ok(self.locked_cache()?.cache_get(block_hash).cloned())
/// Looks up the cached Bloom filter for `block_number`.
///
/// The `reorg_counter` is part of the key so that entries written before a
/// reorg can never be served afterwards. Returns `Ok(None)` on a cache miss
/// and `Err(CacheError::Poisoned)` if the cache mutex is poisoned.
pub fn get(
    &self,
    reorg_counter: ReorgCounter,
    block_number: BlockNumber,
) -> Result<Option<BloomFilter>, CacheError> {
    let key = (reorg_counter, block_number);
    let mut cache = self.locked_cache()?;
    // `cache_get` hands back a borrow; clone so the guard can be dropped.
    let hit = cache.cache_get(&key).cloned();
    Ok(hit)
}

pub fn set(&self, block_hash: BlockHash, bloom: BloomFilter) -> Result<(), CacheError> {
self.locked_cache()?.cache_set(block_hash, bloom);
/// Stores `bloom` in the cache under the `(reorg_counter, block_number)` key.
///
/// Keying on the reorg counter ensures that a filter computed against
/// reorged-away state cannot collide with one for the current chain.
/// Returns `Err(CacheError::Poisoned)` if the cache mutex is poisoned.
pub fn set(
    &self,
    reorg_counter: ReorgCounter,
    block_number: BlockNumber,
    bloom: BloomFilter,
) -> Result<(), CacheError> {
    let key = (reorg_counter, block_number);
    let mut cache = self.locked_cache()?;
    cache.cache_set(key, bloom);
    Ok(())
}

/// Evicts the Bloom filter cached for `block_hash`, returning the removed
/// entry if one was present.
///
/// Returns `Err(CacheError::Poisoned)` if the cache mutex is poisoned.
pub fn remove(&self, block_hash: &BlockHash) -> Result<Option<BloomFilter>, CacheError> {
    let mut cache = self.locked_cache()?;
    Ok(cache.cache_remove(block_hash))
}
}

#[cfg(test)]
Expand Down
32 changes: 16 additions & 16 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::prelude::*;

use pathfinder_common::event::Event;
use pathfinder_common::{
BlockHash, BlockNumber, ContractAddress, EventData, EventKey, TransactionHash,
BlockHash, BlockNumber, ContractAddress, EventData, EventKey, ReorgCounter, TransactionHash,
};

pub const PAGE_SIZE_LIMIT: usize = 1_024;
Expand Down Expand Up @@ -105,6 +105,8 @@ pub(super) fn get_events(
return Err(EventFilterError::PageSizeTooSmall);
}

let reorg_counter = tx.reorg_counter()?;

let from_block = filter.from_block.unwrap_or(BlockNumber::GENESIS).get();
let to_block = filter.to_block.unwrap_or(BlockNumber::MAX).get();
let mut offset = filter.offset;
Expand All @@ -114,22 +116,16 @@ pub(super) fn get_events(
let mut blocks_scanned: usize = 0;

for block_number in from_block..=to_block {
let block_number = BlockNumber::new_or_panic(block_number);
if emitted_events.len() > filter.page_size {
break;
}
let events_required = filter.page_size + 1 - emitted_events.len();

tracing::trace!(%block_number, %events_required, "Processing block");

let block_header = tx.block_header(crate::BlockId::Number(BlockNumber::new_or_panic(
block_number,
)))?;
let Some(block_header) = block_header else {
break;
};

if !key_filter_is_empty || filter.contract_address.is_some() {
let bloom = load_bloom(tx, block_number, &block_header.hash)?;
let bloom = load_bloom(tx, reorg_counter, block_number)?;
if let Some(bloom) = bloom {
if !keys_in_bloom(&bloom, &filter.keys) {
tracing::trace!("Bloom filter did not match keys");
Expand All @@ -146,14 +142,17 @@ pub(super) fn get_events(
};
}

let block_header = tx.block_header(crate::BlockId::Number(block_number))?;
let Some(block_header) = block_header else {
break;
};

blocks_scanned += 1;
if blocks_scanned > MAX_BLOCKS_TO_SCAN {
return Err(EventFilterError::TooManyMatches);
}

let receipts = tx.receipts_for_block(crate::BlockId::Number(BlockNumber::new_or_panic(
block_number,
)))?;
let receipts = tx.receipts_for_block(block_header.hash.into())?;
let Some(receipts) = receipts else {
break;
};
Expand Down Expand Up @@ -220,10 +219,10 @@ pub(super) fn get_events(

fn load_bloom(
tx: &Transaction<'_>,
block_number: u64,
block_hash: &BlockHash,
reorg_counter: ReorgCounter,
block_number: BlockNumber,
) -> Result<Option<BloomFilter>, EventFilterError> {
if let Some(bloom) = tx.bloom_filter_cache.get(block_hash)? {
if let Some(bloom) = tx.bloom_filter_cache.get(reorg_counter, block_number)? {
return Ok(Some(bloom));
}

Expand All @@ -239,7 +238,8 @@ fn load_bloom(
.optional()?;

if let Some(bloom) = &bloom {
tx.bloom_filter_cache.set(*block_hash, bloom.clone())?;
tx.bloom_filter_cache
.set(reorg_counter, block_number, bloom.clone())?;
}

Ok(bloom)
Expand Down

0 comments on commit 5982362

Please sign in to comment.