From c957dd8011213e0e95fa5962e2310321b29a0d16 Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Mon, 16 Sep 2024 10:58:39 +0300 Subject: [PATCH] fix(eth_watch): fix `get_events_inner` (#2882) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - `get_events_inner` should recursively call itself - `get_events_inner` should allow passing `None` as topics and/or addresses ## Why ❔ bug fix ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- Cargo.lock | 12 ++++++++ Cargo.toml | 1 + core/node/eth_watch/Cargo.toml | 1 + core/node/eth_watch/src/client.rs | 51 +++++++++++++++++++++---------- 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 59b464f8501d..c47e5b77e391 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -336,6 +336,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2 1.0.86", + "quote 1.0.36", + "syn 2.0.72", +] + [[package]] name = "async-signal" version = "0.2.10" @@ -9823,6 +9834,7 @@ name = "zksync_eth_watch" version = "0.1.0" dependencies = [ "anyhow", + "async-recursion", "async-trait", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 5eb862f0bcb7..1e2fb9e0c7aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,6 +105,7 @@ categories = ["cryptography"] anyhow = "1" assert_matches = "1.5" async-trait = "0.1" +async-recursion = "1" axum = "0.7.5" backon = "0.4.4" bigdecimal = "0.4.5" diff --git a/core/node/eth_watch/Cargo.toml b/core/node/eth_watch/Cargo.toml index bbdc4ba27d34..a3d6325f4a24 100644 --- a/core/node/eth_watch/Cargo.toml +++ b/core/node/eth_watch/Cargo.toml @@ -24,6 +24,7 @@ anyhow.workspace = true thiserror.workspace = true async-trait.workspace = true tracing.workspace = true +async-recursion.workspace = true [dev-dependencies] zksync_concurrency.workspace = true diff --git a/core/node/eth_watch/src/client.rs b/core/node/eth_watch/src/client.rs index 67e603041e6c..237c8e5bc2e6 100644 --- a/core/node/eth_watch/src/client.rs +++ b/core/node/eth_watch/src/client.rs @@ -100,21 +100,24 @@ impl EthHttpQueryClient { .collect() } + #[async_recursion::async_recursion] async fn get_events_inner( &self, from: BlockNumber, to: BlockNumber, - topics1: Vec, - topics2: Vec, - addresses: Vec
, + topics1: Option>, + topics2: Option>, + addresses: Option>, retries_left: usize, ) -> EnrichedClientResult> { - let filter = FilterBuilder::default() + let mut builder = FilterBuilder::default() .from_block(from) .to_block(to) - .topics(Some(topics1), Some(topics2), None, None) - .address(addresses) - .build(); + .topics(topics1.clone(), topics2.clone(), None, None); + if let Some(addresses) = addresses.clone() { + builder = builder.address(addresses); + } + let filter = builder.build(); let mut result = self.client.logs(&filter).await; // This code is compatible with both Infura and Alchemy API providers. @@ -168,17 +171,33 @@ impl EthHttpQueryClient { tracing::warn!("Splitting block range in half: {from:?} - {mid:?} - {to:?}"); let mut first_half = self - .get_events(from, BlockNumber::Number(mid), RETRY_LIMIT) + .get_events_inner( + from, + BlockNumber::Number(mid), + topics1.clone(), + topics2.clone(), + addresses.clone(), + RETRY_LIMIT, + ) .await?; let mut second_half = self - .get_events(BlockNumber::Number(mid + 1u64), to, RETRY_LIMIT) + .get_events_inner( + BlockNumber::Number(mid + 1u64), + to, + topics1, + topics2, + addresses, + RETRY_LIMIT, + ) .await?; first_half.append(&mut second_half); result = Ok(first_half); } else if should_retry(err_code, err_message) && retries_left > 0 { tracing::warn!("Retrying. Retries left: {retries_left}"); - result = self.get_events(from, to, retries_left - 1).await; + result = self + .get_events_inner(from, to, topics1, topics2, addresses, retries_left - 1) + .await; } } @@ -216,9 +235,9 @@ impl EthClient for EthHttpQueryClient { .get_events_inner( from_block.into(), to_block.into(), - vec![self.new_upgrade_cut_data_signature], - vec![packed_version], - vec![state_transition_manager_address], + Some(vec![self.new_upgrade_cut_data_signature]), + Some(vec![packed_version]), + Some(vec![state_transition_manager_address]), RETRY_LIMIT, ) .await?; @@ -235,9 +254,9 @@ impl EthClient for EthHttpQueryClient { self.get_events_inner( from, to, - self.topics.clone(), - Vec::new(), - self.get_default_address_list(), + Some(self.topics.clone()), + None, + Some(self.get_default_address_list()), retries_left, ) .await