Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bloom): purge aggregate bloom filters #2386

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ path = "src/lib.rs"
[features]
tokio-console = ["console-subscriber", "tokio/tracing"]
p2p = []
aggregate_bloom = []

[dependencies]
anyhow = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,11 @@ async fn l2_reorg(
head -= 1;
}

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_aggregate()
.context("Reconstructing running aggregate bloom")?;

// Track combined L1 and L2 state.
let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
if let Some(l1_l2_head) = l1_l2_head {
Expand Down
5 changes: 5 additions & 0 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,11 @@ async fn rollback_to_anchor(
head -= 1;
}

#[cfg(feature = "aggregate_bloom")]
transaction
.reconstruct_running_aggregate()
.context("Reconstructing running aggregate bloom")?;

Ok(())
})
.await
Expand Down
2 changes: 0 additions & 2 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ rust-version = { workspace = true }
[features]
aggregate_bloom = []

default = []

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
2 changes: 0 additions & 2 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ rust-version = { workspace = true }
[features]
aggregate_bloom = []

default = []

[dependencies]
anyhow = { workspace = true }
base64 = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Mutex;
mod block;
mod class;
mod ethereum;
mod event;
pub(crate) mod event;
mod reference;
mod reorg_counter;
mod signature;
Expand Down
9 changes: 9 additions & 0 deletions crates/storage/src/connection/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ impl Transaction<'_> {
///
/// This includes block header, block body and state update information.
pub fn purge_block(&self, block: BlockNumber) -> anyhow::Result<()> {
self.inner()
.execute(
r"
DELETE FROM starknet_events_filters_aggregate
WHERE from_block <= :block AND to_block >= :block
",
named_params![":block": &block],
)
.context("Deleting aggregate bloom filter")?;
self.inner()
.execute(
"DELETE FROM starknet_events_filters WHERE block_number = ?",
Expand Down
105 changes: 105 additions & 0 deletions crates/storage/src/connection/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,28 @@ pub struct PageOfEvents {

impl Transaction<'_> {
#[cfg(feature = "aggregate_bloom")]
/// Rebuild the in-memory running aggregate Bloom filter from storage and
/// swap it into `self.running_aggregate`.
///
/// Used after a reorg/purge invalidates the aggregate kept in memory.
pub fn reconstruct_running_aggregate(&self) -> anyhow::Result<()> {
// Recompute the aggregate from the database before touching the lock.
let rebuilt = reconstruct_running_aggregate(self.inner())?;

// A poisoned lock only means another thread panicked while holding it;
// the data is still replaceable, so log and recover the guard instead of
// propagating the panic.
let mut guard = self.running_aggregate.lock().unwrap_or_else(|poisoned| {
tracing::error!("Poisoned lock in reconstruct_running_aggregate");
poisoned.into_inner()
});

*guard = rebuilt;

Ok(())
}

/// Upsert the [aggregate event bloom filter](AggregateBloom) for the given
/// block number. This function operates under the assumption that
/// blocks are _never_ skipped so even if there are no events for a
/// block, this function should still be called with an empty iterator.
/// When testing it is fine to skip blocks, as long as the block at the end
/// of the current range is not skipped.
#[cfg(feature = "aggregate_bloom")]
pub(super) fn upsert_block_events_aggregate<'a>(
&self,
block_number: BlockNumber,
Expand Down Expand Up @@ -619,6 +635,95 @@ impl Transaction<'_> {
}
}

/// Reconstruct the [aggregate](crate::bloom::AggregateBloom) for the range of
/// blocks between the last stored `to_block` in the aggregate Bloom filter
/// table and the last overall block in the database. This is needed because the
/// aggregate Bloom filter for each [block
/// range](crate::bloom::AggregateBloom::BLOCK_RANGE_LEN) is stored once the
/// range is complete, before that it is kept in memory and can be lost upon
/// shutdown.
///
/// Returns a fresh [`AggregateBloom`] starting at the first block not covered
/// by a stored aggregate (or genesis when the table is empty), populated from
/// the events of every block at or above that height.
#[cfg(feature = "aggregate_bloom")]
pub fn reconstruct_running_aggregate(
tx: &rusqlite::Transaction<'_>,
) -> anyhow::Result<AggregateBloom> {
use super::transaction;

// Highest stored aggregate range: the running aggregate resumes right
// after its `to_block`.
let mut last_to_block_stmt = tx.prepare(
r"
SELECT to_block
FROM starknet_events_filters_aggregate
ORDER BY from_block DESC LIMIT 1
",
)?;
// Per-block event blobs (compressed + bincode-encoded) for every block in
// the running range.
let mut events_to_reconstruct_stmt = tx.prepare(
r"
SELECT events
FROM transactions
WHERE block_number >= :first_running_aggregate_block
",
)?;

let last_to_block = last_to_block_stmt
.query_row([], |row| row.get::<_, u64>(0))
.optional()
.context("Querying last stored aggregate to_block")?;

let first_running_aggregate_block = match last_to_block {
Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
// Aggregate Bloom filter table is empty -> reconstruct running aggregate
// from the genesis block.
None => BlockNumber::GENESIS,
};

// One entry per row: outer Vec groups transactions within a block, inner
// Vec holds each transaction's events. `None` when the events column is
// NULL — presumably a block whose events are absent/pruned; TODO confirm.
let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
.query_and_then(
named_params![":first_running_aggregate_block": &first_running_aggregate_block],
|row| {
let events: Option<transaction::dto::EventsForBlock> = row
.get_optional_blob(0)?
.map(|events_blob| -> anyhow::Result<_> {
// Blobs are stored compressed; decompress first, then
// decode the bincode-serialized DTO.
let events = transaction::compression::decompress_events(events_blob)
.context("Decompressing events")?;
let events: transaction::dto::EventsForBlock =
bincode::serde::decode_from_slice(&events, bincode::config::standard())
.context("Deserializing events")?
.0;

Ok(events)
})
.transpose()?;

// Convert DTO events into domain `Event`s, preserving the
// per-transaction grouping.
Ok(events.map(|events| {
events
.events()
.into_iter()
.map(|e| e.into_iter().map(Into::into).collect())
.collect()
}))
},
)
.context("Querying events to reconstruct")?
.collect::<anyhow::Result<_>>()?;

let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block);

// NOTE(review): the block number is derived from the row's position, which
// assumes rows come back in ascending block order — the SELECT above has no
// ORDER BY, so this relies on rowid ordering. TODO confirm.
for (block, events_for_block) in events_to_reconstruct.iter().enumerate() {
if let Some(events) = events_for_block {
let block_number = first_running_aggregate_block + block as u64;

// Build one Bloom filter per block from every event's keys and
// emitting address, then fold it into the aggregate.
let mut bloom = BloomFilter::new();
for event in events.iter().flatten() {
bloom.set_keys(&event.keys);
bloom.set_address(&event.from_address);
}

running_aggregate.add_bloom(&bloom, block_number);
}
}

Ok(running_aggregate)
}

fn continuation_token(
events: &[EmittedEvent],
previous_token: ContinuationToken,
Expand Down
96 changes: 1 addition & 95 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl StorageBuilder {
}

#[cfg(feature = "aggregate_bloom")]
let running_aggregate = reconstruct_running_aggregate(&mut connection)
let running_aggregate = event::reconstruct_running_aggregate(&connection.transaction()?)
.context("Reconstructing running aggregate bloom filter")?;

connection
Expand Down Expand Up @@ -514,100 +514,6 @@ fn schema_version(connection: &rusqlite::Connection) -> anyhow::Result<usize> {
Ok(version)
}

/// Reconstruct the [aggregate](bloom::AggregateBloom) for the range of blocks
/// between the last stored to_block in the aggregate Bloom filter table and the
/// last overall block in the database. This is needed because the aggregate
/// Bloom filter for each [block range](bloom::AggregateBloom::BLOCK_RANGE_LEN)
/// is stored once the range is complete, before that it is kept in memory and
/// can be lost upon shutdown.
///
/// Opens its own read transaction on `connection`; the transaction is dropped
/// (and thus rolled back) when this function returns.
#[cfg(feature = "aggregate_bloom")]
fn reconstruct_running_aggregate(
connection: &mut rusqlite::Connection,
) -> anyhow::Result<AggregateBloom> {
use bloom::BloomFilter;
use params::{named_params, RowExt};
use pathfinder_common::event::Event;
use transaction;

let tx = connection
.transaction()
.context("Creating database transaction")?;
// Highest stored aggregate range: the running aggregate resumes right
// after its `to_block`.
let mut select_last_to_block_stmt = tx.prepare(
r"
SELECT to_block
FROM starknet_events_filters_aggregate
ORDER BY from_block DESC LIMIT 1
",
)?;
// Per-block event blobs (compressed + bincode-encoded) for every block in
// the running range.
let mut events_to_reconstruct_stmt = tx.prepare(
r"
SELECT events
FROM transactions
WHERE block_number >= :first_running_aggregate_block
",
)?;

let last_to_block = select_last_to_block_stmt
.query_row([], |row| row.get::<_, u64>(0))
.optional()
.context("Querying last stored aggregate to_block")?;

let first_running_aggregate_block = match last_to_block {
Some(last_to_block) => BlockNumber::new_or_panic(last_to_block + 1),
// Aggregate Bloom filter table is empty -> reconstruct running aggregate
// from the genesis block.
None => BlockNumber::GENESIS,
};

// One entry per row: outer Vec groups transactions within a block, inner
// Vec holds each transaction's events. `None` when the events column is
// NULL — presumably a block whose events are absent/pruned; TODO confirm.
let events_to_reconstruct: Vec<Option<Vec<Vec<Event>>>> = events_to_reconstruct_stmt
.query_and_then(
named_params![":first_running_aggregate_block": &first_running_aggregate_block],
|row| {
let events: Option<transaction::dto::EventsForBlock> = row
.get_optional_blob(0)?
.map(|events_blob| -> anyhow::Result<_> {
// Blobs are stored compressed; decompress first, then
// decode the bincode-serialized DTO.
let events = transaction::compression::decompress_events(events_blob)
.context("Decompressing events")?;
let events: transaction::dto::EventsForBlock =
bincode::serde::decode_from_slice(&events, bincode::config::standard())
.context("Deserializing events")?
.0;

Ok(events)
})
.transpose()?;

// Convert DTO events into domain `Event`s, preserving the
// per-transaction grouping.
Ok(events.map(|events| {
events
.events()
.into_iter()
.map(|e| e.into_iter().map(Into::into).collect())
.collect()
}))
},
)
.context("Querying events to reconstruct")?
.collect::<anyhow::Result<_>>()?;

let mut running_aggregate = AggregateBloom::new(first_running_aggregate_block);

// NOTE(review): the block number is derived from the row's position, which
// assumes rows come back in ascending block order — the SELECT above has no
// ORDER BY, so this relies on rowid ordering. TODO confirm.
for (block, events_for_block) in events_to_reconstruct.iter().enumerate() {
if let Some(events) = events_for_block {
let block_number = first_running_aggregate_block + block as u64;

// Build one Bloom filter per block from every event's keys and
// emitting address, then fold it into the aggregate.
let mut bloom = BloomFilter::new();
for event in events.iter().flatten() {
bloom.set_keys(&event.keys);
bloom.set_address(&event.from_address);
}

running_aggregate.add_bloom(&bloom, block_number);
}
}

Ok(running_aggregate)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down