diff --git a/core/lib/config/src/configs/eth_sender.rs b/core/lib/config/src/configs/eth_sender.rs index e932cd9819b9..89f8d459a1d9 100644 --- a/core/lib/config/src/configs/eth_sender.rs +++ b/core/lib/config/src/configs/eth_sender.rs @@ -24,7 +24,7 @@ impl EthConfig { Self { sender: Some(SenderConfig { aggregated_proof_sizes: vec![1], - wait_confirmations: Some(1), + wait_confirmations: Some(10), tx_poll_period: 1, aggregate_tx_poll_period: 1, max_txs_in_flight: 30, diff --git a/core/lib/dal/.sqlx/query-2a2680234c38904e5c19df45193a8c13d04079683e09c65f7f4e76a9987e2ab4.json b/core/lib/dal/.sqlx/query-0fede71ed258790cf70d6d6a32dcf9654c06dfef57863281601c947830ad448a.json similarity index 81% rename from core/lib/dal/.sqlx/query-2a2680234c38904e5c19df45193a8c13d04079683e09c65f7f4e76a9987e2ab4.json rename to core/lib/dal/.sqlx/query-0fede71ed258790cf70d6d6a32dcf9654c06dfef57863281601c947830ad448a.json index 8b984f4939a8..cdf425de713b 100644 --- a/core/lib/dal/.sqlx/query-2a2680234c38904e5c19df45193a8c13d04079683e09c65f7f4e76a9987e2ab4.json +++ b/core/lib/dal/.sqlx/query-0fede71ed258790cf70d6d6a32dcf9654c06dfef57863281601c947830ad448a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n eth_txs (\n raw_tx,\n nonce,\n tx_type,\n contract_address,\n predicted_gas_cost,\n created_at,\n updated_at,\n from_addr,\n blob_sidecar\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW(), $6, $7)\n RETURNING\n *\n ", + "query": "\n INSERT INTO\n eth_txs (\n raw_tx,\n nonce,\n tx_type,\n contract_address,\n predicted_gas_cost,\n created_at,\n updated_at,\n from_addr,\n blob_sidecar,\n is_gateway\n )\n VALUES\n ($1, $2, $3, $4, $5, NOW(), NOW(), $6, $7, $8)\n RETURNING\n *\n ", "describe": { "columns": [ { @@ -72,6 +72,11 @@ "ordinal": 13, "name": "blob_sidecar", "type_info": "Bytea" + }, + { + "ordinal": 14, + "name": "is_gateway", + "type_info": "Bool" } ], "parameters": { @@ -82,7 +87,8 @@ "Text", "Int8", "Bytea", - "Bytea" + "Bytea", + "Bool" ] }, "nullable": [ @@ -99,8 +105,9 @@ true, false, true, - true + true, + false ] }, - "hash": "2a2680234c38904e5c19df45193a8c13d04079683e09c65f7f4e76a9987e2ab4" + "hash": "0fede71ed258790cf70d6d6a32dcf9654c06dfef57863281601c947830ad448a" } diff --git a/core/lib/dal/.sqlx/query-5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538.json b/core/lib/dal/.sqlx/query-5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538.json new file mode 100644 index 000000000000..88bac1a36022 --- /dev/null +++ b/core/lib/dal/.sqlx/query-5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COUNT(*)\n FROM\n eth_txs\n WHERE\n confirmed_eth_tx_history_id IS NULL\n AND is_gateway = FALSE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538" +} diff --git a/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json b/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json index 985f998b439a..49578cd67bec 100644 --- a/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json +++ b/core/lib/dal/.sqlx/query-6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363.json @@ -72,6 +72,11 @@ "ordinal": 13, "name": "blob_sidecar", "type_info": "Bytea" + }, + { + "ordinal": 14, + "name": "is_gateway", + "type_info": "Bool" } ], "parameters": { @@ -93,7 +98,8 @@ true, false, true, - true + true, + false ] }, "hash": "6692ff6c0fbb2fc94f5cd2837a43ce80f9b2b27758651ccfc09df61a4ae8a363" diff --git a/core/lib/dal/.sqlx/query-6bb5eab89be2b08a08c00b5cd8d725208b0ecfe8065c8f893ff38c49072a21fc.json b/core/lib/dal/.sqlx/query-a71a87d91dcf0f624dbd64eb8828f65ff83204ebab2ea31847ae305a098823b0.json similarity index 70% rename from core/lib/dal/.sqlx/query-6bb5eab89be2b08a08c00b5cd8d725208b0ecfe8065c8f893ff38c49072a21fc.json rename to core/lib/dal/.sqlx/query-a71a87d91dcf0f624dbd64eb8828f65ff83204ebab2ea31847ae305a098823b0.json index 71318c9a1023..28058b9e42a7 100644 --- a/core/lib/dal/.sqlx/query-6bb5eab89be2b08a08c00b5cd8d725208b0ecfe8065c8f893ff38c49072a21fc.json +++ b/core/lib/dal/.sqlx/query-a71a87d91dcf0f624dbd64eb8828f65ff83204ebab2ea31847ae305a098823b0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL\n AND confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NOT NULL\n AND eth_txs.from_addr IS NOT DISTINCT FROM $1\n )\n ORDER BY\n id\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL\n AND confirmed_eth_tx_history_id IS NULL\n AND is_gateway = $2\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NOT NULL\n AND eth_txs.from_addr IS NOT DISTINCT FROM $1\n AND is_gateway = $2\n )\n ORDER BY\n id\n ", "describe": { "columns": [ { @@ -72,11 +72,17 @@ "ordinal": 13, "name": "blob_sidecar", "type_info": "Bytea" + }, + { + "ordinal": 14, + "name": "is_gateway", + "type_info": "Bool" } ], "parameters": { "Left": [ - "Bytea" + "Bytea", + "Bool" ] }, "nullable": [ @@ -93,8 +99,9 @@ true, false, true, - true + true, + false ] }, - "hash": "6bb5eab89be2b08a08c00b5cd8d725208b0ecfe8065c8f893ff38c49072a21fc" + "hash": "a71a87d91dcf0f624dbd64eb8828f65ff83204ebab2ea31847ae305a098823b0" } diff --git a/core/lib/dal/.sqlx/query-4570e9ffd0b2973d0bc2986c391d0a59076dda4aa572ade2492f37e537fdf6ed.json b/core/lib/dal/.sqlx/query-eab36591af61369e36e3dab79025ac6758a0a4e367f93a9bd48ec82c51e09755.json similarity index 68% rename from core/lib/dal/.sqlx/query-4570e9ffd0b2973d0bc2986c391d0a59076dda4aa572ade2492f37e537fdf6ed.json rename to core/lib/dal/.sqlx/query-eab36591af61369e36e3dab79025ac6758a0a4e367f93a9bd48ec82c51e09755.json index 7297bcdcad23..fb6ea1d2d3e5 100644 --- a/core/lib/dal/.sqlx/query-4570e9ffd0b2973d0bc2986c391d0a59076dda4aa572ade2492f37e537fdf6ed.json +++ b/core/lib/dal/.sqlx/query-eab36591af61369e36e3dab79025ac6758a0a4e367f93a9bd48ec82c51e09755.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL\n AND id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NOT NULL\n AND eth_txs.from_addr IS NOT DISTINCT FROM $2\n )\n ORDER BY\n id\n LIMIT\n $1\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL\n AND is_gateway = $3\n AND id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NOT NULL\n AND eth_txs.from_addr IS NOT DISTINCT FROM $2\n AND is_gateway = $3\n )\n ORDER BY\n id\n LIMIT\n $1\n ", "describe": { "columns": [ { @@ -72,12 +72,18 @@ "ordinal": 13, "name": "blob_sidecar", "type_info": "Bytea" + }, + { + "ordinal": 14, + "name": "is_gateway", + "type_info": "Bool" } ], "parameters": { "Left": [ "Int8", - "Bytea" + "Bytea", + "Bool" ] }, "nullable": [ @@ -94,8 +100,9 @@ true, false, true, - true + true, + false ] }, - "hash": "4570e9ffd0b2973d0bc2986c391d0a59076dda4aa572ade2492f37e537fdf6ed" + "hash": "eab36591af61369e36e3dab79025ac6758a0a4e367f93a9bd48ec82c51e09755" } diff --git a/core/lib/dal/migrations/20240803083814_add_is_gateway_column_to_eth_txs.down.sql b/core/lib/dal/migrations/20240803083814_add_is_gateway_column_to_eth_txs.down.sql new file mode 100644 index 000000000000..02fbc8cb075d --- /dev/null +++ b/core/lib/dal/migrations/20240803083814_add_is_gateway_column_to_eth_txs.down.sql @@ -0,0 +1 @@ +ALTER TABLE eth_txs DROP COLUMN is_gateway; diff --git a/core/lib/dal/migrations/20240803083814_add_is_gateway_column_to_eth_txs.up.sql b/core/lib/dal/migrations/20240803083814_add_is_gateway_column_to_eth_txs.up.sql new file mode 100644 index 000000000000..af1ef835cf37 --- /dev/null +++ b/core/lib/dal/migrations/20240803083814_add_is_gateway_column_to_eth_txs.up.sql @@ -0,0 +1 @@ +ALTER TABLE eth_txs ADD COLUMN is_gateway BOOLEAN NOT NULL DEFAULT FALSE; diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index dbb56b42a463..60956101a8c5 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -2548,7 +2548,16 @@ mod tests { async fn save_mock_eth_tx(action_type: AggregatedActionType, conn: &mut Connection<'_, Core>) { conn.eth_sender_dal() - .save_eth_tx(1, vec![], action_type, Address::default(), 1, None, None) + .save_eth_tx( + 1, + vec![], + action_type, + Address::default(), + 1, + None, + None, + false, + ) .await .unwrap(); } diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index 54ea7cc11f16..36a4acc0a6db 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -977,6 +977,7 @@ mod tests { 0, None, None, + false, ) .await .unwrap(); diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index f1ff515f506e..eb7e1cd642c1 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -25,6 +25,7 @@ impl EthSenderDal<'_, '_> { pub async fn get_inflight_txs( &mut self, operator_address: Option
, + is_gateway: bool, ) -> sqlx::Result> { let txs = sqlx::query_as!( StorageEthTx, @@ -36,6 +37,7 @@ impl EthSenderDal<'_, '_> { WHERE from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL AND confirmed_eth_tx_history_id IS NULL + AND is_gateway = $2 AND id <= ( SELECT COALESCE(MAX(eth_tx_id), 0) @@ -45,17 +47,40 @@ impl EthSenderDal<'_, '_> { WHERE eth_txs_history.sent_at_block IS NOT NULL AND eth_txs.from_addr IS NOT DISTINCT FROM $1 + AND is_gateway = $2 ) ORDER BY id "#, operator_address.as_ref().map(|h160| h160.as_bytes()), + is_gateway ) .fetch_all(self.storage.conn()) .await?; Ok(txs.into_iter().map(|tx| tx.into()).collect()) } + pub async fn get_non_gateway_inflight_txs_count_for_gateway_migration( + &mut self, + ) -> sqlx::Result { + let count = sqlx::query!( + r#" + SELECT + COUNT(*) + FROM + eth_txs + WHERE + confirmed_eth_tx_history_id IS NULL + AND is_gateway = FALSE + "# + ) + .fetch_one(self.storage.conn()) + .await? + .count + .unwrap(); + Ok(count.try_into().unwrap()) + } + pub async fn get_eth_l1_batches(&mut self) -> sqlx::Result { struct EthTxRow { number: i64, @@ -132,6 +157,7 @@ impl EthSenderDal<'_, '_> { &mut self, limit: u64, operator_address: &Option
, + is_gateway: bool, ) -> sqlx::Result> { let txs = sqlx::query_as!( StorageEthTx, @@ -142,6 +168,7 @@ impl EthSenderDal<'_, '_> { eth_txs WHERE from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL + AND is_gateway = $3 AND id > ( SELECT COALESCE(MAX(eth_tx_id), 0) @@ -151,6 +178,7 @@ impl EthSenderDal<'_, '_> { WHERE eth_txs_history.sent_at_block IS NOT NULL AND eth_txs.from_addr IS NOT DISTINCT FROM $2 + AND is_gateway = $3 ) ORDER BY id @@ -159,6 +187,7 @@ impl EthSenderDal<'_, '_> { "#, limit as i64, operator_address.as_ref().map(|h160| h160.as_bytes()), + is_gateway ) .fetch_all(self.storage.conn()) .await?; @@ -202,6 +231,7 @@ impl EthSenderDal<'_, '_> { predicted_gas_cost: u32, from_address: Option
, blob_sidecar: Option, + is_gateway: bool, ) -> sqlx::Result { let address = format!("{:#x}", contract_address); let eth_tx = sqlx::query_as!( @@ -217,10 +247,11 @@ impl EthSenderDal<'_, '_> { created_at, updated_at, from_addr, - blob_sidecar + blob_sidecar, + is_gateway ) VALUES - ($1, $2, $3, $4, $5, NOW(), NOW(), $6, $7) + ($1, $2, $3, $4, $5, NOW(), NOW(), $6, $7, $8) RETURNING * "#, @@ -232,6 +263,7 @@ impl EthSenderDal<'_, '_> { from_address.as_ref().map(Address::as_bytes), blob_sidecar.map(|sidecar| bincode::serialize(&sidecar) .expect("can always bincode serialize EthTxBlobSidecar; qed")), + is_gateway, ) .fetch_one(self.storage.conn()) .await?; diff --git a/core/lib/dal/src/models/storage_eth_tx.rs b/core/lib/dal/src/models/storage_eth_tx.rs index 2654ffe0e0a7..c721f938838e 100644 --- a/core/lib/dal/src/models/storage_eth_tx.rs +++ b/core/lib/dal/src/models/storage_eth_tx.rs @@ -29,6 +29,7 @@ pub struct StorageEthTx { // // Format a `bincode`-encoded `EthTxBlobSidecar` enum. pub blob_sidecar: Option>, + pub is_gateway: bool, } #[derive(Debug, Default)] @@ -83,6 +84,7 @@ impl From for EthTx { blob_sidecar: tx.blob_sidecar.map(|b| { bincode::deserialize(&b).expect("EthTxBlobSidecar is encoded correctly; qed") }), + is_gateway: tx.is_gateway, } } } diff --git a/core/lib/eth_client/src/clients/mock.rs b/core/lib/eth_client/src/clients/mock.rs index 46ad5dc5310e..b33554b6292c 100644 --- a/core/lib/eth_client/src/clients/mock.rs +++ b/core/lib/eth_client/src/clients/mock.rs @@ -107,7 +107,7 @@ impl MockSettlementLayerInner { self.block_number += confirmations; let nonce = self.current_nonce; self.current_nonce += 1; - tracing::info!("Executing tx with hash {tx_hash:?}, success: {success}, current nonce: {}, confirmations: {confirmations}", self.current_nonce); + tracing::info!("Executing tx with hash {tx_hash:?} at block {}, success: {success}, current nonce: {}, confirmations: {confirmations}", self.block_number - confirmations, self.current_nonce); let tx_nonce = self.sent_txs[&tx_hash].nonce; if non_ordering_confirmations { diff --git a/core/lib/types/src/eth_sender.rs b/core/lib/types/src/eth_sender.rs index bab57165b3dc..09ea915283eb 100644 --- a/core/lib/types/src/eth_sender.rs +++ b/core/lib/types/src/eth_sender.rs @@ -51,6 +51,7 @@ pub struct EthTx { /// this transaction. If it is set to `None` this transaction was sent by the main operator. pub from_addr: Option
, pub blob_sidecar: Option, + pub is_gateway: bool, } impl std::fmt::Debug for EthTx { diff --git a/core/node/eth_sender/src/abstract_l1_interface.rs b/core/node/eth_sender/src/abstract_l1_interface.rs index 1f1956c9dd84..312f483fd29e 100644 --- a/core/node/eth_sender/src/abstract_l1_interface.rs +++ b/core/node/eth_sender/src/abstract_l1_interface.rs @@ -9,7 +9,6 @@ use zksync_eth_client::{ #[cfg(test)] use zksync_types::web3; use zksync_types::{ - aggregated_operations::AggregatedActionType, eth_sender::{EthTx, EthTxBlobSidecar}, web3::{BlockId, BlockNumber}, Address, L1BlockNumber, Nonce, EIP_1559_TX_TYPE, EIP_4844_TX_TYPE, H256, U256, @@ -37,14 +36,25 @@ pub(crate) struct L1BlockNumbers { pub(crate) enum OperatorType { NonBlob, Blob, + Gateway, } #[async_trait] pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug { - async fn failure_reason(&self, tx_hash: H256) -> Option; + fn supported_operator_types(&self) -> Vec; + + async fn failure_reason( + &self, + tx_hash: H256, + operator_type: OperatorType, + ) -> Option; #[cfg(test)] - async fn get_tx(&self, tx_hash: H256) -> EnrichedClientResult>; + async fn get_tx( + &self, + tx_hash: H256, + operator_type: OperatorType, + ) -> EnrichedClientResult>; async fn get_tx_status( &self, @@ -73,50 +83,77 @@ pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug { priority_fee_per_gas: u64, blob_gas_price: Option, max_aggregated_tx_gas: U256, + operator_type: OperatorType, ) -> SignedCallResult; - async fn get_l1_block_numbers(&self) -> Result; - - fn ethereum_gateway(&self) -> &dyn BoundEthInterface; - - fn ethereum_gateway_blobs(&self) -> Option<&dyn BoundEthInterface>; + async fn get_l1_block_numbers( + &self, + operator_type: OperatorType, + ) -> Result; } #[derive(Debug)] pub(super) struct RealL1Interface { - pub ethereum_gateway: Box, + pub ethereum_gateway: Option>, pub ethereum_gateway_blobs: Option>, + pub l2_gateway: Option>, pub wait_confirmations: Option, } impl RealL1Interface { - pub(crate) fn query_client(&self) -> &dyn EthInterface { - self.ethereum_gateway().as_ref() + fn query_client(&self, operator_type: OperatorType) -> &dyn EthInterface { + match operator_type { + OperatorType::NonBlob => self.ethereum_gateway.as_deref().unwrap().as_ref(), + OperatorType::Blob => self.ethereum_gateway_blobs.as_deref().unwrap().as_ref(), + OperatorType::Gateway => self.l2_gateway.as_deref().unwrap().as_ref(), + } } - pub(crate) fn query_client_for_operator( - &self, - operator_type: OperatorType, - ) -> &dyn EthInterface { - if operator_type == OperatorType::Blob { - self.ethereum_gateway_blobs().unwrap().as_ref() - } else { - self.ethereum_gateway().as_ref() + fn bound_query_client(&self, operator_type: OperatorType) -> &dyn BoundEthInterface { + match operator_type { + OperatorType::NonBlob => self.ethereum_gateway.as_deref().unwrap(), + OperatorType::Blob => self.ethereum_gateway_blobs.as_deref().unwrap(), + OperatorType::Gateway => self.l2_gateway.as_deref().unwrap(), } } } #[async_trait] impl AbstractL1Interface for RealL1Interface { - async fn failure_reason(&self, tx_hash: H256) -> Option { - self.query_client().failure_reason(tx_hash).await.expect( - "Tx is already failed, it's safe to fail here and apply the status on the next run", - ) + fn supported_operator_types(&self) -> Vec { + let mut result = vec![]; + if self.l2_gateway.is_some() { + result.push(OperatorType::Gateway); + } + if self.ethereum_gateway_blobs.is_some() { + result.push(OperatorType::Blob) + } + if self.ethereum_gateway.is_some() { + result.push(OperatorType::NonBlob); + } + result + } + + async fn failure_reason( + &self, + tx_hash: H256, + operator_type: OperatorType, + ) -> Option { + self.query_client(operator_type) + .failure_reason(tx_hash) + .await + .expect( + "Tx is already failed, it's safe to fail here and apply the status on the next run", + ) } #[cfg(test)] - async fn get_tx(&self, tx_hash: H256) -> EnrichedClientResult> { - self.query_client().get_tx(tx_hash).await + async fn get_tx( + &self, + tx_hash: H256, + operator_type: OperatorType, + ) -> EnrichedClientResult> { + self.query_client(operator_type).get_tx(tx_hash).await } async fn get_tx_status( @@ -124,7 +161,7 @@ impl AbstractL1Interface for RealL1Interface { tx_hash: H256, operator_type: OperatorType, ) -> Result, EthSenderError> { - self.query_client_for_operator(operator_type) + self.query_client(operator_type) .get_tx_status(tx_hash) .await .map_err(Into::into) @@ -135,13 +172,12 @@ impl AbstractL1Interface for RealL1Interface { tx_bytes: RawTransactionBytes, operator_type: OperatorType, ) -> EnrichedClientResult { - self.query_client_for_operator(operator_type) - .send_raw_tx(tx_bytes) - .await + self.query_client(operator_type).send_raw_tx(tx_bytes).await } fn get_blobs_operator_account(&self) -> Option
{ - self.ethereum_gateway_blobs() + self.ethereum_gateway_blobs + .as_deref() .as_ref() .map(|s| s.sender_account()) } @@ -151,27 +187,20 @@ impl AbstractL1Interface for RealL1Interface { block_numbers: L1BlockNumbers, operator_type: OperatorType, ) -> Result, EthSenderError> { - let gateway = match operator_type { - OperatorType::NonBlob => Some(self.ethereum_gateway()), - OperatorType::Blob => self.ethereum_gateway_blobs(), - }; - match gateway { - None => Ok(None), - Some(gateway) => { - let finalized = gateway - .nonce_at(block_numbers.finalized.0.into()) - .await? - .as_u32() - .into(); - - let latest = gateway - .nonce_at(block_numbers.latest.0.into()) - .await? - .as_u32() - .into(); - Ok(Some(OperatorNonce { finalized, latest })) - } - } + let finalized = self + .bound_query_client(operator_type) + .nonce_at(block_numbers.finalized.0.into()) + .await? + .as_u32() + .into(); + + let latest = self + .bound_query_client(operator_type) + .nonce_at(block_numbers.latest.0.into()) + .await? + .as_u32() + .into(); + Ok(Some(OperatorNonce { finalized, latest })) } async fn sign_tx( @@ -181,22 +210,9 @@ impl AbstractL1Interface for RealL1Interface { priority_fee_per_gas: u64, blob_gas_price: Option, max_aggregated_tx_gas: U256, + operator_type: OperatorType, ) -> SignedCallResult { - // Chose the signing gateway. Use a custom one in case - // the operator is in 4844 mode and the operation at hand is Commit. - // then the optional gateway is used to send this transaction from a - // custom sender account. - let signing_gateway = if let Some(blobs_gateway) = self.ethereum_gateway_blobs() { - if tx.tx_type == AggregatedActionType::Commit { - blobs_gateway - } else { - self.ethereum_gateway() - } - } else { - self.ethereum_gateway() - }; - - signing_gateway + self.bound_query_client(operator_type) .sign_prepared_tx_for_addr( tx.raw_tx.clone(), tx.contract_address, @@ -206,34 +222,40 @@ impl AbstractL1Interface for RealL1Interface { opt.max_fee_per_gas = Some(U256::from(base_fee_per_gas + priority_fee_per_gas)); opt.max_priority_fee_per_gas = Some(U256::from(priority_fee_per_gas)); opt.nonce = Some(tx.nonce.0.into()); - opt.transaction_type = if tx.blob_sidecar.is_some() { + opt.transaction_type = Some(EIP_1559_TX_TYPE.into()); + if tx.blob_sidecar.is_some() { + opt.transaction_type = Some(EIP_4844_TX_TYPE.into()); opt.max_fee_per_blob_gas = blob_gas_price; - Some(EIP_4844_TX_TYPE.into()) - } else { - Some(EIP_1559_TX_TYPE.into()) - }; - opt.blob_versioned_hashes = tx.blob_sidecar.as_ref().map(|s| match s { - EthTxBlobSidecar::EthTxBlobSidecarV1(s) => s - .blobs - .iter() - .map(|blob| H256::from_slice(&blob.versioned_hash)) - .collect(), - }); + opt.blob_versioned_hashes = tx.blob_sidecar.as_ref().map(|s| match s { + EthTxBlobSidecar::EthTxBlobSidecarV1(s) => s + .blobs + .iter() + .map(|blob| H256::from_slice(&blob.versioned_hash)) + .collect(), + }); + } }), ) .await .expect("Failed to sign transaction") } - async fn get_l1_block_numbers(&self) -> Result { + async fn get_l1_block_numbers( + &self, + operator_type: OperatorType, + ) -> Result { let (finalized, safe) = if let Some(confirmations) = self.wait_confirmations { - let latest_block_number = self.query_client().block_number().await?.as_u64(); + let latest_block_number: u64 = self + .query_client(operator_type) + .block_number() + .await? + .as_u64(); let finalized = (latest_block_number.saturating_sub(confirmations) as u32).into(); (finalized, finalized) } else { let finalized = self - .query_client() + .query_client(operator_type) .block(BlockId::Number(BlockNumber::Finalized)) .await? .expect("Finalized block must be present on L1") @@ -243,7 +265,7 @@ impl AbstractL1Interface for RealL1Interface { .into(); let safe = self - .query_client() + .query_client(operator_type) .block(BlockId::Number(BlockNumber::Safe)) .await? .expect("Safe block must be present on L1") @@ -254,7 +276,12 @@ impl AbstractL1Interface for RealL1Interface { (finalized, safe) }; - let latest = self.query_client().block_number().await?.as_u32().into(); + let latest = self + .query_client(operator_type) + .block_number() + .await? + .as_u32() + .into(); Ok(L1BlockNumbers { finalized, @@ -262,12 +289,4 @@ impl AbstractL1Interface for RealL1Interface { safe, }) } - - fn ethereum_gateway(&self) -> &dyn BoundEthInterface { - self.ethereum_gateway.as_ref() - } - - fn ethereum_gateway_blobs(&self) -> Option<&dyn BoundEthInterface> { - self.ethereum_gateway_blobs.as_deref() - } } diff --git a/core/node/eth_sender/src/eth_fees_oracle.rs b/core/node/eth_sender/src/eth_fees_oracle.rs index 89d10bc2b1e5..271a33d49c32 100644 --- a/core/node/eth_sender/src/eth_fees_oracle.rs +++ b/core/node/eth_sender/src/eth_fees_oracle.rs @@ -5,30 +5,32 @@ use std::{ }; use zksync_eth_client::{ClientError, EnrichedClientError}; -use zksync_node_fee_model::l1_gas_price::L1TxParamsProvider; +use zksync_node_fee_model::l1_gas_price::TxParamsProvider; use zksync_types::eth_sender::TxHistory; -use crate::EthSenderError; +use crate::{abstract_l1_interface::OperatorType, EthSenderError}; #[derive(Debug)] pub(crate) struct EthFees { pub(crate) base_fee_per_gas: u64, pub(crate) priority_fee_per_gas: u64, pub(crate) blob_base_fee_per_gas: Option, + #[allow(dead_code)] + pub(crate) pubdata_price: Option, } pub(crate) trait EthFeesOracle: 'static + Sync + Send + fmt::Debug { fn calculate_fees( &self, previous_sent_tx: &Option, - has_blob_sidecar: bool, time_in_mempool: u32, + operator_type: OperatorType, ) -> Result; } #[derive(Debug)] pub(crate) struct GasAdjusterFeesOracle { - pub gas_adjuster: Arc, + pub gas_adjuster: Arc, pub max_acceptable_priority_fee_in_gwei: u64, } @@ -53,12 +55,14 @@ impl GasAdjusterFeesOracle { previous_sent_tx.blob_base_fee_per_gas.map(|v| v * 2), blob_base_fee_per_gas, ), + pubdata_price: None, }); } Ok(EthFees { base_fee_per_gas, priority_fee_per_gas, blob_base_fee_per_gas, + pubdata_price: None, }) } @@ -105,6 +109,7 @@ impl GasAdjusterFeesOracle { base_fee_per_gas, blob_base_fee_per_gas: None, priority_fee_per_gas, + pubdata_price: None, }) } @@ -143,9 +148,10 @@ impl EthFeesOracle for GasAdjusterFeesOracle { fn calculate_fees( &self, previous_sent_tx: &Option, - has_blob_sidecar: bool, time_in_mempool: u32, + operator_type: OperatorType, ) -> Result { + let has_blob_sidecar = operator_type == OperatorType::Blob; if has_blob_sidecar { self.calculate_fees_with_blob_sidecar(previous_sent_tx) } else { diff --git a/core/node/eth_sender/src/eth_tx_aggregator.rs b/core/node/eth_sender/src/eth_tx_aggregator.rs index 9ec79dfc300b..312e9d31e9ff 100644 --- a/core/node/eth_sender/src/eth_tx_aggregator.rs +++ b/core/node/eth_sender/src/eth_tx_aggregator.rs @@ -356,7 +356,7 @@ impl EthTxAggregator { .await { let tx = self - .save_eth_tx(storage, &agg_op, contracts_are_pre_shared_bridge) + .save_eth_tx(storage, &agg_op, contracts_are_pre_shared_bridge, false) .await?; Self::report_eth_tx_saving(storage, &agg_op, &tx).await; } @@ -521,6 +521,7 @@ impl EthTxAggregator { storage: &mut Connection<'_, Core>, aggregated_op: &AggregatedOperation, contracts_are_pre_shared_bridge: bool, + is_gateway: bool, ) -> Result { let mut transaction = storage.start_transaction().await.unwrap(); let op_type = aggregated_op.get_action_type(); @@ -553,6 +554,7 @@ impl EthTxAggregator { eth_tx_predicted_gas, sender_addr, encoded_aggregated_op.sidecar, + is_gateway, ) .await .unwrap(); diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index 79a9b1dfdb58..a97aed88a0a5 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -6,7 +6,7 @@ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_eth_client::{ encode_blob_tx_with_sidecar, BoundEthInterface, ExecutedTxStatus, RawTransactionBytes, }; -use zksync_node_fee_model::l1_gas_price::L1TxParamsProvider; +use zksync_node_fee_model::l1_gas_price::TxParamsProvider; use zksync_shared_metrics::BlockL1Stage; use zksync_types::{eth_sender::EthTx, Address, L1BlockNumber, H256, U256}; use zksync_utils::time::seconds_since_epoch; @@ -37,11 +37,12 @@ impl EthTxManager { pub fn new( pool: ConnectionPool, config: SenderConfig, - gas_adjuster: Arc, - ethereum_gateway: Box, + gas_adjuster: Arc, + ethereum_gateway: Option>, ethereum_gateway_blobs: Option>, + l2_gateway: Option>, ) -> Self { - let ethereum_gateway = ethereum_gateway.for_component("eth_tx_manager"); + let ethereum_gateway = ethereum_gateway.map(|eth| eth.for_component("eth_tx_manager")); let ethereum_gateway_blobs = ethereum_gateway_blobs.map(|eth| eth.for_component("eth_tx_manager")); let fees_oracle = GasAdjusterFeesOracle { @@ -52,6 +53,7 @@ impl EthTxManager { l1_interface: Box::new(RealL1Interface { ethereum_gateway, ethereum_gateway_blobs, + l2_gateway, wait_confirmations: config.wait_confirmations, }), config, @@ -77,18 +79,12 @@ impl EthTxManager { .await .unwrap() { - let operator_type = if op.blob_sidecar.is_some() { - OperatorType::Blob - } else { - OperatorType::NonBlob - }; - // `status` is a Result here and we don't unwrap it with `?` // because if we do and get an `Err`, we won't finish the for loop, // which means we might miss the transaction that actually succeeded. match self .l1_interface - .get_tx_status(history_item.tx_hash, operator_type) + .get_tx_status(history_item.tx_hash, self.operator_type(op)) .await { Ok(Some(s)) => return Ok(Some(s)), @@ -118,23 +114,19 @@ impl EthTxManager { .get_last_sent_eth_tx(tx.id) .await .unwrap(); - let has_blob_sidecar = tx.blob_sidecar.is_some(); let EthFees { base_fee_per_gas, priority_fee_per_gas, blob_base_fee_per_gas, + pubdata_price: _, } = self.fees_oracle.calculate_fees( &previous_sent_tx, - has_blob_sidecar, time_in_mempool, + self.operator_type(tx), )?; - let operator_type = if tx.blob_sidecar.is_some() { - OperatorType::Blob - } else { - OperatorType::NonBlob - }; + let operator_type = self.operator_type(tx); if let Some(previous_sent_tx) = previous_sent_tx { METRICS.transaction_resent.inc(); @@ -177,7 +169,7 @@ impl EthTxManager { .observe(priority_fee_per_gas); } - let blob_gas_price = if has_blob_sidecar { + let blob_gas_price = if tx.blob_sidecar.is_some() { Some( blob_base_fee_per_gas .expect("always ready to query blob gas price for blob transactions; qed") @@ -195,6 +187,7 @@ impl EthTxManager { priority_fee_per_gas, blob_gas_price, self.config.max_aggregated_tx_gas.into(), + operator_type, ) .await; @@ -286,7 +279,10 @@ impl EthTxManager { if let Some(operator_nonce) = operator_nonce { let inflight_txs = storage .eth_sender_dal() - .get_inflight_txs(self.operator_address(operator_type)) + .get_inflight_txs( + self.operator_address(operator_type), + operator_type == OperatorType::Gateway, + ) .await .unwrap(); METRICS.number_of_inflight_txs[&operator_type].set(inflight_txs.len()); @@ -427,6 +423,16 @@ impl EthTxManager { } } + fn operator_type(&self, tx: &EthTx) -> OperatorType { + if tx.is_gateway { + OperatorType::Gateway + } else if tx.from_addr.is_none() { + OperatorType::NonBlob + } else { + OperatorType::Blob + } + } + pub async fn fail_tx( &self, storage: &mut Connection<'_, Core>, @@ -440,7 +446,7 @@ impl EthTxManager { .unwrap(); let failure_reason = self .l1_interface - .failure_reason(tx_status.receipt.transaction_hash) + .failure_reason(tx_status.receipt.transaction_hash, self.operator_type(tx)) .await; tracing::error!( @@ -513,10 +519,13 @@ impl EthTxManager { tracing::info!("Stop signal received, eth_tx_manager is shutting down"); break; } - let l1_block_numbers = self.l1_interface.get_l1_block_numbers().await?; + let l1_block_numbers = self + .l1_interface + .get_l1_block_numbers(OperatorType::Blob) + .await?; METRICS.track_block_numbers(&l1_block_numbers); - self.loop_iteration(&mut storage, l1_block_numbers).await; + self.loop_iteration(&mut storage).await; tokio::time::sleep(self.config.tx_poll_period()).await; } Ok(()) @@ -530,7 +539,10 @@ impl EthTxManager { ) { let number_inflight_txs = storage .eth_sender_dal() - .get_inflight_txs(self.operator_address(operator_type)) + .get_inflight_txs( + self.operator_address(operator_type), + operator_type == OperatorType::Gateway, + ) .await .unwrap() .len(); @@ -546,6 +558,7 @@ impl EthTxManager { .get_new_eth_txs( number_of_available_slots_for_eth_txs, &self.operator_address(operator_type), + operator_type == OperatorType::Gateway, ) .await .unwrap(); @@ -594,17 +607,46 @@ impl EthTxManager { Ok(()) } - #[tracing::instrument(skip_all, name = "EthTxManager::loop_iteration")] - pub async fn loop_iteration( + pub async fn assert_there_are_no_pre_gateway_txs_with_gateway_enabled( &mut self, storage: &mut Connection<'_, Core>, - l1_block_numbers: L1BlockNumbers, ) { - tracing::debug!("Loop iteration at block {}", l1_block_numbers.latest); - // We can treat those two operators independently as they have different nonces and + if !self + .l1_interface + .supported_operator_types() + .contains(&OperatorType::Gateway) + { + return; + } + + let inflight_count = storage + .eth_sender_dal() + .get_non_gateway_inflight_txs_count_for_gateway_migration() + .await + .unwrap(); + if inflight_count != 0 { + panic!("eth-sender was switched to gateway, but there are still {inflight_count} pre-gateway transactions in-flight!") + } + } + + #[tracing::instrument(skip_all, name = "EthTxManager::loop_iteration")] + pub async fn loop_iteration(&mut self, storage: &mut Connection<'_, Core>) { + self.assert_there_are_no_pre_gateway_txs_with_gateway_enabled(storage) + .await; + + // We can treat blob and non-blob operators independently as they have different nonces and // aggregator makes sure that corresponding Commit transaction is confirmed before creating // a PublishProof transaction - for operator_type in [OperatorType::NonBlob, OperatorType::Blob] { + for operator_type in self.l1_interface.supported_operator_types() { + let l1_block_numbers = self + .l1_interface + .get_l1_block_numbers(operator_type) + .await + .unwrap(); + tracing::info!( + "Loop iteration at block {} for {operator_type:?} operator", + l1_block_numbers.latest + ); self.send_new_eth_txs(storage, l1_block_numbers.latest, operator_type) .await; let result = self diff --git a/core/node/eth_sender/src/tester.rs b/core/node/eth_sender/src/tester.rs index 5bd5181ed8c7..508a38e61732 100644 --- a/core/node/eth_sender/src/tester.rs +++ b/core/node/eth_sender/src/tester.rs @@ -111,6 +111,7 @@ pub(crate) struct EthSenderTester { pub conn: ConnectionPool, pub gateway: Box, pub gateway_blobs: Box, + pub l2_gateway: Box, pub manager: MockEthTxManager, pub aggregator: EthTxAggregator, pub gas_adjuster: Arc, @@ -120,6 +121,7 @@ pub(crate) struct EthSenderTester { next_l1_batch_number_to_prove: L1BatchNumber, next_l1_batch_number_to_execute: L1BatchNumber, tx_sent_in_last_iteration_count: usize, + pub is_l2: bool, } impl EthSenderTester { @@ -176,6 +178,26 @@ impl EthSenderTester { gateway.advance_block_number(Self::WAIT_CONFIRMATIONS); let gateway = Box::new(gateway); + let l2_gateway: MockSettlementLayer = MockSettlementLayer::builder() + .with_fee_history( + std::iter::repeat_with(|| BaseFees { + base_fee_per_gas: 0, + base_fee_per_blob_gas: 0.into(), + l2_pubdata_price: 0.into(), + }) + .take(Self::WAIT_CONFIRMATIONS as usize) + .chain(history.clone()) + .collect(), + ) + .with_non_ordering_confirmation(non_ordering_confirmations) + .with_call_handler(move |call, _| { + assert_eq!(call.to, Some(contracts_config.l1_multicall3_addr)); + crate::tests::mock_multicall_response() + }) + .build(); + l2_gateway.advance_block_number(Self::WAIT_CONFIRMATIONS); + let l2_gateway = Box::new(l2_gateway); + let gateway_blobs = MockSettlementLayer::builder() .with_fee_history( std::iter::repeat_with(|| BaseFees { @@ -249,8 +271,9 @@ impl EthSenderTester { connection_pool.clone(), eth_sender.clone(), gas_adjuster.clone(), - gateway.clone(), + Some(gateway.clone()), Some(gateway_blobs.clone()), + None, ); let connection_pool_clone = connection_pool.clone(); @@ -264,6 +287,7 @@ impl EthSenderTester { Self { gateway, gateway_blobs, + l2_gateway, manager, aggregator, gas_adjuster, @@ -274,9 +298,23 @@ impl EthSenderTester { next_l1_batch_number_to_execute: L1BatchNumber(1), next_l1_batch_number_to_prove: L1BatchNumber(1), tx_sent_in_last_iteration_count: 0, + is_l2: false, } } + pub fn switch_to_using_gateway(&mut self) { + self.manager = EthTxManager::new( + self.conn.clone(), + EthConfig::for_tests().sender.unwrap(), + self.gas_adjuster.clone(), + None, + None, + Some(self.l2_gateway.clone()), + ); + self.is_l2 = true; + tracing::info!("Switched eth-sender tester to use Gateway!"); + } + pub async fn storage(&self) -> Connection<'_, Core> { self.conn.connection().await.unwrap() } @@ -285,7 +323,7 @@ impl EthSenderTester { let latest = self .manager .l1_interface() - .get_l1_block_numbers() + .get_l1_block_numbers(OperatorType::NonBlob) .await .unwrap() .latest; @@ -341,13 +379,18 @@ impl EthSenderTester { .get_last_sent_eth_tx_hash(l1_batch_number, operation_type) .await .unwrap(); - let (gateway, other) = if tx.blob_base_fee_per_gas.is_some() { - (self.gateway_blobs.as_ref(), self.gateway.as_ref()) + if !self.is_l2 { + let (gateway, other) = if tx.blob_base_fee_per_gas.is_some() { + (self.gateway_blobs.as_ref(), self.gateway.as_ref()) + } else { + (self.gateway.as_ref(), self.gateway_blobs.as_ref()) + }; + gateway.execute_tx(tx.tx_hash, success, confirmations); + other.advance_block_number(confirmations); } else { - (self.gateway.as_ref(), self.gateway_blobs.as_ref()) - }; - gateway.execute_tx(tx.tx_hash, success, confirmations); - other.advance_block_number(confirmations); + self.l2_gateway + .execute_tx(tx.tx_hash, success, confirmations); + } } pub async fn seal_l1_batch(&mut self) -> L1BatchHeader { @@ -407,15 +450,17 @@ impl EthSenderTester { pub async fn run_eth_sender_tx_manager_iteration_after_n_blocks(&mut self, n: u64) { self.gateway.advance_block_number(n); self.gateway_blobs.advance_block_number(n); - let tx_sent_before = self.gateway.sent_tx_count() + self.gateway_blobs.sent_tx_count(); + self.l2_gateway.advance_block_number(n); + let tx_sent_before = self.gateway.sent_tx_count() + + self.gateway_blobs.sent_tx_count() + + self.l2_gateway.sent_tx_count(); self.manager - .loop_iteration( - &mut self.conn.connection().await.unwrap(), - self.get_block_numbers().await, - ) + .loop_iteration(&mut self.conn.connection().await.unwrap()) .await; - self.tx_sent_in_last_iteration_count = - (self.gateway.sent_tx_count() + self.gateway_blobs.sent_tx_count()) - tx_sent_before; + self.tx_sent_in_last_iteration_count = (self.gateway.sent_tx_count() + + self.gateway_blobs.sent_tx_count() + + self.l2_gateway.sent_tx_count()) + - tx_sent_before; } pub async fn run_eth_sender_tx_manager_iteration(&mut self) { @@ -467,6 +512,7 @@ impl EthSenderTester { &mut self.conn.connection().await.unwrap(), &aggregated_operation, false, + self.is_l2, ) .await .unwrap() @@ -491,14 +537,18 @@ impl EthSenderTester { } pub async fn confirm_tx(&mut self, hash: H256, is_blob: bool) { - let (gateway, other) = if is_blob { - (self.gateway_blobs.as_ref(), self.gateway.as_ref()) + if !self.is_l2 { + let (gateway, other) = if is_blob { + (self.gateway_blobs.as_ref(), self.gateway.as_ref()) + } else { + (self.gateway.as_ref(), self.gateway_blobs.as_ref()) + }; + gateway.execute_tx(hash, true, EthSenderTester::WAIT_CONFIRMATIONS); + other.advance_block_number(EthSenderTester::WAIT_CONFIRMATIONS); } else { - (self.gateway.as_ref(), self.gateway_blobs.as_ref()) - }; - gateway.execute_tx(hash, true, EthSenderTester::WAIT_CONFIRMATIONS); - other.advance_block_number(EthSenderTester::WAIT_CONFIRMATIONS); - + self.l2_gateway + .execute_tx(hash, true, EthSenderTester::WAIT_CONFIRMATIONS); + } self.run_eth_sender_tx_manager_iteration().await; } @@ -543,13 +593,13 @@ impl EthSenderTester { } pub async fn assert_inflight_txs_count_equals(&mut self, value: usize) { - //sanity check - assert!(self.manager.operator_address(OperatorType::Blob).is_some()); - assert_eq!( + let inflight_count = if !self.is_l2 { + //sanity check + assert!(self.manager.operator_address(OperatorType::Blob).is_some()); self.storage() .await .eth_sender_dal() - .get_inflight_txs(self.manager.operator_address(OperatorType::NonBlob)) + .get_inflight_txs(self.manager.operator_address(OperatorType::NonBlob), false) .await .unwrap() .len() @@ -557,11 +607,22 @@ impl EthSenderTester { .storage() .await .eth_sender_dal() - .get_inflight_txs(self.manager.operator_address(OperatorType::Blob)) + .get_inflight_txs(self.manager.operator_address(OperatorType::Blob), false) .await .unwrap() - .len(), - value, + .len() + } else { + self.storage() + .await + .eth_sender_dal() + .get_inflight_txs(None, true) + .await + .unwrap() + .len() + }; + + assert_eq!( + inflight_count, value, "Unexpected number of in-flight transactions" ); } diff --git a/core/node/eth_sender/src/tests.rs b/core/node/eth_sender/src/tests.rs index 83c37dd5d0a5..e03532458f18 100644 --- a/core/node/eth_sender/src/tests.rs +++ b/core/node/eth_sender/src/tests.rs @@ -160,6 +160,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re &mut tester.conn.connection().await.unwrap(), &get_dummy_operation(0), false, + false, ) .await?; @@ -175,7 +176,10 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re .storage() .await .eth_sender_dal() - .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) + .get_inflight_txs( + tester.manager.operator_address(OperatorType::NonBlob), + false + ) .await .unwrap() .len(), @@ -185,7 +189,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re let sent_tx = tester .manager .l1_interface() - .get_tx(hash) + .get_tx(hash, OperatorType::NonBlob) .await .unwrap() .expect("no transaction"); @@ -228,7 +232,10 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re .storage() .await .eth_sender_dal() - .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) + .get_inflight_txs( + tester.manager.operator_address(OperatorType::NonBlob), + false + ) .await .unwrap() .len(), @@ -238,7 +245,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re let resent_tx = tester .manager .l1_interface() - .get_tx(resent_hash) + .get_tx(resent_hash, OperatorType::NonBlob) .await .unwrap() .expect("no transaction"); @@ -425,6 +432,67 @@ async fn transactions_are_not_resent_on_the_same_block() { tester.assert_just_sent_tx_count_equals(0).await; } +#[should_panic( + expected = "eth-sender was switched to gateway, but there are still 1 pre-gateway transactions in-flight!" +)] +#[test_log::test(tokio::test)] +async fn switching_to_gateway_while_some_transactions_were_in_flight_should_cause_panic() { + let mut tester = EthSenderTester::new( + ConnectionPool::::test_pool().await, + vec![100; 100], + true, + true, + L1BatchCommitmentMode::Rollup, + ) + .await; + + let _genesis_l1_batch = TestL1Batch::sealed(&mut tester).await; + let first_l1_batch = TestL1Batch::sealed(&mut tester).await; + + first_l1_batch.save_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + + // sanity check + tester.assert_inflight_txs_count_equals(1).await; + + tester.switch_to_using_gateway(); + tester.run_eth_sender_tx_manager_iteration().await; +} + +#[test_log::test(tokio::test)] +async fn switching_to_gateway_works_for_most_basic_scenario() { + let mut tester = EthSenderTester::new( + ConnectionPool::::test_pool().await, + vec![100; 100], + true, + true, + L1BatchCommitmentMode::Rollup, + ) + .await; + + let _genesis_l1_batch = TestL1Batch::sealed(&mut tester).await; + let first_l1_batch = TestL1Batch::sealed(&mut tester).await; + + first_l1_batch.save_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + + first_l1_batch.execute_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + // sanity check + tester.assert_inflight_txs_count_equals(0).await; + + tester.switch_to_using_gateway(); + tester.run_eth_sender_tx_manager_iteration().await; + + first_l1_batch.save_prove_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + tester.assert_inflight_txs_count_equals(1).await; + + first_l1_batch.execute_prove_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + tester.assert_inflight_txs_count_equals(0).await; +} + #[test_casing(2, COMMITMENT_MODES)] #[test_log::test(tokio::test)] async fn correct_order_for_confirmations( diff --git a/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs b/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs index 244220da026f..4ed9cf1330ea 100644 --- a/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs +++ b/core/node/fee_model/src/l1_gas_price/gas_adjuster/mod.rs @@ -12,7 +12,7 @@ use zksync_types::{commitment::L1BatchCommitmentMode, L1_GAS_PER_PUBDATA_BYTE, U use zksync_web3_decl::client::{DynClient, L1, L2}; use self::metrics::METRICS; -use super::L1TxParamsProvider; +use super::TxParamsProvider; mod metrics; #[cfg(test)] @@ -310,7 +310,7 @@ impl GasAdjuster { } } -impl L1TxParamsProvider for GasAdjuster { +impl TxParamsProvider for GasAdjuster { // This is the method where we decide how much we are ready to pay for the // base_fee based on the number of L1 blocks the transaction has been in the mempool. // This is done in order to avoid base_fee spikes (e.g. during NFT drops) and @@ -331,21 +331,6 @@ impl L1TxParamsProvider for GasAdjuster { new_fee as u64 } - fn get_blob_base_fee(&self) -> u64 { - let a = self.config.pricing_formula_parameter_a; - let b = self.config.pricing_formula_parameter_b; - - // Use the single evaluation at zero of the following: - // Currently we use an exponential formula. - // The alternative is a linear one: - // `let scale_factor = a + b * time_in_mempool as f64;` - let scale_factor = a * b.powf(0.0); - let median = self.blob_base_fee_statistics.median(); - METRICS.median_blob_base_fee_per_gas.set(median.as_u64()); - let new_fee = median.as_u64() as f64 * scale_factor; - new_fee as u64 - } - fn get_next_block_minimal_base_fee(&self) -> u64 { let last_block_base_fee = self.base_fee_statistics.last_added_value(); @@ -379,6 +364,14 @@ impl L1TxParamsProvider for GasAdjuster { fn get_blob_tx_priority_fee(&self) -> u64 { self.get_priority_fee() * 2 } + + fn get_gateway_tx_base_fee(&self) -> u64 { + todo!() + } + + fn get_gateway_tx_pubdata_price(&self) -> u64 { + todo!() + } } /// Helper structure responsible for collecting the data about recent transactions, diff --git a/core/node/fee_model/src/l1_gas_price/mod.rs b/core/node/fee_model/src/l1_gas_price/mod.rs index 29db21bc1733..2a5d63089ca1 100644 --- a/core/node/fee_model/src/l1_gas_price/mod.rs +++ b/core/node/fee_model/src/l1_gas_price/mod.rs @@ -14,13 +14,10 @@ mod main_node_fetcher; /// mining time into account. /// /// This trait, as a bound, should only be used in components that actually sign and send transactions. -pub trait L1TxParamsProvider: fmt::Debug + 'static + Send + Sync { +pub trait TxParamsProvider: fmt::Debug + 'static + Send + Sync { /// Returns the recommended `max_fee_per_gas` value (EIP1559). fn get_base_fee(&self, time_in_mempool: u32) -> u64; - /// Returns the recommended `max_blob_fee_per_gas` value (EIP4844). - fn get_blob_base_fee(&self) -> u64; - /// Returns the recommended `max_priority_fee_per_gas` value (EIP1559). fn get_priority_fee(&self) -> u64; @@ -35,4 +32,10 @@ pub trait L1TxParamsProvider: fmt::Debug + 'static + Send + Sync { /// Returns the recommended `max_priority_fee_per_gas` value (EIP1559) for blob transaction. fn get_blob_tx_priority_fee(&self) -> u64; + + /// Returns the recommended `max_fee_per_gas` value for gateway transactions. + fn get_gateway_tx_base_fee(&self) -> u64; + + /// Returns the recommended `max_fee_per_gas` value for gateway transactions. + fn get_gateway_tx_pubdata_price(&self) -> u64; } diff --git a/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs b/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs index b5f8ee423138..d6989d8db72b 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs @@ -6,7 +6,10 @@ use zksync_eth_sender::EthTxManager; use crate::{ implementations::resources::{ circuit_breakers::CircuitBreakersResource, - eth_interface::{BoundEthInterfaceForBlobsResource, BoundEthInterfaceResource}, + eth_interface::{ + BoundEthInterfaceForBlobsResource, BoundEthInterfaceForL2Resource, + BoundEthInterfaceResource, + }, gas_adjuster::GasAdjusterResource, pools::{MasterPool, PoolResource, ReplicaPool}, }, @@ -27,7 +30,7 @@ use crate::{ /// - `PoolResource` /// - `BoundEthInterfaceResource` /// - `BoundEthInterfaceForBlobsResource` (optional) -/// - `L1TxParamsResource` +/// - `TxParamsResource` /// - `CircuitBreakersResource` (adds a circuit breaker) /// /// ## Adds tasks @@ -45,6 +48,7 @@ pub struct Input { pub replica_pool: PoolResource, pub eth_client: BoundEthInterfaceResource, pub eth_client_blobs: Option, + pub l2_client: Option, pub gas_adjuster: GasAdjusterResource, #[context(default)] pub circuit_breakers: CircuitBreakersResource, @@ -79,6 +83,7 @@ impl WiringLayer for EthTxManagerLayer { let eth_client = input.eth_client.0; let eth_client_blobs = input.eth_client_blobs.map(|c| c.0); + let l2_client = input.l2_client.map(|c| c.0); let config = self.eth_sender_config.sender.context("sender")?; @@ -88,8 +93,9 @@ impl WiringLayer for EthTxManagerLayer { master_pool, config, gas_adjuster, - eth_client, + Some(eth_client), eth_client_blobs, + l2_client, ); // Insert circuit breaker. diff --git a/core/node/node_framework/src/implementations/layers/l1_gas.rs b/core/node/node_framework/src/implementations/layers/l1_gas.rs index 9a4ccb8264f6..35c4bc3fc205 100644 --- a/core/node/node_framework/src/implementations/layers/l1_gas.rs +++ b/core/node/node_framework/src/implementations/layers/l1_gas.rs @@ -9,7 +9,7 @@ use crate::{ base_token_ratio_provider::BaseTokenRatioProviderResource, fee_input::{ApiFeeInputResource, SequencerFeeInputResource}, gas_adjuster::GasAdjusterResource, - l1_tx_params::L1TxParamsResource, + l1_tx_params::TxParamsResource, pools::{PoolResource, ReplicaPool}, }, wiring_layer::{WiringError, WiringLayer}, @@ -38,7 +38,7 @@ pub struct Input { pub struct Output { pub sequencer_fee_input: SequencerFeeInputResource, pub api_fee_input: ApiFeeInputResource, - pub l1_tx_params: L1TxParamsResource, + pub l1_tx_params: TxParamsResource, } impl L1GasLayer { diff --git a/core/node/node_framework/src/implementations/resources/eth_interface.rs b/core/node/node_framework/src/implementations/resources/eth_interface.rs index 5879610b75ed..24b7df327f63 100644 --- a/core/node/node_framework/src/implementations/resources/eth_interface.rs +++ b/core/node/node_framework/src/implementations/resources/eth_interface.rs @@ -46,3 +46,12 @@ impl Resource for BoundEthInterfaceForBlobsResource { "common/bound_eth_interface_for_blobs".into() } } + +#[derive(Debug, Clone)] +pub struct BoundEthInterfaceForL2Resource(pub Box); + +impl Resource for BoundEthInterfaceForL2Resource { + fn name() -> String { + "common/bound_eth_interface_for_l2".into() + } +} diff --git a/core/node/node_framework/src/implementations/resources/l1_tx_params.rs b/core/node/node_framework/src/implementations/resources/l1_tx_params.rs index 676828c39885..5cb8af5ed44c 100644 --- a/core/node/node_framework/src/implementations/resources/l1_tx_params.rs +++ b/core/node/node_framework/src/implementations/resources/l1_tx_params.rs @@ -1,20 +1,20 @@ use std::sync::Arc; -use zksync_node_fee_model::l1_gas_price::L1TxParamsProvider; +use zksync_node_fee_model::l1_gas_price::TxParamsProvider; use crate::resource::Resource; -/// A resource that provides [`L1TxParamsProvider`] implementation to the service. +/// A resource that provides [`TxParamsProvider`] implementation to the service. #[derive(Debug, Clone)] -pub struct L1TxParamsResource(pub Arc); +pub struct TxParamsResource(pub Arc); -impl Resource for L1TxParamsResource { +impl Resource for TxParamsResource { fn name() -> String { - "common/l1_tx_params".into() + "common/tx_params".into() } } -impl From> for L1TxParamsResource { +impl From> for TxParamsResource { fn from(provider: Arc) -> Self { Self(provider) }