Skip to content

Commit

Permalink
purge aggregate bloom filter
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Nov 15, 2024
1 parent aeb82cc commit ddcbc8c
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 100 deletions.
1 change: 1 addition & 0 deletions crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "src/lib.rs"
[features]
tokio-console = ["console-subscriber", "tokio/tracing"]
p2p = []
aggregate_bloom = []

[dependencies]
anyhow = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,11 @@ async fn l2_reorg(
head -= 1;
}

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_aggregate()
.context("Reconstructing running aggregate bloom")?;

// Track combined L1 and L2 state.
let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
if let Some(l1_l2_head) = l1_l2_head {
Expand Down
5 changes: 5 additions & 0 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,11 @@ async fn rollback_to_anchor(
head -= 1;
}

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_aggregate()
.context("Reconstructing running aggregate bloom")?;

Ok(())
})
.await
Expand Down
2 changes: 0 additions & 2 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ rust-version = { workspace = true }
[features]
aggregate_bloom = []

default = []

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ rust-version = { workspace = true }
[features]
aggregate_bloom = []

default = []

[dependencies]
anyhow = { workspace = true }
base64 = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Mutex;
mod block;
mod class;
mod ethereum;
mod event;
pub(crate) mod event;
mod reference;
mod reorg_counter;
mod signature;
Expand Down
9 changes: 9 additions & 0 deletions crates/storage/src/connection/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ impl Transaction<'_> {
///
/// This includes block header, block body and state update information.
pub fn purge_block(&self, block: BlockNumber) -> anyhow::Result<()> {
self.inner()
.execute(
r"
DELETE FROM starknet_events_filters_aggregate
WHERE from_block <= :block AND to_block >= :block
",
named_params![":block": &block],
)
.context("Deleting aggregate bloom filter")?;
self.inner()
.execute(
"DELETE FROM starknet_events_filters WHERE block_number = ?",
Expand Down
105 changes: 105 additions & 0 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,28 @@ pub struct PageOfEvents {

impl Transaction<'_> {
/// Rebuilds the in-memory running aggregate Bloom filter from the database
/// and installs it, replacing whatever was previously held.
///
/// Returns an error if the reconstruction query itself fails.
#[cfg(feature = "aggregate_bloom")]
pub fn reconstruct_running_aggregate(&self) -> anyhow::Result<()> {
    let rebuilt = reconstruct_running_aggregate(self.inner())?;

    let mut guard = self.running_aggregate.lock().unwrap_or_else(|poisoned| {
        // A poisoned lock only means some thread panicked while holding it.
        // The guarded value is about to be overwritten wholesale, so it is
        // safe to recover the inner data rather than propagate the poison.
        tracing::error!("Poisoned lock in reconstruct_running_aggregate");
        poisoned.into_inner()
    });
    *guard = rebuilt;

    Ok(())
}

/// Upsert the [aggregate event bloom filter](AggregateBloom) for the given
/// block number. This function operates under the assumption that
/// blocks are _never_ skipped so even if there are no events for a
/// block, this function should still be called with an empty iterator.
/// When testing it is fine to skip blocks, as long as the block at the end
/// of the current range is not skipped.
#[cfg(feature = "aggregate_bloom")]
pub(super) fn upsert_block_events_aggregate<'a>(
&self,
block_number: BlockNumber,
Expand Down Expand Up @@ -619,6 +635,95 @@ impl Transaction<'_> {
}
}

/// Reconstruct the [aggregate](crate::bloom::AggregateBloom) for the range of
/// blocks between the last stored `to_block` in the aggregate Bloom filter
/// table and the last overall block in the database. This is needed because the
/// aggregate Bloom filter for each [block
/// range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN) is stored once the
/// range is complete, before that it is kept in memory and can be lost upon
/// shutdown.
///
/// Returns an error on any query, decompression or deserialization failure.
#[cfg(feature = "aggregate_bloom")]
pub fn reconstruct_running_aggregate(
    tx: &rusqlite::Transaction<'_>,
) -> anyhow::Result<AggregateBloom> {
    use super::transaction;

    let mut last_to_block_stmt = tx.prepare(
        r"
        SELECT to_block
        FROM starknet_events_filters_aggregate
        ORDER BY from_block DESC LIMIT 1
        ",
    )?;
    // The loop at the bottom derives each block number from the row's
    // *position* in the result set, so rows must arrive in ascending block
    // order. SQLite's result order is unspecified without an explicit
    // ORDER BY, hence the clause below is load-bearing.
    let mut events_to_reconstruct_stmt = tx.prepare(
        r"
        SELECT events
        FROM transactions
        WHERE block_number >= :first_running_aggregate_block
        ORDER BY block_number ASC
        ",
    )?;

    let last_to_block = last_to_block_stmt
        .query_row([], |row| row.get::<_, u64>(0))
        .optional()
        .context("Querying last stored aggregate to_block")?;

    let first_running_aggregate_block = match last_to_block {
        Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
        // Aggregate Bloom filter table is empty -> reconstruct running aggregate
        // from the genesis block.
        None => BlockNumber::GENESIS,
    };

    // One entry per returned row; `None` when the events blob column is NULL.
    // Assumes the `transactions` table stores exactly one row per block --
    // TODO(review): confirm against the schema, since a missing or duplicated
    // row would shift every subsequent block's bloom.
    let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
        .query_and_then(
            named_params![":first_running_aggregate_block": &first_running_aggregate_block],
            |row| {
                let events: Option<transaction::dto::EventsForBlock> = row
                    .get_optional_blob(0)?
                    .map(|events_blob| -> anyhow::Result<_> {
                        let events = transaction::compression::decompress_events(events_blob)
                            .context("Decompressing events")?;
                        let events: transaction::dto::EventsForBlock =
                            bincode::serde::decode_from_slice(&events, bincode::config::standard())
                                .context("Deserializing events")?
                                .0;

                        Ok(events)
                    })
                    .transpose()?;

                Ok(events.map(|events| {
                    events
                        .events()
                        .into_iter()
                        .map(|e| e.into_iter().map(Into::into).collect())
                        .collect()
                }))
            },
        )
        .context("Querying events to reconstruct")?
        .collect::<anyhow::Result<_>>()?;

    let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block);

    for (offset, events_for_block) in events_to_reconstruct.iter().enumerate() {
        if let Some(events) = events_for_block {
            // Valid only because the SELECT above orders by block_number.
            let block_number = first_running_aggregate_block + offset as u64;

            // Fold every event's keys and emitting address into one per-block
            // Bloom filter, then merge it into the running aggregate.
            let mut bloom = BloomFilter::new();
            for event in events.iter().flatten() {
                bloom.set_keys(&event.keys);
                bloom.set_address(&event.from_address);
            }

            running_aggregate.add_bloom(&bloom, block_number);
        }
    }

    Ok(running_aggregate)
}

fn continuation_token(
events: &[EmittedEvent],
previous_token: ContinuationToken,
Expand Down
96 changes: 1 addition & 95 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl StorageBuilder {
}

#[cfg(feature = "aggregate_bloom")]
let running_aggregate = reconstruct_running_aggregate(&mut connection)
let running_aggregate = event::reconstruct_running_aggregate(&connection.transaction()?)
.context("Reconstructing running aggregate bloom filter")?;

connection
Expand Down Expand Up @@ -514,100 +514,6 @@ fn schema_version(connection: &rusqlite::Connection) -> anyhow::Result<usize> {
Ok(version)
}

/// Reconstruct the [aggregate](bloom::AggregateBloom) for the range of blocks
/// between the last stored to_block in the aggregate Bloom filter table and the
/// last overall block in the database. This is needed because the aggregate
/// Bloom filter for each [block range](bloom::AggregateBloom::BLOCK_RANGE_LEN)
/// is stored once the range is complete, before that it is kept in memory and
/// can be lost upon shutdown.
#[cfg(feature = "aggregate_bloom")]
fn reconstruct_running_aggregate(
    connection: &mut rusqlite::Connection,
) -> anyhow::Result<AggregateBloom> {
    use bloom::BloomFilter;
    use params::{named_params, RowExt};
    use pathfinder_common::event::Event;
    use transaction;

    // Short-lived read transaction; never committed, dropped on return.
    let tx = connection
        .transaction()
        .context("Creating database transaction")?;
    // Highest stored aggregate range: ordering by from_block DESC and taking
    // one row yields the most recently completed range's to_block.
    let mut select_last_to_block_stmt = tx.prepare(
        r"
        SELECT to_block
        FROM starknet_events_filters_aggregate
        ORDER BY from_block DESC LIMIT 1
        ",
    )?;
    // NOTE(review): no ORDER BY here, yet the loop at the bottom maps each
    // row's position to a block number -- this assumes rows come back in
    // ascending block_number order, which SQLite does not guarantee. Verify.
    let mut events_to_reconstruct_stmt = tx.prepare(
        r"
        SELECT events
        FROM transactions
        WHERE block_number >= :first_running_aggregate_block
        ",
    )?;

    // `None` when the aggregate table is empty (fresh database).
    let last_to_block = select_last_to_block_stmt
        .query_row([], |row| row.get::<_, u64>(0))
        .optional()
        .context("Querying last stored aggregate to_block")?;

    // The running (not yet persisted) aggregate starts right after the last
    // completed range, or at genesis when nothing is stored yet.
    let first_running_aggregate_block = match last_to_block {
        Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
        // Aggregate Bloom filter table is empty -> reconstruct running aggregate
        // from the genesis block.
        None => BlockNumber::GENESIS,
    };

    // One entry per row; `None` when the events blob column is NULL.
    // Presumably one row per block in `transactions` -- TODO confirm schema.
    let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
        .query_and_then(
            named_params![":first_running_aggregate_block": &first_running_aggregate_block],
            |row| {
                // Blob -> decompress -> bincode-decode into the DTO, keeping
                // NULL blobs as None via map/transpose.
                let events: Option<transaction::dto::EventsForBlock> = row
                    .get_optional_blob(0)?
                    .map(|events_blob| -> anyhow::Result<_> {
                        let events = transaction::compression::decompress_events(events_blob)
                            .context("Decompressing events")?;
                        let events: transaction::dto::EventsForBlock =
                            bincode::serde::decode_from_slice(&events, bincode::config::standard())
                                .context("Deserializing events")?
                                .0;

                        Ok(events)
                    })
                    .transpose()?;

                // Convert the DTO's per-transaction event lists into domain
                // `Event` values.
                Ok(events.map(|events| {
                    events
                        .events()
                        .into_iter()
                        .map(|e| e.into_iter().map(Into::into).collect())
                        .collect()
                }))
            },
        )
        .context("Querying events to reconstruct")?
        .collect::<anyhow::Result<_>>()?;

    let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block);

    // Rebuild one Bloom filter per block and merge each into the aggregate;
    // the block number is derived from the row's position (see NOTE above).
    for (block, events_for_block) in events_to_reconstruct.iter().enumerate() {
        if let Some(events) = events_for_block {
            let block_number = first_running_aggregate_block + block as u64;

            let mut bloom = BloomFilter::new();
            for event in events.iter().flatten() {
                bloom.set_keys(&event.keys);
                bloom.set_address(&event.from_address);
            }

            running_aggregate.add_bloom(&bloom, block_number);
        }
    }

    Ok(running_aggregate)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit ddcbc8c

Please sign in to comment.