From f8c892a4c0e0c4bf8550f52b5092482f414d07b7 Mon Sep 17 00:00:00 2001
From: sistemd
Date: Tue, 19 Nov 2024 11:33:55 +0100
Subject: [PATCH] purge aggregate bloom filter

---
 crates/pathfinder/Cargo.toml             |   1 +
 crates/pathfinder/src/state/sync.rs      |   5 ++
 crates/pathfinder/src/sync/checkpoint.rs |   5 ++
 crates/rpc/Cargo.toml                    |   2 -
 crates/storage/Cargo.toml                |   2 -
 crates/storage/src/connection.rs         |   2 +-
 crates/storage/src/connection/block.rs   |   9 ++
 crates/storage/src/connection/event.rs   | 105 +++++++++++++++++++++++
 crates/storage/src/lib.rs                |  96 +--------------------
 9 files changed, 127 insertions(+), 100 deletions(-)

diff --git a/crates/pathfinder/Cargo.toml b/crates/pathfinder/Cargo.toml
index a554719537..86ee8836a8 100644
--- a/crates/pathfinder/Cargo.toml
+++ b/crates/pathfinder/Cargo.toml
@@ -14,6 +14,7 @@ path = "src/lib.rs"
 [features]
 tokio-console = ["console-subscriber", "tokio/tracing"]
 p2p = []
+aggregate_bloom = []
 
 [dependencies]
 anyhow = { workspace = true }
diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs
index 378d1cd6e4..c93b25e278 100644
--- a/crates/pathfinder/src/state/sync.rs
+++ b/crates/pathfinder/src/state/sync.rs
@@ -1082,6 +1082,11 @@ async fn l2_reorg(
         head -= 1;
     }
 
+    #[cfg(feature = "aggregate_bloom")]
+    transaction
+        .reconstruct_running_aggregate()
+        .context("Reconstructing running aggregate bloom")?;
+
     // Track combined L1 and L2 state.
     let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
     if let Some(l1_l2_head) = l1_l2_head {
diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs
index 14179b7e7c..a437b9fb27 100644
--- a/crates/pathfinder/src/sync/checkpoint.rs
+++ b/crates/pathfinder/src/sync/checkpoint.rs
@@ -665,6 +665,11 @@ async fn rollback_to_anchor(
             head -= 1;
         }
 
+        #[cfg(feature = "aggregate_bloom")]
+        transaction
+            .reconstruct_running_aggregate()
+            .context("Reconstructing running aggregate bloom")?;
+
         Ok(())
     })
     .await
diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml
index c52d643acf..f7c88f0902 100644
--- a/crates/rpc/Cargo.toml
+++ b/crates/rpc/Cargo.toml
@@ -10,8 +10,6 @@ rust-version = { workspace = true }
 
 [features]
 aggregate_bloom = []
-default = []
-
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml
index 1f770f13cd..fcd2b3190e 100644
--- a/crates/storage/Cargo.toml
+++ b/crates/storage/Cargo.toml
@@ -10,8 +10,6 @@ rust-version = { workspace = true }
 
 [features]
 aggregate_bloom = []
-default = []
-
 [dependencies]
 anyhow = { workspace = true }
 base64 = { workspace = true }
diff --git a/crates/storage/src/connection.rs b/crates/storage/src/connection.rs
index 55defedb46..46ead118f0 100644
--- a/crates/storage/src/connection.rs
+++ b/crates/storage/src/connection.rs
@@ -5,7 +5,7 @@ use std::sync::Mutex;
 mod block;
 mod class;
 mod ethereum;
-mod event;
+pub(crate) mod event;
 mod reference;
 mod reorg_counter;
 mod signature;
diff --git a/crates/storage/src/connection/block.rs b/crates/storage/src/connection/block.rs
index c424bdd6c9..5a4f64763d 100644
--- a/crates/storage/src/connection/block.rs
+++ b/crates/storage/src/connection/block.rs
@@ -116,6 +116,15 @@ impl Transaction<'_> {
     ///
     /// This includes block header, block body and state update information.
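+    /// The aggregate event Bloom filter whose block range covers this block
+    /// is deleted as well.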
     pub fn purge_block(&self, block: BlockNumber) -> anyhow::Result<()> {
+        self.inner()
+            .execute(
+                r"
+                DELETE FROM starknet_events_filters_aggregate
+                WHERE from_block <= :block AND to_block >= :block
+                ",
+                named_params![":block": &block],
+            )
+            .context("Deleting aggregate bloom filter")?;
         self.inner()
             .execute(
                 "DELETE FROM starknet_events_filters WHERE block_number = ?",
diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs
index f28a84e813..66ebf636ac 100644
--- a/crates/storage/src/connection/event.rs
+++ b/crates/storage/src/connection/event.rs
@@ -69,12 +69,28 @@ pub struct PageOfEvents {
 
 impl Transaction<'_> {
     #[cfg(feature = "aggregate_bloom")]
+    pub fn reconstruct_running_aggregate(&self) -> anyhow::Result<()> {
+        let aggregate = reconstruct_running_aggregate(self.inner())?;
+        let mut running_aggregate = match self.running_aggregate.lock() {
+            Ok(guard) => guard,
+            Err(poisoned) => {
+                tracing::error!("Poisoned lock in reconstruct_running_aggregate");
+                poisoned.into_inner()
+            }
+        };
+
+        *running_aggregate = aggregate;
+
+        Ok(())
+    }
+
     /// Upsert the [aggregate event bloom filter](AggregateBloom) for the given
     /// block number. This function operates under the assumption that
     /// blocks are _never_ skipped so even if there are no events for a
     /// block, this function should still be called with an empty iterator.
     /// When testing it is fine to skip blocks, as long as the block at the end
     /// of the current range is not skipped.
+    #[cfg(feature = "aggregate_bloom")]
     pub(super) fn upsert_block_events_aggregate<'a>(
         &self,
         block_number: BlockNumber,
@@ -619,6 +635,95 @@ impl Transaction<'_> {
     }
 }
 
+/// Reconstruct the [aggregate](crate::bloom::AggregateBloom) for the range of
+/// blocks between the last stored `to_block` in the aggregate Bloom filter
+/// table and the last overall block in the database. This is needed because
+/// the aggregate Bloom filter for each [block
+/// range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN) is only stored once
+/// the range is complete; until then it is kept in memory and can be lost
+/// upon shutdown.
+#[cfg(feature = "aggregate_bloom")]
+pub fn reconstruct_running_aggregate(
+    tx: &rusqlite::Transaction<'_>,
+) -> anyhow::Result<AggregateBloom> {
+    use super::transaction;
+
+    let mut last_to_block_stmt = tx.prepare(
+        r"
+        SELECT to_block
+        FROM starknet_events_filters_aggregate
+        ORDER BY from_block DESC LIMIT 1
+        ",
+    )?;
+    let mut events_to_reconstruct_stmt = tx.prepare(
+        r"
+        SELECT events
+        FROM transactions
+        WHERE block_number >= :first_running_aggregate_block
+        ",
+    )?;
+
+    let last_to_block = last_to_block_stmt
+        .query_row([], |row| row.get::<_, u64>(0))
+        .optional()
+        .context("Querying last stored aggregate to_block")?;
+
+    let first_running_aggregate_block = match last_to_block {
+        Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
+        // Aggregate Bloom filter table is empty -> reconstruct running aggregate
+        // from the genesis block.
+        None => BlockNumber::GENESIS,
+    };
+
+    let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
+        .query_and_then(
+            named_params![":first_running_aggregate_block": &first_running_aggregate_block],
+            |row| {
+                let events: Option<_> = row
+                    .get_optional_blob(0)?
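+                    // Layout assumption: one `transactions` row per block; the
+                    // `events` blob holds the block's compressed, bincode-encoded
+                    // events and is NULL when they are not stored.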
+                    .map(|events_blob| -> anyhow::Result<_> {
+                        let events = transaction::compression::decompress_events(events_blob)
+                            .context("Decompressing events")?;
+                        let events: transaction::dto::EventsForBlock =
+                            bincode::serde::decode_from_slice(&events, bincode::config::standard())
+                                .context("Deserializing events")?
+                                .0;
+
+                        Ok(events)
+                    })
+                    .transpose()?;
+
+                Ok(events.map(|events| {
+                    events
+                        .events()
+                        .into_iter()
+                        .map(|e| e.into_iter().map(Into::into).collect())
+                        .collect()
+                }))
+            },
+        )
+        .context("Querying events to reconstruct")?
+        .collect::<anyhow::Result<_>>()?;
+
+    let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block);
+
+    for (block, events_for_block) in events_to_reconstruct.iter().enumerate() {
+        if let Some(events) = events_for_block {
+            let block_number = first_running_aggregate_block + block as u64;
+
+            let mut bloom = BloomFilter::new();
+            for event in events.iter().flatten() {
+                bloom.set_keys(&event.keys);
+                bloom.set_address(&event.from_address);
+            }
+
+            running_aggregate.add_bloom(&bloom, block_number);
+        }
+    }
+
+    Ok(running_aggregate)
+}
+
 fn continuation_token(
     events: &[EmittedEvent],
     previous_token: ContinuationToken,
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 47bd04ac77..c0929622e5 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -272,7 +272,7 @@ impl StorageBuilder {
         }
 
         #[cfg(feature = "aggregate_bloom")]
-        let running_aggregate = reconstruct_running_aggregate(&mut connection)
+        let running_aggregate = event::reconstruct_running_aggregate(&connection.transaction()?)
             .context("Reconstructing running aggregate bloom filter")?;
 
         connection
@@ -514,100 +514,6 @@ fn schema_version(connection: &rusqlite::Connection) -> anyhow::Result<u64> {
     Ok(version)
 }
 
-/// Reconstruct the [aggregate](bloom::AggregateBloom) for the range of blocks
-/// between the last stored to_block in the aggregate Bloom filter table and the
-/// last overall block in the database. This is needed because the aggregate
-/// Bloom filter for each [block range](bloom::AggregateBloom::BLOCK_RANGE_LEN)
-/// is stored once the range is complete, before that it is kept in memory and
-/// can be lost upon shutdown.
-#[cfg(feature = "aggregate_bloom")]
-fn reconstruct_running_aggregate(
-    connection: &mut rusqlite::Connection,
-) -> anyhow::Result<AggregateBloom> {
-    use bloom::BloomFilter;
-    use params::{named_params, RowExt};
-    use pathfinder_common::event::Event;
-    use transaction;
-
-    let tx = connection
-        .transaction()
-        .context("Creating database transaction")?;
-    let mut select_last_to_block_stmt = tx.prepare(
-        r"
-        SELECT to_block
-        FROM starknet_events_filters_aggregate
-        ORDER BY from_block DESC LIMIT 1
-        ",
-    )?;
-    let mut events_to_reconstruct_stmt = tx.prepare(
-        r"
-        SELECT events
-        FROM transactions
-        WHERE block_number >= :first_running_aggregate_block
-        ",
-    )?;
-
-    let last_to_block = select_last_to_block_stmt
-        .query_row([], |row| row.get::<_, u64>(0))
-        .optional()
-        .context("Querying last stored aggregate to_block")?;
-
-    let first_running_aggregate_block = match last_to_block {
-        Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
-        // Aggregate Bloom filter table is empty -> reconstruct running aggregate
-        // from the genesis block.
-        None => BlockNumber::GENESIS,
-    };
-
-    let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
-        .query_and_then(
-            named_params![":first_running_aggregate_block": &first_running_aggregate_block],
-            |row| {
-                let events: Option<_> = row
-                    .get_optional_blob(0)?
-                    .map(|events_blob| -> anyhow::Result<_> {
-                        let events = transaction::compression::decompress_events(events_blob)
-                            .context("Decompressing events")?;
-                        let events: transaction::dto::EventsForBlock =
-                            bincode::serde::decode_from_slice(&events, bincode::config::standard())
-                                .context("Deserializing events")?
-                                .0;
-
-                        Ok(events)
-                    })
-                    .transpose()?;
-
-                Ok(events.map(|events| {
-                    events
-                        .events()
-                        .into_iter()
-                        .map(|e| e.into_iter().map(Into::into).collect())
-                        .collect()
-                }))
-            },
-        )
-        .context("Querying events to reconstruct")?
-        .collect::<anyhow::Result<_>>()?;
-
-    let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block);
-
-    for (block, events_for_block) in events_to_reconstruct.iter().enumerate() {
-        if let Some(events) = events_for_block {
-            let block_number = first_running_aggregate_block + block as u64;
-
-            let mut bloom = BloomFilter::new();
-            for event in events.iter().flatten() {
-                bloom.set_keys(&event.keys);
-                bloom.set_address(&event.from_address);
-            }
-
-            running_aggregate.add_bloom(&bloom, block_number);
-        }
-    }
-
-    Ok(running_aggregate)
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
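Note on the resumption rule: `reconstruct_running_aggregate` decides where the running aggregate restarts with a single match on the newest stored `to_block`. The standalone sketch below mirrors that decision; it uses plain u64 in place of BlockNumber, and the function name is illustrative rather than part of the patch:

/// Mirrors the `match last_to_block` in `reconstruct_running_aggregate`:
/// resume one block past the newest fully persisted aggregate range, or
/// start from genesis when `starknet_events_filters_aggregate` is empty.
fn first_running_aggregate_block(last_to_block: Option<u64>) -> u64 {
    match last_to_block {
        Some(to_block) => to_block + 1, // resume after the stored range
        None => 0,                      // genesis
    }
}

fn main() {
    assert_eq!(first_running_aggregate_block(None), 0);
    assert_eq!(first_running_aggregate_block(Some(767)), 768);
}

The rest of the rebuild (decompressing each block's events and folding per-event Bloom filters into the aggregate) depends on storage internals, so only this piece is distilled here.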