Skip to content

Commit

Permalink
feat(storage): update per-block Bloom filter when inserting transacti…
Browse files Browse the repository at this point in the history
…ons and receipt
  • Loading branch information
kkovaacs committed Jan 22, 2024
1 parent 900c492 commit 8cd0a77
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 89 deletions.
4 changes: 0 additions & 4 deletions crates/storage/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,6 @@ impl<'inner> Transaction<'inner> {
event::get_events(self, filter)
}

pub fn event_count_for_block(&self, block: BlockId) -> anyhow::Result<usize> {
event::event_count_for_block(self, block)
}

pub fn insert_sierra_class(
&self,
sierra_hash: &SierraHash,
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/src/connection/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ fn intern_starknet_version(tx: &Transaction<'_>, version: &StarknetVersion) -> a
}

pub(super) fn purge_block(tx: &Transaction<'_>, block: BlockNumber) -> anyhow::Result<()> {
tx.inner()
.execute(
"DELETE FROM starknet_events_filters WHERE block_number = ?",
params![&block],
)
.context("Deleting bloom filter")?;

tx.inner()
.execute(
r"DELETE FROM starknet_transactions WHERE block_hash = (
Expand Down
99 changes: 18 additions & 81 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::bloom::BloomFilter;
use crate::params::ToSql;
use crate::{prelude::*, BlockId};
use crate::prelude::*;

use anyhow::Context;
use pathfinder_common::event::Event;
Expand Down Expand Up @@ -66,71 +67,29 @@ pub struct KeyFilterResult<'a> {
pub param: (&'static str, rusqlite::types::ToSqlOutput<'a>),
}

pub(super) fn insert_events(
pub(super) fn insert_block_events<'a>(
tx: &Transaction<'_>,
block_number: BlockNumber,
transaction_hash: TransactionHash,
events: &[Event],
events: impl Iterator<Item = &'a Event>,
) -> anyhow::Result<()> {
let mut stmt = tx.inner().prepare(
r"INSERT INTO starknet_events ( block_number, idx, transaction_hash, from_address, keys, data)
VALUES (:block_number, :idx, :transaction_hash, :from_address, :keys, :data)"
)?;
let mut stmt = tx
.inner()
.prepare("INSERT INTO starknet_events_filters (block_number, bloom) VALUES (?, ?)")?;

let mut bloom = BloomFilter::new();
for event in events {
for (i, key) in event.keys.iter().enumerate() {
let mut key = key.0;
key.as_mut_be_bytes()[0] |= (i as u8) << 4;
bloom.set(&key);
}

let mut keys = String::new();
let mut buffer = Vec::new();

for (idx, event) in events.iter().enumerate() {
keys.clear();
event_keys_to_base64_strings(&event.keys, &mut keys);

buffer.clear();
encode_event_data_to_bytes(&event.data, &mut buffer);

stmt.execute(named_params![
":block_number": &block_number,
":idx": &idx.try_into_sql_int()?,
":transaction_hash": &transaction_hash,
":from_address": &event.from_address,
":keys": &keys,
":data": &buffer,
])
.context("Insert events into events table")?;
bloom.set(&event.from_address.0);
}
Ok(())
}

pub(super) fn event_count_for_block(tx: &Transaction<'_>, block: BlockId) -> anyhow::Result<usize> {
match block {
BlockId::Number(number) => tx
.inner()
.query_row(
"SELECT COUNT(*) FROM starknet_events
WHERE block_number = ?1",
params![&number],
|row| row.get(0),
)
.context("Counting events"),
BlockId::Hash(hash) => tx
.inner()
.query_row(
"SELECT COUNT(*) FROM starknet_events
JOIN block_headers ON starknet_events.block_number = block_headers.number
WHERE block_headers.hash = ?1",
params![&hash],
|row| row.get(0),
)
.context("Counting transactions"),
BlockId::Latest => {
// First get the latest block
let block = match tx.block_id(BlockId::Latest)? {
Some((number, _)) => number,
None => return Ok(0),
};
stmt.execute(params![&block_number, &bloom.as_compressed_bytes()])?;

event_count_for_block(tx, block.into())
}
}
Ok(())
}

pub(super) fn get_events<K: KeyFilter>(
Expand Down Expand Up @@ -1209,26 +1168,4 @@ mod tests {
None => assert_eq!(result, None),
}
}

#[test]
fn event_count_for_block() {
let (storage, test_data) = test_utils::setup_test_storage();
let mut connection = storage.connection().unwrap();
let tx = connection.transaction().unwrap();

assert_eq!(
tx.event_count_for_block(BlockId::Latest).unwrap(),
test_utils::EVENTS_PER_BLOCK
);
assert_eq!(
tx.event_count_for_block(BlockId::Number(BlockNumber::new_or_panic(1)))
.unwrap(),
test_utils::EVENTS_PER_BLOCK
);
assert_eq!(
tx.event_count_for_block(BlockId::Hash(test_data.headers.first().unwrap().hash))
.unwrap(),
test_utils::EVENTS_PER_BLOCK
);
}
}
9 changes: 5 additions & 4 deletions crates/storage/src/connection/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ pub(super) fn insert_transactions(
":receipt": &serialized_receipt,
":execution_status": &execution_status,
]).context("Inserting transaction data")?;

// insert events from receipt
super::event::insert_events(tx, block_number, receipt.transaction_hash, &receipt.events)
.context("Inserting events")?;
}

let events = transaction_data
.iter()
.flat_map(|(_, receipt)| &receipt.events);
super::event::insert_block_events(tx, block_number, events)
.context("Inserting events into Bloom filter")?;
Ok(())
}

Expand Down

0 comments on commit 8cd0a77

Please sign in to comment.