From 06ec5f3e6bb66025a3ec1e5b4d314c7ff1e116c7 Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Tue, 11 Jun 2024 10:55:11 +0300 Subject: [PATCH 1/2] fix(db): Optimize `get_l2_blocks_to_execute_for_l1_batch` (#2199) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Optimize `get_l2_blocks_to_execute_for_l1_batch` ## Why ❔ `transactions.l1_batch_number` is not indexed ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. - [ ] Spellcheck has been run via `zk spellcheck`. --- ...fb7a33a8fea8ab7fdefb7d9210673245a2a6f6c.json} | 4 ++-- core/lib/dal/src/transactions_dal.rs | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) rename core/lib/dal/.sqlx/{query-f63586d59264eab7388ad1de823227ecaa45d76d1ba260074898fe57c059a15a.json => query-f023e5fa599b279acd6ac02dffb7a33a8fea8ab7fdefb7d9210673245a2a6f6c.json} (86%) diff --git a/core/lib/dal/.sqlx/query-f63586d59264eab7388ad1de823227ecaa45d76d1ba260074898fe57c059a15a.json b/core/lib/dal/.sqlx/query-f023e5fa599b279acd6ac02dffb7a33a8fea8ab7fdefb7d9210673245a2a6f6c.json similarity index 86% rename from core/lib/dal/.sqlx/query-f63586d59264eab7388ad1de823227ecaa45d76d1ba260074898fe57c059a15a.json rename to core/lib/dal/.sqlx/query-f023e5fa599b279acd6ac02dffb7a33a8fea8ab7fdefb7d9210673245a2a6f6c.json index d62e213ef57b..2cd001b274da 100644 --- a/core/lib/dal/.sqlx/query-f63586d59264eab7388ad1de823227ecaa45d76d1ba260074898fe57c059a15a.json +++ b/core/lib/dal/.sqlx/query-f023e5fa599b279acd6ac02dffb7a33a8fea8ab7fdefb7d9210673245a2a6f6c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n transactions\n WHERE\n l1_batch_number = $1\n ORDER BY\n miniblock_number,\n index_in_block\n ", + "query": "\n SELECT\n *\n FROM\n transactions\n WHERE\n miniblock_number BETWEEN (\n SELECT\n MIN(number)\n FROM\n miniblocks\n WHERE\n miniblocks.l1_batch_number = $1\n ) AND (\n SELECT\n MAX(number)\n FROM\n miniblocks\n WHERE\n miniblocks.l1_batch_number = $1\n )\n ORDER BY\n miniblock_number,\n index_in_block\n ", "describe": { "columns": [ { @@ -228,5 +228,5 @@ true ] }, - "hash": "f63586d59264eab7388ad1de823227ecaa45d76d1ba260074898fe57c059a15a" + "hash": "f023e5fa599b279acd6ac02dffb7a33a8fea8ab7fdefb7d9210673245a2a6f6c" } diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index fec3fe04946e..f76b61ec1646 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -1927,7 +1927,21 @@ impl TransactionsDal<'_, '_> { FROM transactions WHERE - l1_batch_number = $1 + miniblock_number BETWEEN ( + SELECT + MIN(number) + FROM + miniblocks + WHERE + miniblocks.l1_batch_number = $1 + ) AND ( + SELECT + MAX(number) + FROM + miniblocks + WHERE + miniblocks.l1_batch_number = $1 + ) ORDER BY miniblock_number, index_in_block From 6d6b57e83471b9e5c044bbe6021d22a85b95cd33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Grze=C5=9Bkiewicz?= Date: Tue, 11 Jun 2024 10:01:09 +0200 Subject: [PATCH 2/2] chore(eth-sender): refactor of eth-sender gas fees (#2085) I've added more detailed logs.. I've also generally split the calculations and metrics for fees for blob and non-blob transactions as they are radically different. --------- Signed-off-by: tomg10 --- core/node/eth_sender/src/eth_tx_manager.rs | 216 ++++++++++++--------- core/node/eth_sender/src/metrics.rs | 13 +- 2 files changed, 140 insertions(+), 89 deletions(-) diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs index 87d7ffd2ae47..a158889f26f7 100644 --- a/core/node/eth_sender/src/eth_tx_manager.rs +++ b/core/node/eth_sender/src/eth_tx_manager.rs @@ -1,4 +1,8 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + cmp::{max, min}, + sync::Arc, + time::Duration, +}; use anyhow::Context as _; use tokio::sync::watch; @@ -13,16 +17,17 @@ use zksync_node_fee_model::l1_gas_price::L1TxParamsProvider; use zksync_shared_metrics::BlockL1Stage; use zksync_types::{ aggregated_operations::AggregatedActionType, - eth_sender::{EthTx, EthTxBlobSidecar}, + eth_sender::{EthTx, EthTxBlobSidecar, TxHistory}, web3::{BlockId, BlockNumber}, Address, L1BlockNumber, Nonce, EIP_1559_TX_TYPE, EIP_4844_TX_TYPE, H256, U256, }; use zksync_utils::time::seconds_since_epoch; use super::{metrics::METRICS, EthSenderError}; +use crate::metrics::TransactionType; #[derive(Debug)] -struct EthFee { +struct EthFees { base_fee_per_gas: u64, priority_fee_per_gas: u64, blob_base_fee_per_gas: Option, @@ -120,64 +125,58 @@ impl EthTxManager { None } - async fn calculate_fee( + fn calculate_fees_with_blob_sidecar( &self, - storage: &mut Connection<'_, Core>, - tx: &EthTx, - time_in_mempool: u32, - ) -> Result { + previous_sent_tx: &Option, + ) -> Result { let base_fee_per_gas = self.gas_adjuster.get_base_fee(0); let priority_fee_per_gas = self.gas_adjuster.get_priority_fee(); let blob_base_fee_per_gas = Some(self.gas_adjuster.get_blob_base_fee()); - if tx.blob_sidecar.is_some() { - if time_in_mempool != 0 { - // for blob transactions on re-sending need to double all gas prices - let previous_sent_tx = storage - .eth_sender_dal() - .get_last_sent_eth_tx(tx.id) - .await - .unwrap() - .unwrap(); - return Ok(EthFee { - base_fee_per_gas: std::cmp::max( - previous_sent_tx.base_fee_per_gas * 2, - base_fee_per_gas, - ), - priority_fee_per_gas: std::cmp::max( - previous_sent_tx.priority_fee_per_gas * 2, - priority_fee_per_gas, - ), - blob_base_fee_per_gas: std::cmp::max( - previous_sent_tx.blob_base_fee_per_gas.map(|v| v * 2), - blob_base_fee_per_gas, - ), - }); - } - return Ok(EthFee { - base_fee_per_gas, - priority_fee_per_gas, - blob_base_fee_per_gas, + if let Some(previous_sent_tx) = previous_sent_tx { + // for blob transactions on re-sending need to double all gas prices + return Ok(EthFees { + base_fee_per_gas: max(previous_sent_tx.base_fee_per_gas * 2, base_fee_per_gas), + priority_fee_per_gas: max( + previous_sent_tx.priority_fee_per_gas * 2, + priority_fee_per_gas, + ), + blob_base_fee_per_gas: max( + previous_sent_tx.blob_base_fee_per_gas.map(|v| v * 2), + blob_base_fee_per_gas, + ), }); } + Ok(EthFees { + base_fee_per_gas, + priority_fee_per_gas, + blob_base_fee_per_gas, + }) + } + fn calculate_fees_no_blob_sidecar( + &self, + previous_sent_tx: &Option, + time_in_mempool: u32, + ) -> Result { let base_fee_per_gas = self.gas_adjuster.get_base_fee(time_in_mempool); - - let priority_fee_per_gas = if time_in_mempool != 0 { - METRICS.transaction_resent.inc(); - let priority_fee_per_gas = self - .increase_priority_fee(storage, tx.id, base_fee_per_gas) - .await?; - tracing::info!( - "Resending operation {} with base fee {:?} and priority fee {:?}", - tx.id, + if let Some(previous_sent_tx) = previous_sent_tx { + self.verify_base_fee_not_too_low_on_resend( + previous_sent_tx.id, + previous_sent_tx.base_fee_per_gas, base_fee_per_gas, - priority_fee_per_gas + )?; + } + + let mut priority_fee_per_gas = self.gas_adjuster.get_priority_fee(); + + if let Some(previous_sent_tx) = previous_sent_tx { + // Increase `priority_fee_per_gas` by at least 20% to prevent "replacement transaction under-priced" error. + priority_fee_per_gas = max( + priority_fee_per_gas, + (previous_sent_tx.priority_fee_per_gas * 6) / 5 + 1, ); - priority_fee_per_gas - } else { - self.gas_adjuster.get_priority_fee() - }; + } // Extra check to prevent sending transaction will extremely high priority fee. if priority_fee_per_gas > self.config.max_acceptable_priority_fee_in_gwei { @@ -188,52 +187,53 @@ impl EthTxManager { ); } - Ok(EthFee { + Ok(EthFees { base_fee_per_gas, blob_base_fee_per_gas: None, priority_fee_per_gas, }) } - async fn increase_priority_fee( + async fn calculate_fees( &self, - storage: &mut Connection<'_, Core>, - eth_tx_id: u32, - base_fee_per_gas: u64, - ) -> Result { - let previous_sent_tx = storage - .eth_sender_dal() - .get_last_sent_eth_tx(eth_tx_id) - .await - .unwrap() - .unwrap(); + previous_sent_tx: &Option, + has_blob_sidecar: bool, + time_in_mempool: u32, + ) -> Result { + match has_blob_sidecar { + true => self.calculate_fees_with_blob_sidecar(previous_sent_tx), + false => self.calculate_fees_no_blob_sidecar(previous_sent_tx, time_in_mempool), + } + } - let previous_base_fee = previous_sent_tx.base_fee_per_gas; - let previous_priority_fee = previous_sent_tx.priority_fee_per_gas; + fn verify_base_fee_not_too_low_on_resend( + &self, + tx_id: u32, + previous_base_fee: u64, + base_fee_to_use: u64, + ) -> Result<(), EthSenderError> { let next_block_minimal_base_fee = self.gas_adjuster.get_next_block_minimal_base_fee(); - - if base_fee_per_gas <= next_block_minimal_base_fee.min(previous_base_fee) { + if base_fee_to_use <= min(next_block_minimal_base_fee, previous_base_fee) { // If the base fee is lower than the previous used one // or is lower than the minimal possible value for the next block, sending is skipped. tracing::info!( - "Skipping gas adjustment for operation {}, \ - base_fee_per_gas: suggested for resending {:?}, previously sent {:?}, next block minimum {:?}", - eth_tx_id, - base_fee_per_gas, + "Base fee too low for resend detected for tx {}, \ + suggested base_fee_per_gas {:?}, \ + previous_base_fee {:?}, \ + next_block_minimal_base_fee {:?}", + tx_id, + base_fee_to_use, previous_base_fee, next_block_minimal_base_fee ); let err = ClientError::Custom("base_fee_per_gas is too low".into()); let err = EnrichedClientError::new(err, "increase_priority_fee") - .with_arg("base_fee_per_gas", &base_fee_per_gas) + .with_arg("base_fee_to_use", &base_fee_to_use) .with_arg("previous_base_fee", &previous_base_fee) .with_arg("next_block_minimal_base_fee", &next_block_minimal_base_fee); return Err(err.into()); } - - // Increase `priority_fee_per_gas` by at least 20% to prevent "replacement transaction under-priced" error. - Ok((previous_priority_fee + (previous_priority_fee / 5) + 1) - .max(self.gas_adjuster.get_priority_fee())) + Ok(()) } pub(crate) async fn send_eth_tx( @@ -243,18 +243,59 @@ impl EthTxManager { time_in_mempool: u32, current_block: L1BlockNumber, ) -> Result { - let EthFee { + let previous_sent_tx = storage + .eth_sender_dal() + .get_last_sent_eth_tx(tx.id) + .await + .unwrap(); + let has_blob_sidecar = tx.blob_sidecar.is_some(); + + let EthFees { base_fee_per_gas, priority_fee_per_gas, blob_base_fee_per_gas, - } = self.calculate_fee(storage, tx, time_in_mempool).await?; + } = self + .calculate_fees(&previous_sent_tx, has_blob_sidecar, time_in_mempool) + .await?; - METRICS.used_base_fee_per_gas.observe(base_fee_per_gas); - METRICS - .used_priority_fee_per_gas - .observe(priority_fee_per_gas); + if let Some(previous_sent_tx) = previous_sent_tx { + METRICS.transaction_resent.inc(); + tracing::info!( + "Resending tx {} at block {current_block} with \ + base_fee_per_gas {base_fee_per_gas:?}, \ + priority_fee_per_gas {priority_fee_per_gas:?}, \ + blob_fee_per_gas {blob_base_fee_per_gas:?}, \ + previously sent with \ + base_fee_per_gas {:?}, \ + priority_fee_per_gas {:?}, \ + blob_fee_per_gas {:?}, \ + ", + tx.id, + previous_sent_tx.base_fee_per_gas, + previous_sent_tx.priority_fee_per_gas, + previous_sent_tx.blob_base_fee_per_gas + ); + } else { + tracing::info!( + "Sending tx {} at block {current_block} with \ + base_fee_per_gas {base_fee_per_gas:?}, \ + priority_fee_per_gas {priority_fee_per_gas:?}, \ + blob_fee_per_gas {blob_base_fee_per_gas:?}", + tx.id + ); + } + + if let Some(blob_base_fee_per_gas) = blob_base_fee_per_gas { + METRICS.used_blob_fee_per_gas[&TransactionType::Blob].observe(blob_base_fee_per_gas); + METRICS.used_base_fee_per_gas[&TransactionType::Blob].observe(base_fee_per_gas); + METRICS.used_priority_fee_per_gas[&TransactionType::Blob].observe(priority_fee_per_gas); + } else { + METRICS.used_base_fee_per_gas[&TransactionType::Regular].observe(base_fee_per_gas); + METRICS.used_priority_fee_per_gas[&TransactionType::Regular] + .observe(priority_fee_per_gas); + } - let blob_gas_price = if tx.blob_sidecar.is_some() { + let blob_gas_price = if has_blob_sidecar { Some( blob_base_fee_per_gas .expect("always ready to query blob gas price for blob transactions; qed") @@ -293,11 +334,12 @@ impl EthTxManager { .await { tracing::warn!( - "Error when sending new signed tx for tx {}, base_fee_per_gas {}, priority_fee_per_gas: {}: {}", - tx.id, - base_fee_per_gas, - priority_fee_per_gas, - error + "Error Sending tx {} at block {current_block} with \ + base_fee_per_gas {base_fee_per_gas:?}, \ + priority_fee_per_gas {priority_fee_per_gas:?}, \ + blob_fee_per_gas {blob_base_fee_per_gas:?},\ + error {error}", + tx.id ); } } diff --git a/core/node/eth_sender/src/metrics.rs b/core/node/eth_sender/src/metrics.rs index 50c0e218692f..bd36444780cb 100644 --- a/core/node/eth_sender/src/metrics.rs +++ b/core/node/eth_sender/src/metrics.rs @@ -33,6 +33,13 @@ pub(super) enum BlockNumberVariant { #[metrics(label = "type")] pub(super) struct ActionTypeLabel(AggregatedActionType); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "transaction_type", rename_all = "snake_case")] +pub(super) enum TransactionType { + Blob, + Regular, +} + impl From for ActionTypeLabel { fn from(action_type: AggregatedActionType) -> Self { Self(action_type) @@ -83,9 +90,11 @@ pub(super) struct EthSenderMetrics { /// Number of transactions resent by the Ethereum sender. pub transaction_resent: Counter, #[metrics(buckets = FEE_BUCKETS)] - pub used_base_fee_per_gas: Histogram, + pub used_base_fee_per_gas: Family>, + #[metrics(buckets = FEE_BUCKETS)] + pub used_priority_fee_per_gas: Family>, #[metrics(buckets = FEE_BUCKETS)] - pub used_priority_fee_per_gas: Histogram, + pub used_blob_fee_per_gas: Family>, /// Last L1 block observed by the Ethereum sender. pub last_known_l1_block: Family>, /// Number of in-flight txs produced by the Ethereum sender.