From 80003ba1a22069ebecf8e70090291ca71b804192 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Thu, 27 Jun 2024 17:30:39 +0200 Subject: [PATCH 1/8] handle transactions for different operators separately Signed-off-by: tomg10 --- ...ee0993b94cf574784b8b117eaacf37579328.json} | 8 +- ...3c2f1906015389191f50d0a91bcb40c30e42.json} | 7 +- core/lib/dal/src/eth_sender_dal.rs | 23 ++- .../eth_sender/src/abstract_l1_interface.rs | 41 ++--- core/node/eth_sender/src/eth_tx_manager.rs | 144 ++++++++++-------- core/node/eth_sender/src/metrics.rs | 4 +- core/node/eth_sender/src/tests.rs | 50 ++++-- 7 files changed, 158 insertions(+), 119 deletions(-) rename core/lib/dal/.sqlx/{query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json => query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json} (77%) rename core/lib/dal/.sqlx/{query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json => query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json} (80%) diff --git a/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json b/core/lib/dal/.sqlx/query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json similarity index 77% rename from core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json rename to core/lib/dal/.sqlx/query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json index c0e8bb9d2553..9969ce032733 100644 --- a/core/lib/dal/.sqlx/query-23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9.json +++ b/core/lib/dal/.sqlx/query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n WHERE\n sent_at_block IS NOT NULL\n )\n ORDER BY\n id\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1\n AND confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n WHERE\n sent_at_block IS NOT NULL\n )\n ORDER BY\n id\n ", "describe": { "columns": [ { @@ -75,7 +75,9 @@ } ], "parameters": { - "Left": [] + "Left": [ + "Bytea" + ] }, "nullable": [ false, @@ -94,5 +96,5 @@ true ] }, - "hash": "23be43bf705d679ca751c89353716065fcad42c6b621efb3a135a16b477dcfd9" + "hash": "49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328" } diff --git a/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json b/core/lib/dal/.sqlx/query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json similarity index 80% rename from core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json rename to core/lib/dal/.sqlx/query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json index 5948d75785b2..157e5ae6db7e 100644 --- a/core/lib/dal/.sqlx/query-5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33.json +++ b/core/lib/dal/.sqlx/query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n )\n ORDER BY\n id\n LIMIT\n $1\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $2\n AND id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n )\n ORDER BY\n id\n LIMIT\n $1\n ", "describe": { "columns": [ { @@ -76,7 +76,8 @@ ], "parameters": { "Left": [ - "Int8" + "Int8", + "Bytea" ] }, "nullable": [ @@ -96,5 +97,5 @@ true ] }, - "hash": "5659480e5d79dab3399e35539b240e7eb9f598999c28015a504605f88bf84b33" + "hash": "eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42" } diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index d32ed082131e..5c6ef54996ef 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -22,7 +22,10 @@ pub struct EthSenderDal<'a, 'c> { } impl EthSenderDal<'_, '_> { - pub async fn get_inflight_txs(&mut self) -> sqlx::Result> { + pub async fn get_inflight_txs( + &mut self, + operator_address: Option
, + ) -> sqlx::Result> { let txs = sqlx::query_as!( StorageEthTx, r#" @@ -31,7 +34,8 @@ impl EthSenderDal<'_, '_> { FROM eth_txs WHERE - confirmed_eth_tx_history_id IS NULL + from_addr IS NOT DISTINCT FROM $1 + AND confirmed_eth_tx_history_id IS NULL AND id <= ( SELECT COALESCE(MAX(eth_tx_id), 0) @@ -42,7 +46,8 @@ impl EthSenderDal<'_, '_> { ) ORDER BY id - "# + "#, + operator_address.as_ref().map(|h160| h160.as_bytes()), ) .fetch_all(self.storage.conn()) .await?; @@ -121,7 +126,11 @@ impl EthSenderDal<'_, '_> { .map(Into::into)) } - pub async fn get_new_eth_txs(&mut self, limit: u64) -> sqlx::Result> { + pub async fn get_new_eth_txs( + &mut self, + limit: u64, + operator_address: &Option
, + ) -> sqlx::Result> { let txs = sqlx::query_as!( StorageEthTx, r#" @@ -130,7 +139,8 @@ impl EthSenderDal<'_, '_> { FROM eth_txs WHERE - id > ( + from_addr IS NOT DISTINCT FROM $2 + AND id > ( SELECT COALESCE(MAX(eth_tx_id), 0) FROM @@ -141,7 +151,8 @@ impl EthSenderDal<'_, '_> { LIMIT $1 "#, - limit as i64 + limit as i64, + operator_address.as_ref().map(|h160| h160.as_bytes()), ) .fetch_all(self.storage.conn()) .await?; diff --git a/core/node/eth_sender/src/abstract_l1_interface.rs b/core/node/eth_sender/src/abstract_l1_interface.rs index e9290df2eb14..acc7c265186d 100644 --- a/core/node/eth_sender/src/abstract_l1_interface.rs +++ b/core/node/eth_sender/src/abstract_l1_interface.rs @@ -1,6 +1,7 @@ use std::fmt; use async_trait::async_trait; +use vise::{EncodeLabelSet, EncodeLabelValue}; use zksync_eth_client::{ clients::{DynClient, L1}, BoundEthInterface, EnrichedClientResult, EthInterface, ExecutedTxStatus, FailureInfo, Options, @@ -32,6 +33,13 @@ pub(crate) struct L1BlockNumbers { pub latest: L1BlockNumber, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "type", rename_all = "snake_case")] +pub(crate) enum OperatorType { + NonBlob, + Blob, +} + #[async_trait] pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug { async fn failure_reason(&self, tx_hash: H256) -> Option; @@ -51,11 +59,7 @@ pub(super) trait AbstractL1Interface: 'static + Sync + Send + fmt::Debug { async fn get_operator_nonce( &self, block_numbers: L1BlockNumbers, - ) -> Result; - - async fn get_blobs_operator_nonce( - &self, - block_numbers: L1BlockNumbers, + operator_type: OperatorType, ) -> Result, EthSenderError>; async fn sign_tx( @@ -122,28 +126,13 @@ impl AbstractL1Interface for RealL1Interface { async fn get_operator_nonce( &self, block_numbers: L1BlockNumbers, - ) -> Result { - let finalized = self - .ethereum_gateway() - .nonce_at(block_numbers.finalized.0.into()) - .await? - .as_u32() - .into(); - - let latest = self - .ethereum_gateway() - .nonce_at(block_numbers.latest.0.into()) - .await? - .as_u32() - .into(); - Ok(OperatorNonce { finalized, latest }) - } - - async fn get_blobs_operator_nonce( - &self, - block_numbers: L1BlockNumbers, + operator_type: OperatorType, ) -> Result, EthSenderError> { - match &self.ethereum_gateway_blobs() { + let gateway = match operator_type { + OperatorType::NonBlob => Some(self.ethereum_gateway()), + OperatorType::Blob => self.ethereum_gateway_blobs(), + }; + match gateway { None => Ok(None), Some(gateway) => { let finalized = gateway diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index 8ea4bb98b15f..f4330a7a2b62 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -14,7 +14,9 @@ use zksync_utils::time::seconds_since_epoch; use super::{metrics::METRICS, EthSenderError}; use crate::{ - abstract_l1_interface::{AbstractL1Interface, L1BlockNumbers, OperatorNonce, RealL1Interface}, + abstract_l1_interface::{ + AbstractL1Interface, L1BlockNumbers, OperatorNonce, OperatorType, RealL1Interface, + }, eth_fees_oracle::{EthFees, EthFeesOracle, GasAdjusterFeesOracle}, metrics::TransactionType, }; @@ -240,57 +242,44 @@ impl EthTxManager { } } + pub(crate) fn operator_address(&self, operator_type: OperatorType) -> Option
{ + if operator_type == OperatorType::NonBlob { + None + } else { + self.l1_interface.get_blobs_operator_account() + } + } // Monitors the in-flight transactions, marks mined ones as confirmed, // returns the one that has to be resent (if there is one). - pub(super) async fn monitor_inflight_transactions( + pub(super) async fn monitor_inflight_transactions_single_operator( &mut self, storage: &mut Connection<'_, Core>, l1_block_numbers: L1BlockNumbers, + operator_type: OperatorType, ) -> Result, EthSenderError> { - METRICS.track_block_numbers(&l1_block_numbers); let operator_nonce = self .l1_interface - .get_operator_nonce(l1_block_numbers) - .await?; - - let non_blob_tx_to_resend = self - .apply_inflight_txs_statuses_and_get_first_to_resend( - storage, - l1_block_numbers, - operator_nonce, - None, - ) + .get_operator_nonce(l1_block_numbers, operator_type) .await?; - let blobs_operator_nonce = self - .l1_interface - .get_blobs_operator_nonce(l1_block_numbers) - .await?; - let blobs_operator_address = self.l1_interface.get_blobs_operator_account(); + if let Some(operator_nonce) = operator_nonce { + let inflight_txs = storage + .eth_sender_dal() + .get_inflight_txs(self.operator_address(operator_type)) + .await + .unwrap(); + METRICS.number_of_inflight_txs[&operator_type].set(inflight_txs.len()); - let mut blob_tx_to_resend = None; - if let Some(blobs_operator_nonce) = blobs_operator_nonce { - // need to check if both nonce and address are `Some` - if blobs_operator_address.is_none() { - panic!("blobs_operator_address has to be set its nonce is known; qed"); - } - blob_tx_to_resend = self + Ok(self .apply_inflight_txs_statuses_and_get_first_to_resend( storage, l1_block_numbers, - blobs_operator_nonce, - blobs_operator_address, + operator_nonce, + inflight_txs, ) - .await?; - } - - // We have to resend non-blob transactions first, otherwise in case of a temporary - // spike in activity, all Execute and PublishProof would need to wait until all commit txs - // are sent, which may take some time. We treat them as if they had higher priority. - if non_blob_tx_to_resend.is_some() { - Ok(non_blob_tx_to_resend) + .await?) } else { - Ok(blob_tx_to_resend) + Ok(None) } } @@ -299,12 +288,9 @@ impl EthTxManager { storage: &mut Connection<'_, Core>, l1_block_numbers: L1BlockNumbers, operator_nonce: OperatorNonce, - operator_address: Option
, + inflight_txs: Vec, ) -> Result, EthSenderError> { - let inflight_txs = storage.eth_sender_dal().get_inflight_txs().await.unwrap(); - METRICS.number_of_inflight_txs.set(inflight_txs.len()); - - tracing::trace!( + tracing::info!( "Going through not confirmed txs. \ Block numbers: latest {}, finalized {}, \ operator's nonce: latest {}, finalized {}", @@ -316,17 +302,13 @@ impl EthTxManager { // Not confirmed transactions, ordered by nonce for tx in inflight_txs { - tracing::trace!( + tracing::info!( "Checking tx id: {}, operator_nonce: {:?}, tx nonce: {}", tx.id, operator_nonce, tx.nonce, ); - if tx.from_addr != operator_address { - continue; - } - // If the `operator_nonce.latest` <= `tx.nonce`, this means // that `tx` is not mined and we should resend it. // We only resend the first un-mined transaction. @@ -362,6 +344,12 @@ impl EthTxManager { tx.nonce, ); + tracing::info!( + "Updating status of tx {} of type {} with nonce {}", + tx.id, + tx.tx_type, + tx.nonce + ); match self.check_all_sending_attempts(storage, &tx).await { Ok(Some(tx_status)) => { self.apply_tx_status(storage, &tx, tx_status, l1_block_numbers.finalized) @@ -580,10 +568,11 @@ impl EthTxManager { &mut self, storage: &mut Connection<'_, Core>, current_block: L1BlockNumber, + operator_type: OperatorType, ) { let number_inflight_txs = storage .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(self.operator_address(operator_type)) .await .unwrap() .len(); @@ -596,7 +585,10 @@ impl EthTxManager { // Get the new eth tx and create history item for them let new_eth_tx = storage .eth_sender_dal() - .get_new_eth_txs(number_of_available_slots_for_eth_txs) + .get_new_eth_txs( + number_of_available_slots_for_eth_txs, + &self.operator_address(operator_type), + ) .await .unwrap(); @@ -606,6 +598,29 @@ impl EthTxManager { } } + async fn update_statuses_and_resend_if_needed( + &mut self, + storage: &mut Connection<'_, Core>, + l1_block_numbers: L1BlockNumbers, + operator_type: OperatorType, + ) -> Result<(), EthSenderError> { + if let Some((tx, sent_at_block)) = self + .monitor_inflight_transactions_single_operator(storage, l1_block_numbers, operator_type) + .await? + { + // New gas price depends on the time this tx spent in mempool. + let time_in_mempool = l1_block_numbers.latest.0 - sent_at_block; + + // We don't want to return early in case resend does not succeed - + // the error is logged anyway, but early returns will prevent + // sending new operations. + let _ = self + .send_eth_tx(storage, &tx, time_in_mempool, l1_block_numbers.latest) + .await?; + } + Ok(()) + } + #[tracing::instrument(skip(self, storage))] async fn loop_iteration( &mut self, @@ -614,29 +629,30 @@ impl EthTxManager { ) -> Result { let l1_block_numbers = self.l1_interface.get_l1_block_numbers().await?; - self.send_new_eth_txs(storage, l1_block_numbers.latest) - .await; - if l1_block_numbers.latest <= previous_block { // Nothing to do - no new blocks were mined. return Ok(previous_block); } - if let Some((tx, sent_at_block)) = self - .monitor_inflight_transactions(storage, l1_block_numbers) - .await? - { - // New gas price depends on the time this tx spent in mempool. - let time_in_mempool = l1_block_numbers.latest.0 - sent_at_block; + METRICS.track_block_numbers(&l1_block_numbers); - // We don't want to return early in case resend does not succeed - - // the error is logged anyway, but early returns will prevent - // sending new operations. - let _ = self - .send_eth_tx(storage, &tx, time_in_mempool, l1_block_numbers.latest) + let mut last_error = None; + for operator_type in [OperatorType::NonBlob, OperatorType::Blob] { + self.send_new_eth_txs(storage, l1_block_numbers.latest, operator_type) + .await; + let result = self + .update_statuses_and_resend_if_needed(storage, l1_block_numbers, OperatorType::Blob) .await; - } - Ok(l1_block_numbers.latest) + //We don't want an error in sending non-blob transactions interrupt sending blob txs + if let Err(error) = result { + last_error = Some(error); + } + } + if let Some(last_error) = last_error { + Err(last_error) + } else { + Ok(l1_block_numbers.latest) + } } } diff --git a/core/node/eth_sender/src/metrics.rs b/core/node/eth_sender/src/metrics.rs index 32425baa5eee..ff41b6ff9a7e 100644 --- a/core/node/eth_sender/src/metrics.rs +++ b/core/node/eth_sender/src/metrics.rs @@ -8,7 +8,7 @@ use zksync_shared_metrics::{BlockL1Stage, BlockStage, APP_METRICS}; use zksync_types::{aggregated_operations::AggregatedActionType, eth_sender::EthTx}; use zksync_utils::time::seconds_since_epoch; -use crate::abstract_l1_interface::L1BlockNumbers; +use crate::abstract_l1_interface::{L1BlockNumbers, OperatorType}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] #[metrics(label = "kind", rename_all = "snake_case")] @@ -98,7 +98,7 @@ pub(super) struct EthSenderMetrics { /// Last L1 block observed by the Ethereum sender. pub last_known_l1_block: Family>, /// Number of in-flight txs produced by the Ethereum sender. - pub number_of_inflight_txs: Gauge, + pub number_of_inflight_txs: Family>, #[metrics(buckets = GAS_BUCKETS)] pub l1_gas_used: Family>, #[metrics(buckets = Buckets::LATENCIES)] diff --git a/core/node/eth_sender/src/tests.rs b/core/node/eth_sender/src/tests.rs index 4853c7bb2299..45835a50c33b 100644 --- a/core/node/eth_sender/src/tests.rs +++ b/core/node/eth_sender/src/tests.rs @@ -28,8 +28,9 @@ use zksync_types::{ }; use crate::{ - abstract_l1_interface::L1BlockNumbers, aggregated_operations::AggregatedOperation, Aggregator, - EthSenderError, EthTxAggregator, EthTxManager, + abstract_l1_interface::{L1BlockNumbers, OperatorType}, + aggregated_operations::AggregatedOperation, + Aggregator, EthSenderError, EthTxAggregator, EthTxManager, }; // Alias to conveniently call static methods of `ETHSender`. @@ -332,7 +333,7 @@ async fn confirm_many( .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -347,9 +348,10 @@ async fn confirm_many( let to_resend = tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await?; @@ -359,7 +361,7 @@ async fn confirm_many( .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -433,7 +435,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -461,7 +463,11 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re let (to_resend, _) = tester .manager - .monitor_inflight_transactions(&mut tester.conn.connection().await.unwrap(), block_numbers) + .monitor_inflight_transactions_single_operator( + &mut tester.conn.connection().await.unwrap(), + block_numbers, + OperatorType::NonBlob, + ) .await? .unwrap(); @@ -482,7 +488,7 @@ async fn resend_each_block(commitment_mode: L1BatchCommitmentMode) -> anyhow::Re .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -568,7 +574,7 @@ async fn dont_resend_already_mined(commitment_mode: L1BatchCommitmentMode) -> an .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -582,9 +588,10 @@ async fn dont_resend_already_mined(commitment_mode: L1BatchCommitmentMode) -> an let to_resend = tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await?; @@ -594,7 +601,7 @@ async fn dont_resend_already_mined(commitment_mode: L1BatchCommitmentMode) -> an .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -680,9 +687,10 @@ async fn three_scenarios(commitment_mode: L1BatchCommitmentMode) -> anyhow::Resu let (to_resend, _) = tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await? .expect("we should be trying to resend the last tx"); @@ -693,7 +701,7 @@ async fn three_scenarios(commitment_mode: L1BatchCommitmentMode) -> anyhow::Resu .storage() .await .eth_sender_dal() - .get_inflight_txs() + .get_inflight_txs(tester.manager.operator_address(OperatorType::NonBlob)) .await .unwrap() .len(), @@ -767,9 +775,10 @@ async fn failed_eth_tx(commitment_mode: L1BatchCommitmentMode) { .execute_tx(hash, false, EthSenderTester::WAIT_CONFIRMATIONS); tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::NonBlob, ) .await .unwrap(); @@ -1253,9 +1262,20 @@ async fn confirm_tx(tester: &mut EthSenderTester, hash: H256) { .execute_tx(hash, true, EthSenderTester::WAIT_CONFIRMATIONS); tester .manager - .monitor_inflight_transactions( + .monitor_inflight_transactions_single_operator( + &mut tester.conn.connection().await.unwrap(), + tester.get_block_numbers().await, + OperatorType::NonBlob, + ) + .await + .unwrap(); + + tester + .manager + .monitor_inflight_transactions_single_operator( &mut tester.conn.connection().await.unwrap(), tester.get_block_numbers().await, + OperatorType::Blob, ) .await .unwrap(); From 26a1af3138a9b5c4bb2311729b4c2c4d6a4b56ea Mon Sep 17 00:00:00 2001 From: tomg10 Date: Thu, 27 Jun 2024 19:10:18 +0200 Subject: [PATCH 2/8] refactor Signed-off-by: tomg10 --- core/node/eth_sender/src/eth_tx_manager.rs | 39 ++++++---------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index f4330a7a2b62..af1459c2825f 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -546,19 +546,13 @@ impl EthTxManager { tracing::info!("Stop signal received, eth_tx_manager is shutting down"); break; } + let l1_block_numbers = self.l1_interface.get_l1_block_numbers().await?; + METRICS.track_block_numbers(&l1_block_numbers); - match self.loop_iteration(&mut storage, last_known_l1_block).await { - Ok(block) => last_known_l1_block = block, - Err(e) => { - // Web3 API request failures can cause this, - // and anything more important is already properly reported. - tracing::warn!("eth_sender error {:?}", e); - if e.is_transient() { - METRICS.l1_transient_errors.inc(); - } - } + if last_known_l1_block < l1_block_numbers.latest { + self.loop_iteration(&mut storage, l1_block_numbers).await; + last_known_l1_block = l1_block_numbers.latest; } - tokio::time::sleep(self.config.tx_poll_period()).await; } Ok(()) @@ -625,18 +619,8 @@ impl EthTxManager { async fn loop_iteration( &mut self, storage: &mut Connection<'_, Core>, - previous_block: L1BlockNumber, - ) -> Result { - let l1_block_numbers = self.l1_interface.get_l1_block_numbers().await?; - - if l1_block_numbers.latest <= previous_block { - // Nothing to do - no new blocks were mined. - return Ok(previous_block); - } - - METRICS.track_block_numbers(&l1_block_numbers); - - let mut last_error = None; + l1_block_numbers: L1BlockNumbers, + ) { for operator_type in [OperatorType::NonBlob, OperatorType::Blob] { self.send_new_eth_txs(storage, l1_block_numbers.latest, operator_type) .await; @@ -646,13 +630,10 @@ impl EthTxManager { //We don't want an error in sending non-blob transactions interrupt sending blob txs if let Err(error) = result { - last_error = Some(error); + // Web3 API request failures can cause this, + // and anything more important is already properly reported. + tracing::warn!("eth_sender error {:?}", error); } } - if let Some(last_error) = last_error { - Err(last_error) - } else { - Ok(l1_block_numbers.latest) - } } } From fb598553e0f9a2ee0808b5249279008349effb73 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Thu, 27 Jun 2024 19:25:43 +0200 Subject: [PATCH 3/8] refactor Signed-off-by: tomg10 --- core/node/eth_sender/src/eth_tx_manager.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index af1459c2825f..fd758b2a28bf 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -633,6 +633,9 @@ impl EthTxManager { // Web3 API request failures can cause this, // and anything more important is already properly reported. tracing::warn!("eth_sender error {:?}", error); + if e.is_transient() { + METRICS.l1_transient_errors.inc(); + } } } } From 47ca705f84666301ba4d9355a157d1ccaa6bad56 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Thu, 27 Jun 2024 19:26:12 +0200 Subject: [PATCH 4/8] refactor Signed-off-by: tomg10 --- core/node/eth_sender/src/eth_tx_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index fd758b2a28bf..53b562244d0f 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -633,7 +633,7 @@ impl EthTxManager { // Web3 API request failures can cause this, // and anything more important is already properly reported. tracing::warn!("eth_sender error {:?}", error); - if e.is_transient() { + if error.is_transient() { METRICS.l1_transient_errors.inc(); } } From 9e5891b63ac29583e1c5a2085396941874b56939 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Thu, 27 Jun 2024 21:36:18 +0200 Subject: [PATCH 5/8] fix Signed-off-by: tomg10 --- core/node/eth_sender/src/eth_tx_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index 53b562244d0f..a99d4110a026 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -621,11 +621,12 @@ impl EthTxManager { storage: &mut Connection<'_, Core>, l1_block_numbers: L1BlockNumbers, ) { + tracing::info!("Loop iteration at block {}", l1_block_numbers.latest); for operator_type in [OperatorType::NonBlob, OperatorType::Blob] { self.send_new_eth_txs(storage, l1_block_numbers.latest, operator_type) .await; let result = self - .update_statuses_and_resend_if_needed(storage, l1_block_numbers, OperatorType::Blob) + .update_statuses_and_resend_if_needed(storage, l1_block_numbers, operator_type) .await; //We don't want an error in sending non-blob transactions interrupt sending blob txs From 55bb806180e870beeef46e253d9b615f689def9c Mon Sep 17 00:00:00 2001 From: tomg10 Date: Fri, 28 Jun 2024 09:25:51 +0200 Subject: [PATCH 6/8] fix Signed-off-by: tomg10 --- core/node/eth_sender/src/eth_tx_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index a99d4110a026..035203580d5d 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -290,7 +290,7 @@ impl EthTxManager { operator_nonce: OperatorNonce, inflight_txs: Vec, ) -> Result, EthSenderError> { - tracing::info!( + tracing::trace!( "Going through not confirmed txs. \ Block numbers: latest {}, finalized {}, \ operator's nonce: latest {}, finalized {}", @@ -302,7 +302,7 @@ impl EthTxManager { // Not confirmed transactions, ordered by nonce for tx in inflight_txs { - tracing::info!( + tracing::trace!( "Checking tx id: {}, operator_nonce: {:?}, tx nonce: {}", tx.id, operator_nonce, From 992230ff62a61dc9a83d717ca87286554db401d7 Mon Sep 17 00:00:00 2001 From: tomg10 Date: Fri, 28 Jun 2024 10:53:19 +0200 Subject: [PATCH 7/8] PR feedback Signed-off-by: tomg10 --- core/lib/dal/src/eth_sender_dal.rs | 4 ++-- core/node/eth_sender/src/eth_tx_manager.rs | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index 5c6ef54996ef..bb27cf8c1f68 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -34,7 +34,7 @@ impl EthSenderDal<'_, '_> { FROM eth_txs WHERE - from_addr IS NOT DISTINCT FROM $1 + from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL AND confirmed_eth_tx_history_id IS NULL AND id <= ( SELECT @@ -139,7 +139,7 @@ impl EthSenderDal<'_, '_> { FROM eth_txs WHERE - from_addr IS NOT DISTINCT FROM $2 + from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL AND id > ( SELECT COALESCE(MAX(eth_tx_id), 0) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index 035203580d5d..7e69a23c16ff 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -622,6 +622,9 @@ impl EthTxManager { l1_block_numbers: L1BlockNumbers, ) { tracing::info!("Loop iteration at block {}", l1_block_numbers.latest); + // We can treat those two operators independently as they have different nonces and + // aggregator makes sure that corresponding Commit transaction is confirmed before creating + // a PublishProof transaction for operator_type in [OperatorType::NonBlob, OperatorType::Blob] { self.send_new_eth_txs(storage, l1_block_numbers.latest, operator_type) .await; From 9eda8785919b1ca2e819912b5decc46d79277a8a Mon Sep 17 00:00:00 2001 From: tomg10 Date: Fri, 28 Jun 2024 14:29:54 +0200 Subject: [PATCH 8/8] missing sqlx file update Signed-off-by: tomg10 --- ...7616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json} | 4 ++-- ...2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json} | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename core/lib/dal/.sqlx/{query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json => query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json} (80%) rename core/lib/dal/.sqlx/{query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json => query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json} (77%) diff --git a/core/lib/dal/.sqlx/query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json b/core/lib/dal/.sqlx/query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json similarity index 80% rename from core/lib/dal/.sqlx/query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json rename to core/lib/dal/.sqlx/query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json index 157e5ae6db7e..6e284803521a 100644 --- a/core/lib/dal/.sqlx/query-eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42.json +++ b/core/lib/dal/.sqlx/query-7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $2\n AND id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n )\n ORDER BY\n id\n LIMIT\n $1\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $2 -- can't just use equality as NULL != NULL\n AND id > (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n )\n ORDER BY\n id\n LIMIT\n $1\n ", "describe": { "columns": [ { @@ -97,5 +97,5 @@ true ] }, - "hash": "eb3ab0f132b04a2d1c9cc2c050273c2f1906015389191f50d0a91bcb40c30e42" + "hash": "7e6e8cd0b5217616d847c0b7a62723b395d9b28ca025e6b0b1b7dc9ef93c6b81" } diff --git a/core/lib/dal/.sqlx/query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json b/core/lib/dal/.sqlx/query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json similarity index 77% rename from core/lib/dal/.sqlx/query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json rename to core/lib/dal/.sqlx/query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json index 9969ce032733..b80a10462c07 100644 --- a/core/lib/dal/.sqlx/query-49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328.json +++ b/core/lib/dal/.sqlx/query-aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1\n AND confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n WHERE\n sent_at_block IS NOT NULL\n )\n ORDER BY\n id\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL\n AND confirmed_eth_tx_history_id IS NULL\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n WHERE\n sent_at_block IS NOT NULL\n )\n ORDER BY\n id\n ", "describe": { "columns": [ { @@ -96,5 +96,5 @@ true ] }, - "hash": "49e09e7ae0e7671aaaacb5a81c97ee0993b94cf574784b8b117eaacf37579328" + "hash": "aa92b31d0e0a2b353b66c501bf73b36b935046a9132f045ab105eaeac30c4a4d" }