diff --git a/crates/storage/src/connection.rs b/crates/storage/src/connection.rs index ea9dd0e7f..c0eb2c82e 100644 --- a/crates/storage/src/connection.rs +++ b/crates/storage/src/connection.rs @@ -204,10 +204,6 @@ impl<'inner> Transaction<'inner> { event::get_events(self, filter) } - pub fn event_count_for_block(&self, block: BlockId) -> anyhow::Result { - event::event_count_for_block(self, block) - } - pub fn insert_sierra_class( &self, sierra_hash: &SierraHash, diff --git a/crates/storage/src/connection/block.rs b/crates/storage/src/connection/block.rs index e2f6fa09a..e43004a9a 100644 --- a/crates/storage/src/connection/block.rs +++ b/crates/storage/src/connection/block.rs @@ -79,6 +79,13 @@ fn intern_starknet_version(tx: &Transaction<'_>, version: &StarknetVersion) -> a } pub(super) fn purge_block(tx: &Transaction<'_>, block: BlockNumber) -> anyhow::Result<()> { + tx.inner() + .execute( + "DELETE FROM starknet_events_filters WHERE block_number = ?", + params![&block], + ) + .context("Deleting bloom filter")?; + tx.inner() .execute( r"DELETE FROM starknet_transactions WHERE block_hash = ( diff --git a/crates/storage/src/connection/event.rs b/crates/storage/src/connection/event.rs index 0eca06c79..af149b77b 100644 --- a/crates/storage/src/connection/event.rs +++ b/crates/storage/src/connection/event.rs @@ -1,5 +1,6 @@ +use crate::bloom::BloomFilter; use crate::params::ToSql; -use crate::{prelude::*, BlockId}; +use crate::prelude::*; use anyhow::Context; use pathfinder_common::event::Event; @@ -66,71 +67,29 @@ pub struct KeyFilterResult<'a> { pub param: (&'static str, rusqlite::types::ToSqlOutput<'a>), } -pub(super) fn insert_events( +pub(super) fn insert_block_events<'a>( tx: &Transaction<'_>, block_number: BlockNumber, - transaction_hash: TransactionHash, - events: &[Event], + events: impl Iterator, ) -> anyhow::Result<()> { - let mut stmt = tx.inner().prepare( - r"INSERT INTO starknet_events ( block_number, idx, transaction_hash, from_address, keys, data) - VALUES (:block_number, :idx, :transaction_hash, :from_address, :keys, :data)" - )?; + let mut stmt = tx + .inner() + .prepare("INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?)")?; + + let mut bloom = BloomFilter::new(); + for event in events { + for (i, key) in event.keys.iter().enumerate() { + let mut key = key.0; + key.as_mut_be_bytes()[0] |= (i as u8) << 4; + bloom.set(&key); + } - let mut keys = String::new(); - let mut buffer = Vec::new(); - - for (idx, event) in events.iter().enumerate() { - keys.clear(); - event_keys_to_base64_strings(&event.keys, &mut keys); - - buffer.clear(); - encode_event_data_to_bytes(&event.data, &mut buffer); - - stmt.execute(named_params![ - ":block_number": &block_number, - ":idx": &idx.try_into_sql_int()?, - ":transaction_hash": &transaction_hash, - ":from_address": &event.from_address, - ":keys": &keys, - ":data": &buffer, - ]) - .context("Insert events into events table")?; + bloom.set(&event.from_address.0); } - Ok(()) -} -pub(super) fn event_count_for_block(tx: &Transaction<'_>, block: BlockId) -> anyhow::Result { - match block { - BlockId::Number(number) => tx - .inner() - .query_row( - "SELECT COUNT(*) FROM starknet_events - WHERE block_number = ?1", - params![&number], - |row| row.get(0), - ) - .context("Counting events"), - BlockId::Hash(hash) => tx - .inner() - .query_row( - "SELECT COUNT(*) FROM starknet_events - JOIN block_headers ON starknet_events.block_number = block_headers.number - WHERE block_headers.hash = ?1", - params![&hash], - |row| row.get(0), - ) - .context("Counting transactions"), - BlockId::Latest => { - // First get the latest block - let block = match tx.block_id(BlockId::Latest)? { - Some((number, _)) => number, - None => return Ok(0), - }; + stmt.execute(params![&block_number, &bloom.as_compressed_bytes()])?; - event_count_for_block(tx, block.into()) - } - } + Ok(()) } pub(super) fn get_events( @@ -1209,26 +1168,4 @@ mod tests { None => assert_eq!(result, None), } } - - #[test] - fn event_count_for_block() { - let (storage, test_data) = test_utils::setup_test_storage(); - let mut connection = storage.connection().unwrap(); - let tx = connection.transaction().unwrap(); - - assert_eq!( - tx.event_count_for_block(BlockId::Latest).unwrap(), - test_utils::EVENTS_PER_BLOCK - ); - assert_eq!( - tx.event_count_for_block(BlockId::Number(BlockNumber::new_or_panic(1))) - .unwrap(), - test_utils::EVENTS_PER_BLOCK - ); - assert_eq!( - tx.event_count_for_block(BlockId::Hash(test_data.headers.first().unwrap().hash)) - .unwrap(), - test_utils::EVENTS_PER_BLOCK - ); - } } diff --git a/crates/storage/src/connection/transaction.rs b/crates/storage/src/connection/transaction.rs index 6949b5e54..5e85dbb4e 100644 --- a/crates/storage/src/connection/transaction.rs +++ b/crates/storage/src/connection/transaction.rs @@ -49,12 +49,13 @@ pub(super) fn insert_transactions( ":receipt": &serialized_receipt, ":execution_status": &execution_status, ]).context("Inserting transaction data")?; - - // insert events from receipt - super::event::insert_events(tx, block_number, receipt.transaction_hash, &receipt.events) - .context("Inserting events")?; } + let events = transaction_data + .iter() + .flat_map(|(_, receipt)| &receipt.events); + super::event::insert_block_events(tx, block_number, events) + .context("Inserting events into Bloom filter")?; Ok(()) }