feat(eth-watch): redesign to support multiple chains #2867

Merged: 17 commits, Sep 26, 2024
Changes from 12 commits
1 change: 1 addition & 0 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

9 changes: 9 additions & 0 deletions core/lib/contracts/src/lib.rs
@@ -55,6 +55,11 @@ const GETTERS_FACET_CONTRACT_FILE: (&str, &str) = (

const MULTICALL3_CONTRACT_FILE: (&str, &str) = ("dev-contracts", "Multicall3.sol/Multicall3.json");
const VERIFIER_CONTRACT_FILE: (&str, &str) = ("state-transition", "Verifier.sol/Verifier.json");

const GETTERS_CONTRACT_FILE: (&str, &str) = (
"state-transition/chain-interfaces",
"IGetters.sol/IGetters.json",
);
const _IERC20_CONTRACT_FILE: &str =
"contracts/l1-contracts/artifacts/contracts/common/interfaces/IERC20.sol/IERC20.json";
const _FAIL_ON_RECEIVE_CONTRACT_FILE: &str =
@@ -162,6 +167,10 @@ pub fn verifier_contract() -> Contract {
load_contract_for_both_compilers(VERIFIER_CONTRACT_FILE)
}

pub fn getters_contract() -> Contract {
load_contract_for_both_compilers(GETTERS_CONTRACT_FILE)
}

#[derive(Debug, Clone)]
pub struct TestContract {
/// Contract bytecode to be used for sending deploy transaction.
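
Outside this diff, a minimal sketch of how the new getters_contract() helper could be used; the getProtocolVersion function name is an assumption about what IGetters exposes, not something taken from this PR:

use zksync_contracts::getters_contract;

/// Encodes calldata for a zero-argument view on the getters facet (illustrative only).
fn encode_get_protocol_version_call() -> Vec<u8> {
    let getters = getters_contract();
    getters
        .function("getProtocolVersion") // assumed IGetters view function
        .expect("IGetters ABI should contain the function")
        .encode_input(&[]) // no arguments for this call
        .expect("encoding a zero-argument call should not fail")
}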

(Diffs for several other generated files are not rendered.)

@@ -0,0 +1,4 @@
DROP TABLE IF EXISTS processed_events;

DROP TYPE IF EXISTS event_type;

@@ -0,0 +1,9 @@
CREATE TYPE event_type AS ENUM ('ProtocolUpgrades', 'PriorityTransactions');

CREATE TABLE processed_events
(
type event_type NOT NULL,
chain_id BIGINT NOT NULL,
next_block_to_process BIGINT NOT NULL,
PRIMARY KEY (chain_id, type)
)
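
Illustrative only, not part of the migration: because the primary key is (chain_id, type), each chain keeps an independent cursor per event kind, e.g.:

INSERT INTO processed_events (type, chain_id, next_block_to_process)
VALUES
    ('ProtocolUpgrades', 1, 100),
    ('PriorityTransactions', 1, 200);

-- Each (chain_id, type) pair resolves to exactly one cursor row; this returns 100.
SELECT next_block_to_process
FROM processed_events
WHERE type = 'ProtocolUpgrades'
    AND chain_id = 1;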
152 changes: 152 additions & 0 deletions core/lib/dal/src/eth_watcher_dal.rs
@@ -0,0 +1,152 @@
use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};
use zksync_types::SLChainId;

use crate::Core;

pub struct ProcessedEventsDal<'a, 'c> {
pub(crate) storage: &'a mut Connection<'c, Core>,
}

#[derive(Debug, Copy, Clone, sqlx::Type)]
#[sqlx(type_name = "event_type")]
pub enum EventType {
ProtocolUpgrades,
PriorityTransactions,
}

impl ProcessedEventsDal<'_, '_> {
pub async fn get_or_set_next_block_to_process(
&mut self,
event_type: EventType,
chain_id: SLChainId,
next_block_to_process: u64,
) -> DalResult<u64> {
let result = sqlx::query!(
r#"
SELECT
next_block_to_process
FROM
processed_events
WHERE
TYPE = $1
AND chain_id = $2
"#,
event_type as EventType,
chain_id.0 as i64
)
.instrument("get_or_set_next_block_to_process")
.with_arg("event_type", &event_type)
.with_arg("chain_id", &chain_id)
.fetch_optional(self.storage)
.await?;

if let Some(row) = result {
Ok(row.next_block_to_process as u64)
} else {
sqlx::query!(
r#"
INSERT INTO
processed_events (
TYPE,
chain_id,
next_block_to_process
)
VALUES
($1, $2, $3)
"#,
event_type as EventType,
chain_id.0 as i64,
next_block_to_process as i64
)
.instrument("get_or_set_next_block_to_process - insert")
.with_arg("event_type", &event_type)
.with_arg("chain_id", &chain_id)
.execute(self.storage)
.await?;

Ok(next_block_to_process)
}
}

pub async fn update_next_block_to_process(
&mut self,
event_type: EventType,
chain_id: SLChainId,
next_block_to_process: u64,
) -> DalResult<()> {
sqlx::query!(
r#"
UPDATE processed_events
SET
next_block_to_process = $3
WHERE
TYPE = $1
AND chain_id = $2
"#,
event_type as EventType,
chain_id.0 as i64,
next_block_to_process as i64
)
.instrument("update_next_block_to_process")
.with_arg("event_type", &event_type)
.with_arg("chain_id", &chain_id)
.execute(self.storage)
.await?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{ConnectionPool, Core, CoreDal};

#[tokio::test]
async fn test_get_or_set_next_block_to_process_with_different_event_types() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut conn = pool.connection().await.unwrap();
let mut dal = conn.processed_events_dal();

// Test with ProtocolUpgrades
let next_block = dal
.get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 100)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 100);

// Test with PriorityTransactions
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 200)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 200);

// Test with PriorityTransactions on a different chain
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 300)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 300);

// Verify that the initial block is not updated for ProtocolUpgrades
let next_block = dal
.get_or_set_next_block_to_process(EventType::ProtocolUpgrades, SLChainId(1), 150)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 100);

// Verify that the initial block is not updated for PriorityTransactions
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(1), 250)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 200);

// Verify that the initial block is not updated for PriorityTransactions on the second chain
let next_block = dal
.get_or_set_next_block_to_process(EventType::PriorityTransactions, SLChainId(2), 350)
.await
.expect("Failed to get or set next block to process");
assert_eq!(next_block, 300);
}
}
12 changes: 10 additions & 2 deletions core/lib/dal/src/lib.rs
@@ -15,8 +15,9 @@ use crate::{
base_token_dal::BaseTokenDal, blocks_dal::BlocksDal, blocks_web3_dal::BlocksWeb3Dal,
consensus_dal::ConsensusDal, contract_verification_dal::ContractVerificationDal,
data_availability_dal::DataAvailabilityDal, eth_sender_dal::EthSenderDal,
- events_dal::EventsDal, events_web3_dal::EventsWeb3Dal, factory_deps_dal::FactoryDepsDal,
- proof_generation_dal::ProofGenerationDal, protocol_versions_dal::ProtocolVersionsDal,
+ eth_watcher_dal::ProcessedEventsDal, events_dal::EventsDal, events_web3_dal::EventsWeb3Dal,
+ factory_deps_dal::FactoryDepsDal, proof_generation_dal::ProofGenerationDal,
+ protocol_versions_dal::ProtocolVersionsDal,
protocol_versions_web3_dal::ProtocolVersionsWeb3Dal, pruning_dal::PruningDal,
snapshot_recovery_dal::SnapshotRecoveryDal, snapshots_creator_dal::SnapshotsCreatorDal,
snapshots_dal::SnapshotsDal, storage_logs_dal::StorageLogsDal,
@@ -35,6 +36,7 @@ pub mod consensus_dal;
pub mod contract_verification_dal;
mod data_availability_dal;
pub mod eth_sender_dal;
pub mod eth_watcher_dal;
pub mod events_dal;
pub mod events_web3_dal;
pub mod factory_deps_dal;
@@ -132,6 +134,8 @@ where
fn vm_runner_dal(&mut self) -> VmRunnerDal<'_, 'a>;

fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a>;

fn processed_events_dal(&mut self) -> ProcessedEventsDal<'_, 'a>;
}

#[derive(Clone, Debug)]
@@ -258,4 +262,8 @@ impl<'a> CoreDal<'a> for Connection<'a, Core> {
fn base_token_dal(&mut self) -> BaseTokenDal<'_, 'a> {
BaseTokenDal { storage: self }
}

fn processed_events_dal(&mut self) -> ProcessedEventsDal<'_, 'a> {
ProcessedEventsDal { storage: self }
}
}
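
Outside this diff, a rough sketch of how a per-chain watcher loop might drive the new cursor methods through the processed_events_dal() accessor; advance_cursor and to_block are illustrative names, and the actual event fetching is elided:

use zksync_dal::{eth_watcher_dal::EventType, ConnectionPool, Core, CoreDal};
use zksync_types::SLChainId;

/// Advances the priority-transaction cursor for one settlement-layer chain (sketch only).
async fn advance_cursor(pool: &ConnectionPool<Core>, chain_id: SLChainId, to_block: u64) {
    let mut conn = pool.connection().await.expect("failed to acquire a DB connection");

    // Returns the stored cursor, or persists 0 on the very first run for this (chain, event type).
    let from_block = conn
        .processed_events_dal()
        .get_or_set_next_block_to_process(EventType::PriorityTransactions, chain_id, 0)
        .await
        .expect("failed to read the event cursor");

    // ... fetch and handle priority-transaction events in from_block..=to_block here ...
    let _ = from_block;

    // Persist the new cursor so the next iteration resumes after to_block.
    conn.processed_events_dal()
        .update_next_block_to_process(EventType::PriorityTransactions, chain_id, to_block + 1)
        .await
        .expect("failed to persist the event cursor");
}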
1 change: 1 addition & 0 deletions core/node/eth_watch/Cargo.toml
@@ -28,3 +28,4 @@ async-recursion.workspace = true

[dev-dependencies]
zksync_concurrency.workspace = true
test-log.workspace = true