diff --git a/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json b/core/lib/dal/.sqlx/query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json similarity index 79% rename from core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json rename to core/lib/dal/.sqlx/query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json index 5948d75785b2..6e284803521a 100644 --- a/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json +++ b/core/lib/dal/.sqlx/query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n )\n ORDER BY\n id\n LIMIT\n $1\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL\n AND id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n )\n ORDER BY\n id\n LIMIT\n $1\n ", "describe": { "columns": [ { @@ -76,7 +76,8 @@ ], "parameters": { "Left": [ - "Int8" + "Int8", + "Bytea" ] }, "nullable": [ @@ -96,5 +97,5 @@ true ] }, - "hash": "5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33" + "hash": "7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81" } diff --git a/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json b/core/lib/dal/.sqlx/query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json similarity index 75% rename from core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json rename to core/lib/dal/.sqlx/query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json index c0e8bb9d2553..b80a10462c07 100644 --- a/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json +++ b/core/lib/dal/.sqlx/query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n WHERE\n sent_at_block IS NOT NULL\n )\n ORDER BY\n id\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL\n AND confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n WHERE\n sent_at_block IS NOT NULL\n )\n ORDER BY\n id\n ", "describe": { "columns": [ { @@ -75,7 +75,9 @@ } ], "parameters": { - "Left": [] + "Left": [ + "Bytea" + ] }, "nullable": [ false, @@ -94,5 +96,5 @@ true ] }, - "hash": "23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9" + "hash": "aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d" } diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index d32ed082131e..bb27cf8c1f68 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -22,7 +22,10 @@ pub struct EthSenderDal<'a, 'c> { } impl EthSenderDal<'_, '_> { - pub async fn get_inflight_txs(&mut self) -> sqlx::Result> { + pub async fn get_inflight_txs( + &mut self, + operator_address: Option
, + ) -> sqlx::Result> { let txs = sqlx::query_as!( StorageEthTx, r#" @@ -31,7 +34,8 @@ impl EthSenderDal<'_, '_> { FROM eth_txs WHERE - confirmed_eth_tx_history_id IS NULL + from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL + AND confirmed_eth_tx_history_id IS NULL AND id <= ( SELECT COALESCE(MAX(eth_tx_id), 0) @@ -42,7 +46,8 @@ impl EthSenderDal<'_, '_> { ) ORDER BY id - "# + "#, + operator_address.as_ref().map(|h160| h160.as_bytes()), ) .fetch_all(self.storage.conn()) .await?; @@ -121,7 +126,11 @@ impl EthSenderDal<'_, '_> { .map(Into::into)) } - pub async fn get_new_eth_txs(&mut self, limit: u64) -> sqlx::Result> { + pub async fn get_new_eth_txs( + &mut self, + limit: u64, + operator_address: &Option
, + ) -> sqlx::Result> { let txs = sqlx::query_as!( StorageEthTx, r#" @@ -130,7 +139,8 @@ impl EthSenderDal<'_, '_> { FROM eth_txs WHERE - id > ( + from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL + AND id > ( SELECT COALESCE(MAX(eth_tx_id), 0) FROM @@ -141,7 +151,8 @@ impl EthSenderDal<'_, '_> { LIMIT $1 "#, - limit as i64 + limit as i64, + operator_address.as_ref().map(|h160| h160.as_bytes()), ) .fetch_all(self.storage.conn()) .await?; diff --git a/core/node/eth_sender/src/abstract_l1_interface.rs b/core/node/eth_sender/src/abstract_l1_interface.rs index e9290df2eb14..acc7c265186d 100644 --- a/core/node/eth_sender/src/abstract_l1_interface.rs +++ b/core/node/eth_sender/src/abstract_l1_interface.rs @@ -1,6 +1,7 @@ use std::fmt; use async_trait::async_trait; +use vise::{EncodeLabelSet, EncodeLabelValue}; use zksync_eth_client::{ clients::{DynClient, L1}, BoundEthInterface, EnrichedClientResult, EthInterface, ExecutedTxStatus, FailureInfo, Options, @@ -32,6 +33,13 @@ pub(crate) struct L1BlockNumbers { pub latest: L1BlockNumber, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "type", rename_all = "snake_case")] +pub(crate) enum OperatorType { + NonBlob, + Blob, +} + #[async_trait] pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug { async fn failure_reason(&self, tx_hash: H256) -> Option; @@ -51,11 +59,7 @@ pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug { async fn get_operator_nonce( &self, block_numbers: L1BlockNumbers, - ) -> Result; - - async fn get_blobs_operator_nonce( - &self, - block_numbers: L1BlockNumbers, + operator_type: OperatorType, ) -> Result, EthSenderError>; async fn sign_tx( @@ -122,28 +126,13 @@ impl AbstractL1Interface for RealL1Interface { async fn get_operator_nonce( &self, block_numbers: L1BlockNumbers, - ) -> Result { - let finalized = self - .ethereum_gateway() - .nonce_at(block_numbers.finalized.0.into()) - .await? - .as_u32() - .into(); - - let latest = self - .ethereum_gateway() - .nonce_at(block_numbers.latest.0.into()) - .await? - .as_u32() - .into(); - Ok(OperatorNonce { finalized, latest }) - } - - async fn get_blobs_operator_nonce( - &self, - block_numbers: L1BlockNumbers, + operator_type: OperatorType, ) -> Result, EthSenderError> { - match &self.ethereum_gateway_blobs() { + let gateway = match operator_type { + OperatorType::NonBlob => Some(self.ethereum_gateway()), + OperatorType::Blob => self.ethereum_gateway_blobs(), + }; + match gateway { None => Ok(None), Some(gateway) => { let finalized = gateway diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index 8ea4bb98b15f..7e69a23c16ff 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -14,7 +14,9 @@ use zksync_utils::time::seconds_since_epoch; use super::{metrics::METRICS, EthSenderError}; use crate::{ - abstract_l1_interface::{AbstractL1Interface, L1BlockNumbers, OperatorNonce, RealL1Interface}, + abstract_l1_interface::{ + AbstractL1Interface, L1BlockNumbers, OperatorNonce, OperatorType, RealL1Interface, + }, eth_fees_oracle::{EthFees, EthFeesOracle, GasAdjusterFeesOracle}, metrics::TransactionType, }; @@ -240,57 +242,44 @@ impl EthTxManager { } } + pub(crate) fn operator_address(&self, operator_type: OperatorType) -> Option
{ + if operator_type == OperatorType::NonBlob { + None + } else { + self.l1_interface.get_blobs_operator_account() + } + } // Monitors the in-flight transactions, marks mined ones as confirmed, // returns the one that has to be resent (if there is one). - pub(super) async fn monitor_inflight_transactions( + pub(super) async fn monitor_inflight_transactions_single_operator( &mut self, storage: &mut Connection<'_, Core>, l1_block_numbers: L1BlockNumbers, + operator_type: OperatorType, ) -> Result, EthSenderError> { - METRICS.track_block_numbers(&l1_block_numbers); let operator_nonce = self .l1_interface - .get_operator_nonce(l1_block_numbers) - .await?; - - let non_blob_tx_to_resend = self - .apply_inflight_txs_statuses_and_get_first_to_resend( - storage, - l1_block_numbers, - operator_nonce, - None, - ) + .get_operator_nonce(l1_block_numbers, operator_type) .await?; - let blobs_operator_nonce = self - .l1_interface - .get_blobs_operator_nonce(l1_block_numbers) - .await?; - let blobs_operator_address = self.l1_interface.get_blobs_operator_account(); + if let Some(operator_nonce) = operator_nonce { + let inflight_txs = storage + .eth_sender_dal() + .get_inflight_txs(self.operator_address(operator_type)) + .await + .unwrap(); + METRICS.number_of_inflight_txs[&operator_type].set(inflight_txs.len()); - let mut blob_tx_to_resend = None; - if let Some(blobs_operator_nonce) = blobs_operator_nonce { - // need to check if both nonce and address are `Some` - if blobs_operator_address.is_none() { - panic!("blobs_operator_address has to be set its nonce is known; qed"); - } - blob_tx_to_resend = self + Ok(self .apply_inflight_txs_statuses_and_get_first_to_resend( storage, l1_block_numbers, - blobs_operator_nonce, - blobs_operator_address, + operator_nonce, + inflight_txs, ) - .await?; - } - - // We have to resend non-blob transactions first, otherwise in case of a temporary - // spike in activity, all Execute and PublishProof would need to wait until all commit txs - // are sent, which may take some time. We treat them as if they had higher priority. - if non_blob_tx_to_resend.is_some() { - Ok(non_blob_tx_to_resend) + .await?) } else { - Ok(blob_tx_to_resend) + Ok(None) } } @@ -299,11 +288,8 @@ impl EthTxManager { storage: &mut Connection<'_, Core>, l1_block_numbers: L1BlockNumbers, operator_nonce: OperatorNonce, - operator_address: Option
, + inflight_txs: Vec, ) -> Result, EthSenderError> { - let inflight_txs = storage.eth_sender_dal().get_inflight_txs().await.unwrap(); - METRICS.number_of_inflight_txs.set(inflight_txs.len()); - tracing::trace!( "Going through not confirmed txs. \ Block numbers: latest {}, finalized {}, \ @@ -323,10 +309,6 @@ impl EthTxManager { tx.nonce, ); - if tx.from_addr != operator_address { - continue; - } - // If the `operator_nonce.latest` <= `tx.nonce`, this means // that `tx` is not mined and we should resend it. // We only resend the first un-mined transaction. @@ -362,6 +344,12 @@ impl EthTxManager { tx.nonce, ); + tracing::info!( + "Updating status of tx {} of type {} with nonce {}", + tx.id, + tx.tx_type, + tx.nonce + ); match self.check_all_sending_attempts(storage, &tx).await { Ok(Some(tx_status)) => { self.apply_tx_status(storage, &tx, tx_status, l1_block_numbers.finalized) @@ -558,19 +546,13 @@ impl EthTxManager { tracing::info!("Stop signal received, eth_tx_manager is shutting down"); break; } + let l1_block_numbers = self.l1_interface.get_l1_block_numbers().await?; + METRICS.track_block_numbers(&l1_block_numbers); - match self.loop_iteration(&mut storage, last_known_l1_block).await { - Ok(block) => last_known_l1_block = block, - Err(e) => { - // Web3 API request failures can cause this, - // and anything more important is already properly reported. - tracing::warn!("eth_sender error {:?}", e); - if e.is_transient() { - METRICS.l1_transient_errors.inc(); - } - } + if last_known_l1_block < l1_block_numbers.latest { + self.loop_iteration(&mut storage, l1_block_numbers).await; + last_known_l1_block = l1_block_numbers.latest; } - tokio::time::sleep(self.config.tx_poll_period()).await; } Ok(()) @@ -580,10 +562,11 @@ impl EthTxManager { &mut self, storage: &mut Connection<'_, Core>, current_block: L1BlockNumber, + operator_type: OperatorType, ) { let number_inflight_txs = storage .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(self.operator_address(operator_type)) .await .unwrap() .len(); @@ -596,7 +579,10 @@ impl EthTxManager { // Get the new eth tx and create history item for them let new_eth_tx = storage .eth_sender_dal() - .get_new_eth_txs(number_of_available_slots_for_eth_txs) + .get_new_eth_txs( + number_of_available_slots_for_eth_txs, + &self.operator_address(operator_type), + ) .await .unwrap(); @@ -606,24 +592,14 @@ impl EthTxManager { } } - #[tracing::instrument(skip(self, storage))] - async fn loop_iteration( + async fn update_statuses_and_resend_if_needed( &mut self, storage: &mut Connection<'_, Core>, - previous_block: L1BlockNumber, - ) -> Result { - let l1_block_numbers = self.l1_interface.get_l1_block_numbers().await?; - - self.send_new_eth_txs(storage, l1_block_numbers.latest) - .await; - - if l1_block_numbers.latest <= previous_block { - // Nothing to do - no new blocks were mined. - return Ok(previous_block); - } - + l1_block_numbers: L1BlockNumbers, + operator_type: OperatorType, + ) -> Result<(), EthSenderError> { if let Some((tx, sent_at_block)) = self - .monitor_inflight_transactions(storage, l1_block_numbers) + .monitor_inflight_transactions_single_operator(storage, l1_block_numbers, operator_type) .await? { // New gas price depends on the time this tx spent in mempool. @@ -634,9 +610,37 @@ impl EthTxManager { // sending new operations. let _ = self .send_eth_tx(storage, &tx, time_in_mempool, l1_block_numbers.latest) - .await; + .await?; } + Ok(()) + } - Ok(l1_block_numbers.latest) + #[tracing::instrument(skip(self, storage))] + async fn loop_iteration( + &mut self, + storage: &mut Connection<'_, Core>, + l1_block_numbers: L1BlockNumbers, + ) { + tracing::info!("Loop iteration at block {}", l1_block_numbers.latest); + // We can treat those two operators independently as they have different nonces and + // aggregator makes sure that corresponding Commit transaction is confirmed before creating + // a PublishProof transaction + for operator_type in [OperatorType::NonBlob, OperatorType::Blob] { + self.send_new_eth_txs(storage, l1_block_numbers.latest, operator_type) + .await; + let result = self + .update_statuses_and_resend_if_needed(storage, l1_block_numbers, operator_type) + .await; + + //We don't want an error in sending non-blob transactions interrupt sending blob txs + if let Err(error) = result { + // Web3 API request failures can cause this, + // and anything more important is already properly reported. + tracing::warn!("eth_sender error {:?}", error); + if error.is_transient() { + METRICS.l1_transient_errors.inc(); + } + } + } } } diff --git a/core/node/eth_sender/src/metrics.rs b/core/node/eth_sender/src/metrics.rs index 471a56b9bea5..462fe3ed6e59 100644 --- a/core/node/eth_sender/src/metrics.rs +++ b/core/node/eth_sender/src/metrics.rs @@ -8,7 +8,7 @@ use zksync_shared_metrics::{BlockL1Stage, BlockStage, APP_METRICS}; use zksync_types::{aggregated_operations::AggregatedActionType, eth_sender::EthTx}; use zksync_utils::time::seconds_since_epoch; -use crate::abstract_l1_interface::L1BlockNumbers; +use crate::abstract_l1_interface::{L1BlockNumbers, OperatorType}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] #[metrics(label = "kind", rename_all = "snake_case")] @@ -98,7 +98,7 @@ pub(super) struct EthSenderMetrics { /// Last L1 block observed by the Ethereum sender. pub last_known_l1_block: Family>, /// Number of in-flight txs produced by the Ethereum sender. - pub number_of_inflight_txs: Gauge, + pub number_of_inflight_txs: Family>, #[metrics(buckets = GAS_BUCKETS)] pub l1_gas_used: Family>, #[metrics(buckets = Buckets::LATENCIES)] diff --git a/core/node/eth_sender/src/tests.rs b/core/node/eth_sender/src/tests.rs index 4853c7bb2299..45835a50c33b 100644 --- a/core/node/eth_sender/src/tests.rs +++ b/core/node/eth_sender/src/tests.rs @@ -28,8 +28,9 @@ use zksync_types::{ }; use crate::{ - abstract_l1_interface::L1BlockNumbers, aggregated_operations::AggregatedOperation, Aggregator, - EthSenderError, EthTxAggregator, EthTxManager, + abstract_l1_interface::{L1BlockNumbers, OperatorType}, + aggregated_operations::AggregatedOperation, + Aggregator, EthSenderError, EthTxAggregator, EthTxManager, }; // Alias to conveniently call static methods of `ETHSender`. @@ -332,7 +333,7 @@ async fn confirm_many( .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -347,9 +348,10 @@ async fn confirm_many( let to_resend = tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await?; @@ -359,7 +361,7 @@ async fn confirm_many( .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -433,7 +435,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -461,7 +463,11 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re let (to_resend, _) = tester .manager - .monitor_inflight_transactions(&mut tester.conn.connection().await.unwrap(), block_numbers) + .monitor_inflight_transactions_single_operator( + &mut tester.conn.connection().await.unwrap(), + block_numbers, + OperatorType::NonBlob, + ) .await? .unwrap(); @@ -482,7 +488,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -568,7 +574,7 @@ async fn dont_resend_already_mined(commitment_mode: L1BatchCommitmentMode) -> an .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -582,9 +588,10 @@ async fn dont_resend_already_mined(commitment_mode: L1BatchCommitmentMode) -> an let to_resend = tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await?; @@ -594,7 +601,7 @@ async fn dont_resend_already_mined(commitment_mode: L1BatchCommitmentMode) -> an .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -680,9 +687,10 @@ async fn three_scenarios(commitment_mode: L1BatchCommitmentMode) -> anyhow::Resu let (to_resend, _) = tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await? .expect("we should be trying to resend the last tx"); @@ -693,7 +701,7 @@ async fn three_scenarios(commitment_mode: L1BatchCommitmentMode) -> anyhow::Resu .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -767,9 +775,10 @@ async fn failed_eth_tx(commitment_mode: L1BatchCommitmentMode) { .execute_tx(hash, false, EthSenderTester::WAIT_CONFIRMATIONS); tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await .unwrap(); @@ -1253,9 +1262,20 @@ async fn confirm_tx(tester: &mut EthSenderTester, hash: H256) { .execute_tx(hash, true, EthSenderTester::WAIT_CONFIRMATIONS); tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( + &mut tester.conn.connection().await.unwrap(), + tester.get_block_numbers().await, + OperatorType::NonBlob, + ) + .await + .unwrap(); + + tester + .manager + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::Blob, ) .await .unwrap();