From 900c492b2f14f3ed79f75deece8ca68925da1f55 Mon Sep 17 00:00:00 2001
From: Krisztian Kovacs
Date: Mon, 18 Dec 2023 10:29:03 +0100
Subject: [PATCH] feat(storage): add per-block Bloom filter for events

This change adds a new table storing one Bloom filter per block, covering
the events emitted in that block. Each filter stores the event keys (tagged
with their index in the key list) and the address of the contract that
emitted the event, and can be used to avoid loading the receipts of blocks
that cannot contain matching events.
---
Changes since the original patch (the commit id above and the `index` blob
ids of bloom.rs and revision_0046.rs still refer to the original):
- bloom.rs: import ContractAddress, EventKey and EVENT_KEY_FILTER_LIMIT
- bloom.rs: check_keys skips positions past EVENT_KEY_FILTER_LIMIT
- revision_0046.rs: do not store an empty filter before the first event
- revision_0046.rs: fix the error context of the row iteration
- add doc comments

 Cargo.lock                                 |  22 ++-
 crates/crypto/src/algebra/field/felt.rs    |   5 +
 crates/storage/Cargo.toml                  |   1 +
 crates/storage/src/bloom.rs                | 156 +++++++++++++++++++++
 crates/storage/src/lib.rs                  |   1 +
 crates/storage/src/schema.rs               |   2 +
 crates/storage/src/schema/revision_0046.rs | 103 ++++++++++++++
 7 files changed, 288 insertions(+), 2 deletions(-)
 create mode 100644 crates/storage/src/bloom.rs
 create mode 100644 crates/storage/src/schema/revision_0046.rs

diff --git a/Cargo.lock b/Cargo.lock
index b18850e00..60a1a515f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -847,6 +847,17 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "bloomfilter"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934"
+dependencies = [
+ "bit-vec",
+ "getrandom",
+ "siphasher 1.0.0",
+]
+
 [[package]]
 name = "bs58"
 version = "0.5.0"
@@ -6156,6 +6167,7 @@ dependencies = [
  "base64 0.13.1",
  "bincode",
  "bitvec",
+ "bloomfilter",
  "const_format",
  "data-encoding",
  "fake",
@@ -6248,7 +6260,7 @@ version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
 dependencies = [
- "siphasher",
+ "siphasher 0.3.10",
 ]
 
 [[package]]
@@ -6257,7 +6269,7 @@ version = "0.11.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
 dependencies = [
- "siphasher",
+ "siphasher 0.3.10",
 ]
 
 [[package]]
@@ -7535,6 +7547,12 @@ version = "0.3.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
 
+[[package]]
+name = "siphasher"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe"
+
 [[package]]
 name = "sketches-ddsketch"
 version = "0.2.1"
diff --git a/crates/crypto/src/algebra/field/felt.rs b/crates/crypto/src/algebra/field/felt.rs
index d4177692d..e401c197e 100644
--- a/crates/crypto/src/algebra/field/felt.rs
+++ b/crates/crypto/src/algebra/field/felt.rs
@@ -95,6 +95,11 @@ impl Felt {
         &self.0
     }
 
+    /// Big-endian mutable representation of this [Felt].
+    pub fn as_mut_be_bytes(&mut self) -> &mut [u8; 32] {
+        &mut self.0
+    }
+
     /// Convenience function which extends [Felt::from_be_bytes] to work with slices.
     pub const fn from_be_slice(bytes: &[u8]) -> Result<Self, OverflowError> {
         if bytes.len() > 32 {
diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml
index 613e6a718..3355a981f 100644
--- a/crates/storage/Cargo.toml
+++ b/crates/storage/Cargo.toml
@@ -12,6 +12,7 @@ anyhow = { workspace = true }
 base64 = { workspace = true }
 bincode = "2.0.0-rc.3"
 bitvec = { workspace = true }
+bloomfilter = "1.0.12"
 const_format = { workspace = true }
 data-encoding = "2.4.0"
 fake = { workspace = true }
diff --git a/crates/storage/src/bloom.rs b/crates/storage/src/bloom.rs
new file mode 100644
index 000000000..854f08ff9
--- /dev/null
+++ b/crates/storage/src/bloom.rs
@@ -0,0 +1,156 @@
+use bloomfilter::Bloom;
+use pathfinder_common::{ContractAddress, EventKey};
+use pathfinder_crypto::Felt;
+
+// NOTE(review): assumes the limit is re-exported at the crate root -- confirm.
+use crate::EVENT_KEY_FILTER_LIMIT;
+
+/// Bloom filter over the emitting contract addresses and position-tagged keys of
+/// a set of events.
+pub(crate) struct BloomFilter(Bloom<Felt>);
+
+impl BloomFilter {
+    // The size of the bitmap used by the Bloom filter (in bytes).
+    const BITMAP_BYTES: u64 = 2048;
+    // The maximal number of items anticipated to be inserted into the Bloom filter.
+    const ITEMS_COUNT: u32 = 1024;
+    // The number of hash functions used by the Bloom filter.
+    // We need this value to be able to re-create the filter with the deserialized bitmap.
+    const K_NUM: u32 = 12;
+    // The seed used by the hash functions of the filter.
+    // This is a randomly generated vector of 32 bytes.
+    const SEED: [u8; 32] = [
+        0xef, 0x51, 0x88, 0x74, 0xef, 0x08, 0x3d, 0xf6, 0x7d, 0x7a, 0x93, 0xb7, 0xb3, 0x13, 0x1f,
+        0x87, 0xd3, 0x26, 0xbd, 0x49, 0xc7, 0x18, 0xcc, 0xe5, 0xd7, 0xe8, 0xa0, 0xdb, 0xea, 0x80,
+        0x67, 0x52,
+    ];
+
+    /// Creates an empty filter.
+    ///
+    /// Panics if the library picks a number of hash functions other than
+    /// `K_NUM`, which `from_bytes` relies on to rebuild the filter.
+    pub fn new() -> Self {
+        let bloom = Bloom::new_with_seed(
+            Self::BITMAP_BYTES as usize,
+            Self::ITEMS_COUNT as usize,
+            &Self::SEED,
+        );
+        assert_eq!(bloom.number_of_hash_functions(), Self::K_NUM);
+
+        Self(bloom)
+    }
+
+    /// Rebuilds a filter from the output of `to_compressed_bytes`.
+    ///
+    /// Panics if `bytes` cannot be decompressed.
+    pub fn from_compressed_bytes(bytes: &[u8]) -> Self {
+        let bytes = zstd::bulk::decompress(bytes, Self::BITMAP_BYTES as usize * 2)
+            .expect("Decompressing Bloom filter");
+        Self::from_bytes(&bytes)
+    }
+
+    /// Rebuilds a filter from a raw bitmap using the fixed seed and hash count.
+    fn from_bytes(bytes: &[u8]) -> Self {
+        let k1 = u64::from_le_bytes(Self::SEED[0..8].try_into().unwrap());
+        let k2 = u64::from_le_bytes(Self::SEED[8..16].try_into().unwrap());
+        let k3 = u64::from_le_bytes(Self::SEED[16..24].try_into().unwrap());
+        let k4 = u64::from_le_bytes(Self::SEED[24..32].try_into().unwrap());
+        let bloom = Bloom::from_existing(
+            bytes,
+            Self::BITMAP_BYTES * 8,
+            Self::K_NUM,
+            [(k1, k2), (k3, k4)],
+        );
+        Self(bloom)
+    }
+
+    /// Returns the filter's bitmap compressed with zstd.
+    pub fn to_compressed_bytes(&self) -> Vec<u8> {
+        let bytes = self.to_bytes();
+        zstd::bulk::compress(&bytes, 0).expect("Compressing Bloom filter")
+    }
+
+    fn to_bytes(&self) -> Vec<u8> {
+        self.0.bitmap()
+    }
+
+    fn set(&mut self, key: &Felt) {
+        self.0.set(key);
+    }
+
+    /// Records the address of a contract that emitted an event.
+    pub fn set_address(&mut self, address: &ContractAddress) {
+        self.set(&address.0);
+    }
+
+    /// Records the keys of one event.
+    ///
+    /// Each key is tagged with its position by OR-ing the index into the upper four
+    /// bits of its first byte. Keys past `EVENT_KEY_FILTER_LIMIT` are not recorded.
+    pub fn set_keys(&mut self, keys: &[EventKey]) {
+        for (i, key) in keys.iter().take(EVENT_KEY_FILTER_LIMIT).enumerate() {
+            let mut key = key.0;
+            key.as_mut_be_bytes()[0] |= (i as u8) << 4;
+            self.set(&key);
+        }
+    }
+
+    fn check(&self, key: &Felt) -> bool {
+        self.0.check(key)
+    }
+
+    /// Returns whether `address` may have been recorded in this filter.
+    pub fn check_address(&self, address: &ContractAddress) -> bool {
+        self.check(&address.0)
+    }
+
+    /// Returns whether the filter may contain an event matching `keys`: for every
+    /// position with a non-empty list, at least one listed key must check out.
+    ///
+    /// Positions past `EVENT_KEY_FILTER_LIMIT` are never recorded by `set_keys`, so
+    /// they cannot rule a filter out and are skipped.
+    pub fn check_keys(&self, keys: &[Vec<EventKey>]) -> bool {
+        keys.iter().take(EVENT_KEY_FILTER_LIMIT).enumerate().all(|(idx, keys)| {
+            if keys.is_empty() {
+                return true;
+            };
+
+            keys.iter().any(|key| {
+                let mut key = key.0;
+                key.as_mut_be_bytes()[0] |= (idx as u8) << 4;
+                tracing::trace!(%idx, %key, "Checking key in filter");
+                self.check(&key)
+            })
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use pathfinder_common::felt;
+
+    use super::*;
+
+    const KEY: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69eb");
+    const KEY_NOT_IN_FILTER: Felt =
+        felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec");
+
+    #[test]
+    fn set_and_check() {
+        let mut bloom = BloomFilter::new();
+        bloom.set(&KEY);
+        assert!(bloom.check(&KEY));
+        assert!(!bloom.check(&KEY_NOT_IN_FILTER));
+    }
+
+    #[test]
+    fn serialize_roundtrip() {
+        let mut bloom = BloomFilter::new();
+        bloom.set(&KEY);
+
+        let bytes = bloom.to_compressed_bytes();
+        let bloom = BloomFilter::from_compressed_bytes(&bytes);
+        assert!(bloom.check(&KEY));
+        assert!(!bloom.check(&KEY_NOT_IN_FILTER));
+    }
+}
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 88847851e..df4552dad 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -5,6 +5,7 @@
 // This is intended for internal use only -- do not make public.
 mod prelude;
 
+mod bloom;
 mod connection;
 pub mod fake;
 mod params;
diff --git a/crates/storage/src/schema.rs b/crates/storage/src/schema.rs
index b62dcd456..cf5eae45c 100644
--- a/crates/storage/src/schema.rs
+++ b/crates/storage/src/schema.rs
@@ -5,6 +5,7 @@ mod revision_0042;
 mod revision_0043;
 mod revision_0044;
 mod revision_0045;
+mod revision_0046;
 
 pub(crate) use base::base_schema;
 
@@ -18,6 +19,7 @@ pub fn migrations() -> &'static [MigrationFn] {
         revision_0043::migrate,
         revision_0044::migrate,
         revision_0045::migrate,
+        revision_0046::migrate,
     ]
 }
 
diff --git a/crates/storage/src/schema/revision_0046.rs b/crates/storage/src/schema/revision_0046.rs
new file mode 100644
index 000000000..7c73b6499
--- /dev/null
+++ b/crates/storage/src/schema/revision_0046.rs
@@ -0,0 +1,103 @@
+use std::time::{Duration, Instant};
+
+use crate::{bloom::BloomFilter, params::RowExt};
+
+use anyhow::Context;
+use pathfinder_common::EventKey;
+use rusqlite::params;
+
+/// Creates the `starknet_events_filters` table and stores one Bloom filter per
+/// block that has rows in `starknet_events`, then drops the `starknet_events` and
+/// `starknet_events_keys_03` tables.
+pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
+    tx.execute_batch(
+        r"
+        CREATE TABLE starknet_events_filters (
+            block_number INTEGER NOT NULL PRIMARY KEY,
+            bloom BLOB NOT NULL
+        );
+        ",
+    )
+    .context("Creating event Bloom filter table")?;
+
+    tracing::info!("Creating Bloom filters for events");
+
+    let mut query_statement = tx.prepare(
+        r"SELECT
+            block_number,
+            from_address,
+            keys
+        FROM starknet_events
+        ORDER BY block_number
+        ",
+    )?;
+
+    let mut insert_statement =
+        tx.prepare(r"INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?)")?;
+
+    let mut rows = query_statement.query([])?;
+
+    let mut prev_block_number: u64 = 0;
+    let mut bloom = BloomFilter::new();
+    let mut events_in_filter: usize = 0;
+    let mut progress_logged = Instant::now();
+    const LOG_RATE: Duration = Duration::from_secs(10);
+
+    while let Some(row) = rows.next().context("Fetching next event")? {
+        let block_number = row.get_block_number("block_number")?;
+
+        let current_block_number = block_number.get();
+        if current_block_number > prev_block_number {
+            if current_block_number % 1024 == 0 && progress_logged.elapsed() > LOG_RATE {
+                tracing::debug!(%current_block_number, "Processing events");
+                progress_logged = Instant::now();
+            }
+
+            // Flush the filter of the block that just ended. Nothing has been collected
+            // yet when the very first event is in a block other than block 0.
+            if events_in_filter > 0 {
+                insert_statement.execute(params![prev_block_number, bloom.to_compressed_bytes()])?;
+            }
+
+            bloom = BloomFilter::new();
+            prev_block_number = current_block_number;
+            events_in_filter = 0;
+        }
+
+        let from_address = row.get_contract_address("from_address")?;
+        let keys = row.get_ref_unwrap("keys").as_str()?;
+        // no need to allocate a vec for this in loop
+        let mut temp = [0u8; 32];
+        let keys: Vec<EventKey> = keys
+            .split(' ')
+            .map(|key| {
+                let used = base64::decode_config_slice(key, base64::STANDARD, &mut temp)
+                    .map_err(anyhow::Error::from)?;
+                let key = pathfinder_crypto::Felt::from_be_slice(&temp[..used])
+                    .map_err(anyhow::Error::from)?;
+                Ok(EventKey(key))
+            })
+            .collect::<Result<Vec<_>, anyhow::Error>>()?;
+
+        bloom.set_keys(&keys);
+        bloom.set_address(&from_address);
+
+        events_in_filter += 1;
+    }
+
+    // Flush the filter of the last block.
+    if events_in_filter > 0 {
+        insert_statement.execute(params![prev_block_number, bloom.to_compressed_bytes()])?;
+    }
+
+    tracing::info!("Dropping starknet_events table");
+    tx.execute_batch(
+        r"
+        DROP TABLE starknet_events_keys_03;
+        DROP TABLE starknet_events;
+        ",
+    )
+    .context("Dropping starknet_events table")?;
+
+    Ok(())
+}