Skip to content

Commit

Permalink
feat(storage): add per-block Bloom filter for events
Browse files Browse the repository at this point in the history
This change adds a new table storing Bloom filters for events emitted
in that block. The filter stores the keys (prefixed with the index) and
the address of the contract that emitted the event and can be used to
avoid loading the receipts for blocks that do not contain matching
events.
  • Loading branch information
kkovaacs committed Jan 22, 2024
1 parent 8b96de9 commit 900c492
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 2 deletions.
22 changes: 20 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/crypto/src/algebra/field/felt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ impl Felt {
&self.0
}

/// Big-endian mutable representation of this [Felt].
pub fn as_mut_be_bytes(&mut self) -> &mut [u8; 32] {
&mut self.0
}

/// Convenience function which extends [Felt::from_be_bytes] to work with slices.
pub const fn from_be_slice(bytes: &[u8]) -> Result<Self, OverflowError> {
if bytes.len() > 32 {
Expand Down
1 change: 1 addition & 0 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bincode = "2.0.0-rc.3"
bitvec = { workspace = true }
bloomfilter = "1.0.12"
const_format = { workspace = true }
data-encoding = "2.4.0"
fake = { workspace = true }
Expand Down
130 changes: 130 additions & 0 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use bloomfilter::Bloom;
use pathfinder_crypto::Felt;

pub(crate) struct BloomFilter(Bloom<Felt>);

impl BloomFilter {
// The size of the bitmap used by the Bloom filter (in bytes).
const BITMAP_BYTES: u64 = 2048;
// The maximal number of items anticipated to be inserted into the Bloom filter.
const ITEMS_COUNT: u32 = 1024;
// The number of hash functions used by the Bloom filter.
// We need this value to be able to re-create the filter with the deserialized bitmap.
const K_NUM: u32 = 12;
// The seed used by the hash functions of the filter.
// This is a randomly generated vector of 32 bytes.
const SEED: [u8; 32] = [
0xef, 0x51, 0x88, 0x74, 0xef, 0x08, 0x3d, 0xf6, 0x7d, 0x7a, 0x93, 0xb7, 0xb3, 0x13, 0x1f,
0x87, 0xd3, 0x26, 0xbd, 0x49, 0xc7, 0x18, 0xcc, 0xe5, 0xd7, 0xe8, 0xa0, 0xdb, 0xea, 0x80,
0x67, 0x52,
];

pub fn new() -> Self {
let bloom = Bloom::new_with_seed(
Self::BITMAP_BYTES as usize,
Self::ITEMS_COUNT as usize,
&Self::SEED,
);
assert_eq!(bloom.number_of_hash_functions(), Self::K_NUM);

Self(bloom)
}

pub fn from_compressed_bytes(bytes: &[u8]) -> Self {
let bytes = zstd::bulk::decompress(bytes, Self::BITMAP_BYTES as usize * 2)
.expect("Decompressing Bloom filter");
Self::from_bytes(&bytes)
}

fn from_bytes(bytes: &[u8]) -> Self {
let k1 = u64::from_le_bytes(Self::SEED[0..8].try_into().unwrap());
let k2 = u64::from_le_bytes(Self::SEED[8..16].try_into().unwrap());
let k3 = u64::from_le_bytes(Self::SEED[16..24].try_into().unwrap());
let k4 = u64::from_le_bytes(Self::SEED[24..32].try_into().unwrap());
let bloom = Bloom::from_existing(
bytes,
Self::BITMAP_BYTES * 8,
Self::K_NUM,
[(k1, k2), (k3, k4)],
);
Self(bloom)
}

pub fn to_compressed_bytes(&self) -> Vec<u8> {
let bytes = self.to_bytes();
zstd::bulk::compress(&bytes, 0).expect("Compressing Bloom filter")
}

fn to_bytes(&self) -> Vec<u8> {
self.0.bitmap()
}

fn set(&mut self, key: &Felt) {
self.0.set(key);
}

pub fn set_address(&mut self, address: &ContractAddress) {
self.set(&address.0);
}

pub fn set_keys(&mut self, keys: &[EventKey]) {
for (i, key) in keys.iter().take(EVENT_KEY_FILTER_LIMIT).enumerate() {
let mut key = key.0;
key.as_mut_be_bytes()[0] |= (i as u8) << 4;
self.set(&key);
}
}

fn check(&self, key: &Felt) -> bool {
self.0.check(key)
}

pub fn check_address(&self, address: &ContractAddress) -> bool {
self.check(&address.0)
}

pub fn check_keys(&self, keys: &[Vec<EventKey>]) -> bool {
keys.iter().enumerate().all(|(idx, keys)| {
if keys.is_empty() {
return true;
};

keys.iter().any(|key| {
let mut key = key.0;
key.as_mut_be_bytes()[0] |= (idx as u8) << 4;
tracing::trace!(%idx, %key, "Checking key in filter");
self.check(&key)
})
})
}
}

#[cfg(test)]
mod tests {
use pathfinder_common::felt;

use super::*;

const KEY: Felt = felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69eb");
const KEY_NOT_IN_FILTER: Felt =
felt!("0x0218b538681900fad5a0b2ffe1d6781c0c3f14df5d32071ace0bdc9d46cb69ec");

#[test]
fn set_and_check() {
let mut bloom = BloomFilter::new();
bloom.set(&KEY);
assert!(bloom.check(&KEY));
assert!(!bloom.check(&KEY_NOT_IN_FILTER));
}

#[test]
fn serialize_roundtrip() {
let mut bloom = BloomFilter::new();
bloom.set(&KEY);

let bytes = bloom.to_compressed_bytes();
let bloom = BloomFilter::from_compressed_bytes(&bytes);
assert!(bloom.check(&KEY));
assert!(!bloom.check(&KEY_NOT_IN_FILTER));
}
}
1 change: 1 addition & 0 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// This is intended for internal use only -- do not make public.
mod prelude;

mod bloom;
mod connection;
pub mod fake;
mod params;
Expand Down
2 changes: 2 additions & 0 deletions crates/storage/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod revision_0042;
mod revision_0043;
mod revision_0044;
mod revision_0045;
mod revision_0046;

pub(crate) use base::base_schema;

Expand All @@ -18,6 +19,7 @@ pub fn migrations() -> &'static [MigrationFn] {
revision_0043::migrate,
revision_0044::migrate,
revision_0045::migrate,
revision_0046::migrate,
]
}

Expand Down
95 changes: 95 additions & 0 deletions crates/storage/src/schema/revision_0046.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::time::{Duration, Instant};

use crate::{bloom::BloomFilter, params::RowExt};

use anyhow::Context;
use pathfinder_common::EventKey;
use rusqlite::params;

pub(crate) fn migrate(tx: &rusqlite::Transaction<'_>) -> anyhow::Result<()> {
tx.execute_batch(
r"
CREATE TABLE starknet_events_filters (
block_number INTEGER NOT NULL PRIMARY KEY,
bloom BLOB NOT NULL
);
",
)
.context("Creating event Bloom filter table")?;

tracing::info!("Creating Bloom filters for events");

let mut query_statement = tx.prepare(
r"SELECT
block_number,
from_address,
keys
FROM starknet_events
ORDER BY block_number
",
)?;

let mut insert_statement =
tx.prepare(r"INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?)")?;

let mut rows = query_statement.query([])?;

let mut prev_block_number: u64 = 0;
let mut bloom = BloomFilter::new();
let mut events_in_filter: usize = 0;
let mut progress_logged = Instant::now();
const LOG_RATE: Duration = Duration::from_secs(10);

while let Some(row) = rows.next().context("Fetching next receipt")? {
let block_number = row.get_block_number("block_number")?;

let current_block_number = block_number.get();
if current_block_number > prev_block_number {
if current_block_number % 1024 == 0 && progress_logged.elapsed() > LOG_RATE {
tracing::debug!(%current_block_number, "Processing events");
progress_logged = Instant::now();
}

insert_statement.execute(params![prev_block_number, bloom.to_compressed_bytes()])?;

bloom = BloomFilter::new();
prev_block_number = current_block_number;
events_in_filter = 0;
}

let from_address = row.get_contract_address("from_address")?;
let keys = row.get_ref_unwrap("keys").as_str()?;
// no need to allocate a vec for this in loop
let mut temp = [0u8; 32];
let keys: Vec<EventKey> = keys
.split(' ')
.map(|key| {
let used = base64::decode_config_slice(key, base64::STANDARD, &mut temp)
.map_err(anyhow::Error::from)?;
let key = pathfinder_crypto::Felt::from_be_slice(&temp[..used])
.map_err(anyhow::Error::from)?;
Ok(EventKey(key))
})
.collect::<Result<_, anyhow::Error>>()?;

bloom.set_keys(&keys);
bloom.set_address(&from_address);

events_in_filter += 1;
}

if events_in_filter > 0 {
insert_statement.execute(params![prev_block_number, bloom.to_compressed_bytes()])?;
}

tracing::info!("Dropping starknet_events table");
tx.execute_batch(
r"
DROP TABLE starknet_events_keys_03;
DROP TABLE starknet_events;
",
)
.context("Dropping starknet_events table")?;

Ok(())
}

0 comments on commit 900c492

Please sign in to comment.