feat(storage): make get_events MAX_BLOCKS_TO_SCAN configurable
kkovaacs committed Jan 22, 2024
1 parent d5d9d2c commit b14c1f8
Showing 7 changed files with 82 additions and 43 deletions.
17 changes: 14 additions & 3 deletions crates/pathfinder/src/bin/pathfinder/config.rs
@@ -214,7 +214,16 @@ This should only be enabled for debugging purposes as it adds substantial proces
         env = "PATHFINDER_STORAGE_BLOOM_FILTER_CACHE_SIZE",
         default_value = "524288"
     )]
-    bloom_filter_cache_size: std::num::NonZeroUsize,
+    event_bloom_filter_cache_size: std::num::NonZeroUsize,
+
+    #[arg(
+        long = "rpc.get-events-max-blocks-to-scan",
+        long_help = "The number of blocks to scan for events when querying for events. \
+                     This limit is used to prevent queries from taking too long.",
+        env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOCKS_TO_SCAN",
+        default_value = "500"
+    )]
+    get_events_max_blocks_to_scan: std::num::NonZeroUsize,
 }

 #[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq)]
@@ -471,7 +480,8 @@ pub struct Config {
     pub is_sync_enabled: bool,
     pub is_rpc_enabled: bool,
     pub gateway_api_key: Option<String>,
-    pub bloom_filter_cache_size: NonZeroUsize,
+    pub event_bloom_filter_cache_size: NonZeroUsize,
+    pub get_events_max_blocks_to_scan: NonZeroUsize,
 }

 pub struct Ethereum {
@@ -642,7 +652,8 @@ impl Config {
             is_sync_enabled: cli.is_sync_enabled,
             is_rpc_enabled: cli.is_rpc_enabled,
             gateway_api_key: cli.gateway_api_key,
-            bloom_filter_cache_size: cli.bloom_filter_cache_size,
+            event_bloom_filter_cache_size: cli.event_bloom_filter_cache_size,
+            get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
         }

     }
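For illustration, a minimal standalone sketch of how the new option is surfaced on the command line. This is not pathfinder's actual Cli struct, just the new field in isolation, and it assumes clap 4 with the derive and env features enabled. The --rpc.get-events-max-blocks-to-scan flag takes precedence, then the PATHFINDER_RPC_GET_EVENTS_MAX_BLOCKS_TO_SCAN environment variable, then the default of 500; NonZeroUsize rejects a value of 0 at parse time.

use std::num::NonZeroUsize;

use clap::Parser;

// Hypothetical, stripped-down CLI containing only the new option.
#[derive(Parser, Debug)]
struct Cli {
    #[arg(
        long = "rpc.get-events-max-blocks-to-scan",
        env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOCKS_TO_SCAN",
        default_value = "500"
    )]
    get_events_max_blocks_to_scan: NonZeroUsize,
}

fn main() {
    // `--rpc.get-events-max-blocks-to-scan 200` overrides the default;
    // passing 0 fails at parse time because the field is a NonZeroUsize.
    let cli = Cli::parse();
    println!("max blocks to scan: {}", cli.get_events_max_blocks_to_scan);
}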
9 changes: 7 additions & 2 deletions crates/pathfinder/src/bin/pathfinder/main.rs
@@ -109,7 +109,7 @@ async fn async_main() -> anyhow::Result<()> {
     let storage_manager = Storage::migrate(
         pathfinder_context.database.clone(),
         config.sqlite_wal,
-        config.bloom_filter_cache_size.get(),
+        config.event_bloom_filter_cache_size.get(),
     )
     .unwrap();
     let sync_storage = storage_manager
@@ -170,14 +170,19 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst

     let (tx_pending, rx_pending) = tokio::sync::watch::channel(Default::default());

+    let rpc_config = pathfinder_rpc::context::RpcConfig {
+        batch_concurrency_limit: config.rpc_batch_concurrency_limit,
+        get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
+    };
+
     let context = pathfinder_rpc::context::RpcContext::new(
         rpc_storage,
         execution_storage,
         sync_state.clone(),
         pathfinder_context.network_id,
         pathfinder_context.gateway.clone(),
         rx_pending,
-        config.rpc_batch_concurrency_limit,
+        rpc_config,
     );

     let context = if config.websocket.enabled {
19 changes: 15 additions & 4 deletions crates/rpc/src/context.rs
@@ -11,6 +11,12 @@ use std::sync::Arc;
 type SequencerClient = starknet_gateway_client::Client;
 use tokio::sync::watch as tokio_watch;

+#[derive(Clone)]
+pub struct RpcConfig {
+    pub batch_concurrency_limit: NonZeroUsize,
+    pub get_events_max_blocks_to_scan: NonZeroUsize,
+}
+
 #[derive(Clone)]
 pub struct RpcContext {
     pub storage: Storage,
@@ -21,7 +27,7 @@ pub struct RpcContext {
     pub eth_gas_price: gas_price::Cached,
     pub sequencer: SequencerClient,
     pub websocket: Option<WebsocketContext>,
-    pub batch_concurrency_limit: NonZeroUsize,
+    pub config: RpcConfig,
 }

 impl RpcContext {
@@ -32,7 +38,7 @@ impl RpcContext {
         chain_id: ChainId,
         sequencer: SequencerClient,
         pending_data: tokio_watch::Receiver<PendingData>,
-        batch_concurrency_limit: NonZeroUsize,
+        config: RpcConfig,
     ) -> Self {
         let pending_data = PendingWatcher::new(pending_data);
         Self {
@@ -44,7 +50,7 @@
             eth_gas_price: gas_price::Cached::new(sequencer.clone()),
             sequencer,
             websocket: None,
-            batch_concurrency_limit,
+            config,
         }
     }

@@ -73,14 +79,19 @@ impl RpcContext {
         let sync_state = Arc::new(SyncState::default());
         let (_, rx) = tokio_watch::channel(Default::default());

+        let config = RpcConfig {
+            batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
+            get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
+        };
+
         Self::new(
             storage.clone(),
             storage,
             sync_state,
             chain_id,
             sequencer.disable_retry_for_tests(),
             rx,
-            NonZeroUsize::new(8).unwrap(),
+            config,
         )
     }

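A side note on the refactor above: bundling the per-request limits into one RpcConfig and storing it on the context keeps RpcContext::new at a fixed arity as more limits are added later. A hypothetical, self-contained sketch of that pattern, using stand-in types rather than pathfinder's own:

use std::num::NonZeroUsize;

// Stand-ins mirroring the RpcConfig / RpcContext shape introduced above.
#[derive(Clone)]
struct Config {
    batch_concurrency_limit: NonZeroUsize,
    get_events_max_blocks_to_scan: NonZeroUsize,
}

#[derive(Clone)]
struct Context {
    config: Config,
}

impl Context {
    // Adding another limit later only touches `Config` and the code that
    // reads it, not this constructor signature.
    fn new(config: Config) -> Self {
        Self { config }
    }
}

fn main() {
    let ctx = Context::new(Config {
        batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
        get_events_max_blocks_to_scan: NonZeroUsize::new(500).unwrap(),
    });
    println!(
        "batch limit: {}, scan limit: {}",
        ctx.config.batch_concurrency_limit, ctx.config.get_events_max_blocks_to_scan
    );
}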
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc/router.rs
@@ -203,7 +203,7 @@ pub async fn rpc_handler(
     }

     let responses = run_concurrently(
-        state.context.batch_concurrency_limit,
+        state.context.config.batch_concurrency_limit,
         requests.into_iter().enumerate(),
         |(idx, request)| {
             state
14 changes: 8 additions & 6 deletions crates/rpc/src/v03/method/get_events.rs
@@ -167,12 +167,14 @@ pub async fn get_events(
         offset: requested_offset,
     };

-    let page = transaction.events(&filter).map_err(|e| match e {
-        EventFilterError::PageSizeTooBig(_) => GetEventsError::PageSizeTooBig,
-        EventFilterError::TooManyMatches => GetEventsError::Custom(e.into()),
-        EventFilterError::Internal(e) => GetEventsError::Internal(e),
-        EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
-    })?;
+    let page = transaction
+        .events(&filter, context.config.get_events_max_blocks_to_scan)
+        .map_err(|e| match e {
+            EventFilterError::PageSizeTooBig(_) => GetEventsError::PageSizeTooBig,
+            EventFilterError::TooManyMatches => GetEventsError::Custom(e.into()),
+            EventFilterError::Internal(e) => GetEventsError::Internal(e),
+            EventFilterError::PageSizeTooSmall => GetEventsError::Custom(e.into()),
+        })?;

     let mut events = types::GetEventsResult {
         events: page.events.into_iter().map(|e| e.into()).collect(),
9 changes: 7 additions & 2 deletions crates/storage/src/connection.rs
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::num::NonZeroUsize;
 use std::sync::Arc;

 mod block;
@@ -234,8 +235,12 @@ impl<'inner> Transaction<'inner> {
         transaction::transaction_count(self, block)
     }

-    pub fn events(&self, filter: &EventFilter) -> Result<PageOfEvents, EventFilterError> {
-        event::get_events(self, filter, event::MAX_BLOCKS_TO_SCAN)
+    pub fn events(
+        &self,
+        filter: &EventFilter,
+        max_blocks_to_scan: NonZeroUsize,
+    ) -> Result<PageOfEvents, EventFilterError> {
+        event::get_events(self, filter, max_blocks_to_scan)
     }

     pub fn insert_sierra_class(
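To illustrate what the new parameter bounds, here is a conceptual, self-contained sketch of a capped block scan. It is not pathfinder's actual event filtering code: the point is only that scanning stops after max_blocks_to_scan blocks even when the requested page is not yet full, which is how the limit keeps a single query from running for too long.

use std::num::NonZeroUsize;

// Conceptual stand-in: each inner Vec is the list of matching events in one block.
fn scan_events(blocks: &[Vec<u64>], page_size: usize, max_blocks_to_scan: NonZeroUsize) -> Vec<u64> {
    let mut page = Vec::new();
    // Visit at most `max_blocks_to_scan` blocks, stopping early once the page is full.
    for block in blocks.iter().take(max_blocks_to_scan.get()) {
        page.extend(block.iter().copied());
        if page.len() >= page_size {
            page.truncate(page_size);
            break;
        }
    }
    page
}

fn main() {
    let blocks = vec![vec![1, 2], vec![3], vec![4, 5, 6]];
    let limit = NonZeroUsize::new(2).unwrap();
    // Only the first two blocks are scanned: events from the third block are
    // simply not visited in this query, even though the page (size 10) is not full.
    assert_eq!(scan_events(&blocks, 10, limit), vec![1, 2, 3]);
    println!("ok");
}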
