feat(storage): add Bloom filter based event lookups #1679

Merged: 27 commits, Jan 24, 2024

Changes from 23 commits

Commits (27)
8b96de9
feat(storage): do migration with `journal_mode=delete`
kkovaacs Jan 11, 2024
900c492
feat(storage): add per-block Bloom filter for events
kkovaacs Dec 18, 2023
8cd0a77
feat(storage): update per-block Bloom filter when inserting transacti…
kkovaacs Dec 18, 2023
0e4ef7e
feat(storage): filter events using Bloom filter
kkovaacs Dec 19, 2023
9c2a39d
feat(storage): add global cache to bloom filters
kkovaacs Dec 20, 2023
e7a70e9
fix(storage/event): add upper limit for blocks scanned
kkovaacs Jan 9, 2024
ce71db7
feat(storage): add `receipts_for_block` getter
kkovaacs Jan 10, 2024
601bd9d
chore(storage): remove `base64_felts_to_index_prefixed_base32_felts` …
kkovaacs Jan 10, 2024
8a78cc1
fix(storage/event): handle missing Bloom filters
kkovaacs Jan 11, 2024
4e8cbcb
fix(storage): remove Bloom filter cache entry when purging a block
kkovaacs Jan 12, 2024
76a781e
fix(storage/connection): don't re-export everything from the `event` …
kkovaacs Jan 12, 2024
fb08760
feat(storage): add `block_hash()` getter
kkovaacs Jan 16, 2024
3479318
feat(storage): add reorg_counter
kkovaacs Jan 16, 2024
6e8c25c
feat(storage): add Bloom filter cache to Storage/Connection/Transaction
kkovaacs Jan 15, 2024
26238b0
chore: upgrade test database fixture
kkovaacs Jan 16, 2024
c027a8e
fixup! feat(storage): update per-block Bloom filter when inserting tr…
kkovaacs Jan 18, 2024
0757793
feat(storage): filter events using Bloom filter
kkovaacs Jan 22, 2024
31062ed
feat(storage/event): return continuation token if bumping into limits
kkovaacs Jan 19, 2024
d5d9d2c
chore: make `test-log` a workspace dependency
kkovaacs Jan 22, 2024
b14c1f8
feat(storage): make `get_events` MAX_BLOCKS_TO_SCAN configurable
kkovaacs Jan 22, 2024
b30cece
fixup! feat(storage): add per-block Bloom filter for events
kkovaacs Jan 22, 2024
ffdcc94
feat(storage/event): add limit for number of Bloom filters loaded
kkovaacs Jan 22, 2024
cad71fd
chore: update CHANGELOG
kkovaacs Jan 22, 2024
7210b35
Update crates/storage/src/connection/event.rs
kkovaacs Jan 23, 2024
91f6694
Update crates/storage/src/connection/event.rs
kkovaacs Jan 23, 2024
122a424
fix(pathfinder/sync): add missing increment step of reorg counter
kkovaacs Jan 23, 2024
387663b
fix(pathfinder/config): rename Bloom filter load limit argument
kkovaacs Jan 23, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `starknet_getEvents` incorrectly evaluates empty sub-lists in key filters for pending events.

### Changed

- `starknet_getEvents` now uses a much simpler implementation that no longer relies on SQLite queries. In general this leads to more consistent query times and a roughly 20% smaller database.
- The migration step involves computing Bloom filters for all blocks and dropping database tables no longer needed. This takes more than one hour for a mainnet database.
- The new `storage.event-bloom-filter-cache-size`, `rpc.get-events-max-blocks-to-scan` and `rpc.get-events-max-bloom-filters-to-load` arguments control some aspects of the algorithm.

## [0.10.3] - 2024-01-04

### Added
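For context: a Bloom filter is a fixed-size bit array supporting probabilistic membership tests with false positives but no false negatives, which is what makes it safe for pre-filtering blocks. A minimal self-contained Rust sketch of the per-block scheme follows; this illustrates the technique, not pathfinder's actual implementation, and the 2 KiB size merely matches the figure quoted in the new CLI help text:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    const FILTER_BITS: usize = 2 * 1024 * 8; // 2 KiB of bits per block
    const NUM_HASHES: u64 = 12; // illustrative choice

    /// One filter per block, fed with every event's emitting address and keys.
    struct BlockBloomFilter {
        bits: Vec<bool>,
    }

    impl BlockBloomFilter {
        fn new() -> Self {
            Self { bits: vec![false; FILTER_BITS] }
        }

        fn bit_positions(item: &[u8]) -> impl Iterator<Item = usize> + '_ {
            (0..NUM_HASHES).map(move |seed| {
                let mut hasher = DefaultHasher::new();
                seed.hash(&mut hasher);
                item.hash(&mut hasher);
                (hasher.finish() as usize) % FILTER_BITS
            })
        }

        fn insert(&mut self, item: &[u8]) {
            for pos in Self::bit_positions(item) {
                self.bits[pos] = true;
            }
        }

        /// May return true for items never inserted (false positive), but
        /// never returns false for an inserted item, so skipping non-matching
        /// blocks can never miss an event.
        fn may_contain(&self, item: &[u8]) -> bool {
            Self::bit_positions(item).all(|pos| self.bits[pos])
        }
    }

    fn main() {
        let mut filter = BlockBloomFilter::new();
        filter.insert(b"transfer-event-key");
        assert!(filter.may_contain(b"transfer-event-key"));
        // A getEvents query only scans blocks whose filter matches all keys.
    }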
25 changes: 23 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -73,6 +73,9 @@ serde_with = "3.0.0"
sha3 = "0.10"
# This one needs to match the version used by blockifier
starknet_api = "=0.6.0"
test-log = { version = "0.2.12", default-features = false, features = [
"trace",
] }
thiserror = "1.0.48"
tokio = "1.29.1"
tracing = "0.1.37"
5 changes: 5 additions & 0 deletions crates/crypto/src/algebra/field/felt.rs
@@ -95,6 +95,11 @@ impl Felt {
&self.0
}

/// Big-endian mutable representation of this [Felt].
pub fn as_mut_be_bytes(&mut self) -> &mut [u8; 32] {
&mut self.0
}

/// Convenience function which extends [Felt::from_be_bytes] to work with slices.
pub const fn from_be_slice(bytes: &[u8]) -> Result<Self, OverflowError> {
if bytes.len() > 32 {
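One possible use of the new mutable accessor (illustrative sketch; it assumes the existing read-only `as_be_bytes()` getter whose tail is visible above the new method):

    use pathfinder_crypto::Felt;

    fn main() {
        let mut felt = Felt::from_be_slice(&[1u8; 32]).unwrap();
        // Mutate the representation in place instead of copying the bytes
        // out, editing them, and rebuilding the Felt from scratch.
        felt.as_mut_be_bytes()[31] = 0xff;
        assert_eq!(felt.as_be_bytes()[31], 0xff);
    }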
4 changes: 2 additions & 2 deletions crates/executor/src/execution_state.rs
@@ -50,9 +50,9 @@ impl<'tx> ExecutionState<'tx> {
if self.execute_on_parent_state && self.header.number.get() >= 10 {
let block_number_whose_hash_becomes_available =
pathfinder_common::BlockNumber::new_or_panic(self.header.number.get() - 10);
- let (_, block_hash) = self
+ let block_hash = self
.transaction
- .block_id(block_number_whose_hash_becomes_available.into())?
+ .block_hash(block_number_whose_hash_becomes_available.into())?
.context("Getting historical block hash")?;

tracing::trace!(%block_number_whose_hash_becomes_available, %block_hash, "Setting historical block hash");
4 changes: 1 addition & 3 deletions crates/gateway-client/Cargo.toml
@@ -39,9 +39,7 @@ lazy_static = { workspace = true }
pathfinder-crypto = { path = "../crypto" }
pretty_assertions_sorted = { workspace = true }
starknet-gateway-test-fixtures = { path = "../gateway-test-fixtures" }
- test-log = { version = "0.2.12", default-features = false, features = [
- "trace",
- ] }
+ test-log = { workspace = true }
tracing-subscriber = { workspace = true }

[[test]]
4 changes: 1 addition & 3 deletions crates/p2p/Cargo.toml
@@ -55,8 +55,6 @@ fake = { workspace = true }
hex = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
- test-log = { version = "0.2.12", default-features = false, features = [
- "trace",
- ] }
+ test-log = { workspace = true }
tokio = { version = "1.32.0", features = ["test-util"] }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
6 changes: 3 additions & 3 deletions crates/pathfinder/examples/feeder_gateway.rs
@@ -53,6 +53,7 @@ async fn serve() -> anyhow::Result<()> {
let storage = pathfinder_storage::Storage::migrate(
database_path.into(),
pathfinder_storage::JournalMode::WAL,
1,
)?
.create_pool(NonZeroU32::new(10).unwrap())
.unwrap();
@@ -295,10 +296,9 @@ fn get_chain(tx: &pathfinder_storage::Transaction<'_>) -> anyhow::Result<Chain>
};

let genesis_hash = tx
- .block_id(BlockNumber::GENESIS.into())
+ .block_hash(BlockNumber::GENESIS.into())
.unwrap()
.context("Getting genesis hash")?
.1;
.context("Getting genesis hash")?;

let chain = match genesis_hash {
MAINNET_GENESIS_HASH => Chain::Mainnet,
2 changes: 1 addition & 1 deletion crates/pathfinder/examples/migrate_db.rs
@@ -20,7 +20,7 @@ fn main() {
let size_before = std::fs::metadata(&path).expect("Path does not exist").len() as i64;

let started_at = std::time::Instant::now();
- pathfinder_storage::Storage::migrate(path.clone(), pathfinder_storage::JournalMode::WAL)
+ pathfinder_storage::Storage::migrate(path.clone(), pathfinder_storage::JournalMode::WAL, 1)
.unwrap();
let migrated_at = std::time::Instant::now();

2 changes: 1 addition & 1 deletion crates/pathfinder/examples/re_execute.rs
@@ -29,7 +29,7 @@ fn main() -> anyhow::Result<()> {
let n_cpus = std::thread::available_parallelism().unwrap().get();

let database_path = std::env::args().nth(1).unwrap();
- let storage = Storage::migrate(database_path.into(), JournalMode::WAL)?
+ let storage = Storage::migrate(database_path.into(), JournalMode::WAL, 1)?
.create_pool(NonZeroU32::new(n_cpus as u32 * 2).unwrap())?;
let mut db = storage
.connection()
2 changes: 1 addition & 1 deletion crates/pathfinder/examples/verify_block_hashes.rs
@@ -24,7 +24,7 @@ fn main() -> anyhow::Result<()> {
};

let database_path = std::env::args().nth(2).unwrap();
- let storage = Storage::migrate(database_path.into(), JournalMode::WAL)?
+ let storage = Storage::migrate(database_path.into(), JournalMode::WAL, 1)?
.create_pool(NonZeroU32::new(1).unwrap())
.unwrap();
let mut db = storage
2 changes: 1 addition & 1 deletion crates/pathfinder/examples/verify_transaction_hashes.rs
@@ -29,7 +29,7 @@ fn main() -> anyhow::Result<()> {

println!("Migrating database...");

- let storage = Storage::migrate(database_path.into(), JournalMode::WAL)?
+ let storage = Storage::migrate(database_path.into(), JournalMode::WAL, 1)?
.create_pool(NonZeroU32::new(1).unwrap())
.unwrap();
let mut db = storage
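Every `Storage::migrate` call site above gains a third argument: the capacity of the in-memory Bloom filter cache. The one-shot example binaries pass 1 since they gain nothing from caching, while main.rs further down passes the configured value. A quick sizing check (hypothetical helper, not part of this PR), using the 2 KiB-per-filter figure from the CLI help text:

    /// Approximate memory used by the Bloom filter cache, assuming
    /// 2 KiB per cached filter as stated in the CLI help text.
    fn bloom_cache_bytes(cache_size: usize) -> usize {
        cache_size * 2 * 1024
    }

    fn main() {
        // The default of 524288 cached filters works out to 1 GiB.
        assert_eq!(bloom_cache_bytes(524288), 1 << 30);
    }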
34 changes: 34 additions & 0 deletions crates/pathfinder/src/bin/pathfinder/config.rs
@@ -205,6 +205,34 @@ This should only be enabled for debugging purposes as it adds substantial proces
env = "PATHFINDER_GATEWAY_API_KEY"
)]
gateway_api_key: Option<String>,

#[arg(
long = "storage.event-bloom-filter-cache-size",
long_help = "The number of blocks whose event bloom filters are cached in memory. \
This cache speeds up event related RPC queries at the cost of using extra memory. \
Each cached filter takes 2 KiB of memory.",
env = "PATHFINDER_STORAGE_BLOOM_FILTER_CACHE_SIZE",
default_value = "524288"
)]
event_bloom_filter_cache_size: std::num::NonZeroUsize,

#[arg(
long = "rpc.get-events-max-blocks-to-scan",
long_help = "The number of blocks to scan for events when querying for events. \
This limit is used to prevent queries from taking too long.",
env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOCKS_TO_SCAN",
default_value = "500"
)]
get_events_max_blocks_to_scan: std::num::NonZeroUsize,

#[arg(
long = "rpc.get-events-max-bloom-filters-to-load",
long_help = "The number of Bloom filters to load for events when querying for events. \
This limit is used to prevent queries from taking too long.",
env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOOM_FILTERS_TO_LOAD",
default_value = "100000"
)]
get_events_max_bloom_filters_to_load: std::num::NonZeroUsize,
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq)]
@@ -461,6 +489,9 @@ pub struct Config {
pub is_sync_enabled: bool,
pub is_rpc_enabled: bool,
pub gateway_api_key: Option<String>,
pub event_bloom_filter_cache_size: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_bloom_filters_to_load: NonZeroUsize,
}

pub struct Ethereum {
@@ -631,6 +662,9 @@ impl Config {
is_sync_enabled: cli.is_sync_enabled,
is_rpc_enabled: cli.is_rpc_enabled,
gateway_api_key: cli.gateway_api_key,
event_bloom_filter_cache_size: cli.event_bloom_filter_cache_size,
get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
get_events_max_bloom_filters_to_load: cli.get_events_max_bloom_filters_to_load,
}
}
}
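The three new knobs can be set as CLI flags or via the environment variables listed above. An invocation sketch with the default values spelled out explicitly (assuming the `pathfinder` binary):

    pathfinder \
        --storage.event-bloom-filter-cache-size 524288 \
        --rpc.get-events-max-blocks-to-scan 500 \
        --rpc.get-events-max-bloom-filters-to-load 100000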
17 changes: 14 additions & 3 deletions crates/pathfinder/src/bin/pathfinder/main.rs
@@ -105,8 +105,13 @@ async fn async_main() -> anyhow::Result<()> {
verify_networks(pathfinder_context.network, ethereum.chain)?;

// Setup and verify database
- let storage_manager =
- Storage::migrate(pathfinder_context.database.clone(), config.sqlite_wal).unwrap();
+ let storage_manager = Storage::migrate(
+ pathfinder_context.database.clone(),
+ config.sqlite_wal,
+ config.event_bloom_filter_cache_size.get(),
+ )
+ .unwrap();
let sync_storage = storage_manager
// 5 is enough for normal sync operations, and then `available_parallelism` for
// the rayon thread pool workers to use.
Expand Down Expand Up @@ -165,14 +170,20 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst

let (tx_pending, rx_pending) = tokio::sync::watch::channel(Default::default());

let rpc_config = pathfinder_rpc::context::RpcConfig {
batch_concurrency_limit: config.rpc_batch_concurrency_limit,
get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
get_events_max_bloom_filters_to_load: config.get_events_max_bloom_filters_to_load,
};

let context = pathfinder_rpc::context::RpcContext::new(
rpc_storage,
execution_storage,
sync_state.clone(),
pathfinder_context.network_id,
pathfinder_context.gateway.clone(),
rx_pending,
- config.rpc_batch_concurrency_limit,
+ rpc_config,
);

let context = if config.websocket.enabled {
6 changes: 3 additions & 3 deletions crates/pathfinder/src/p2p_network/sync_handlers.rs
@@ -251,7 +251,7 @@ fn get_transactions_for_block(
block_number: BlockNumber,
responses: &mut Vec<TransactionsResponse>,
) -> anyhow::Result<bool> {
- let Some((_, block_hash)) = tx.block_id(block_number.into())? else {
+ let Some(block_hash) = tx.block_hash(block_number.into())? else {
return Ok(false);
};

@@ -286,7 +286,7 @@ fn get_receipts_for_block(
block_number: BlockNumber,
responses: &mut Vec<ReceiptsResponse>,
) -> anyhow::Result<bool> {
- let Some((_, block_hash)) = tx.block_id(block_number.into())? else {
+ let Some(block_hash) = tx.block_hash(block_number.into())? else {
return Ok(false);
};

@@ -318,7 +318,7 @@ fn get_events_for_block(
block_number: BlockNumber,
responses: &mut Vec<EventsResponse>,
) -> anyhow::Result<bool> {
- let Some((_, block_hash)) = tx.block_id(block_number.into())? else {
+ let Some(block_hash) = tx.block_hash(block_number.into())? else {
return Ok(false);
};

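The recurring `block_id` to `block_hash` change in this file and in the examples above drops the old pattern of fetching a `(BlockNumber, BlockHash)` pair and discarding the number. A sketch of the new getter's presumed behaviour in terms of the old one (type paths assumed; the real getter lives in the storage crate):

    use anyhow::Result;
    use pathfinder_common::BlockHash;
    use pathfinder_storage::{BlockId, Transaction};

    // Equivalent to what every call site previously spelled out by hand.
    fn block_hash_of(tx: &Transaction<'_>, block: BlockId) -> Result<Option<BlockHash>> {
        Ok(tx.block_id(block)?.map(|(_, hash)| hash))
    }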
5 changes: 2 additions & 3 deletions crates/pathfinder/src/state/sync.rs
@@ -671,9 +671,8 @@ async fn l1_update(
.context("Insert update")?;

let l2_hash = transaction
- .block_id(update.block_number.into())
- .context("Fetching block hash")?
- .map(|(_, hash)| hash);
+ .block_hash(update.block_number.into())
+ .context("Fetching block hash")?;

if let Some(l2_hash) = l2_hash {
if l2_hash == update.block_hash {
4 changes: 1 addition & 3 deletions crates/rpc/Cargo.toml
@@ -66,8 +66,6 @@ pathfinder-crypto = { path = "../crypto" }
pretty_assertions_sorted = { workspace = true }
rstest = { workspace = true }
tempfile = "3.6"
- test-log = { version = "0.2.12", default-features = false, features = [
- "trace",
- ] }
+ test-log = { workspace = true }
tokio-tungstenite = "0.20"
tracing-subscriber = { workspace = true }
Binary file modified crates/rpc/fixtures/mainnet.sqlite
21 changes: 17 additions & 4 deletions crates/rpc/src/context.rs
@@ -11,6 +11,13 @@ use std::sync::Arc;
type SequencerClient = starknet_gateway_client::Client;
use tokio::sync::watch as tokio_watch;

#[derive(Clone)]
pub struct RpcConfig {
pub batch_concurrency_limit: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_bloom_filters_to_load: NonZeroUsize,
}

#[derive(Clone)]
pub struct RpcContext {
pub storage: Storage,
@@ -21,7 +28,7 @@ pub struct RpcContext {
pub eth_gas_price: gas_price::Cached,
pub sequencer: SequencerClient,
pub websocket: Option<WebsocketContext>,
- pub batch_concurrency_limit: NonZeroUsize,
+ pub config: RpcConfig,
}

impl RpcContext {
@@ -32,7 +39,7 @@ impl RpcContext {
chain_id: ChainId,
sequencer: SequencerClient,
pending_data: tokio_watch::Receiver<PendingData>,
- batch_concurrency_limit: NonZeroUsize,
+ config: RpcConfig,
) -> Self {
let pending_data = PendingWatcher::new(pending_data);
Self {
@@ -44,7 +51,7 @@ impl RpcContext {
eth_gas_price: gas_price::Cached::new(sequencer.clone()),
sequencer,
websocket: None,
- batch_concurrency_limit,
+ config,
}
}

@@ -73,14 +80,20 @@ impl RpcContext {
let sync_state = Arc::new(SyncState::default());
let (_, rx) = tokio_watch::channel(Default::default());

let config = RpcConfig {
batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
get_events_max_bloom_filters_to_load: NonZeroUsize::new(1000).unwrap(),
};

Self::new(
storage.clone(),
storage,
sync_state,
chain_id,
sequencer.disable_retry_for_tests(),
rx,
- NonZeroUsize::new(8).unwrap(),
+ config,
)
}

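Why the two RPC limits exist: without the SQLite event index, a worst-case getEvents query degenerates into checking every block's Bloom filter and scanning every matching block. The caps bound that work, and (per the "return continuation token if bumping into limits" commit above) the handler reports where it stopped so clients can resume. A self-contained sketch of that control flow, with names and types that are illustrative rather than pathfinder's:

    /// Outcome of one bounded scan step.
    struct ScanOutcome {
        matching_blocks: Vec<u64>,
        /// Block to resume from, if a limit was hit before `to` was reached.
        continue_from: Option<u64>,
    }

    fn bounded_scan(
        from: u64,
        to: u64,
        max_blocks_to_scan: usize,
        max_bloom_filters_to_load: usize,
        mut filter_matches: impl FnMut(u64) -> bool, // Bloom filter check per block
    ) -> ScanOutcome {
        let mut matching_blocks = Vec::new();
        let mut blocks_scanned = 0;
        let mut filters_loaded = 0;

        for block in from..=to {
            if blocks_scanned >= max_blocks_to_scan || filters_loaded >= max_bloom_filters_to_load {
                // Stop early; the caller turns this into a continuation token.
                return ScanOutcome { matching_blocks, continue_from: Some(block) };
            }
            blocks_scanned += 1;
            filters_loaded += 1; // in reality, cached filters would not count as loads
            if filter_matches(block) {
                matching_blocks.push(block);
            }
        }
        ScanOutcome { matching_blocks, continue_from: None }
    }

    fn main() {
        // Scan 10 blocks with a 4-block limit: the scan stops early and
        // reports block 4 as the point to resume from.
        let outcome = bounded_scan(0, 9, 4, 100, |block| block % 2 == 0);
        assert_eq!(outcome.matching_blocks, vec![0, 2]);
        assert_eq!(outcome.continue_from, Some(4));
    }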