Skip to content

Commit

Permalink
Merge branch 'main' into daniyar/pla-969-protective-reads-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov authored Jun 11, 2024
2 parents 9bea4bc + 6d6b57e commit 60a7123
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 92 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion core/lib/dal/src/transactions_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,21 @@ impl TransactionsDal<'_, '_> {
FROM
transactions
WHERE
l1_batch_number = $1
miniblock_number BETWEEN (
SELECT
MIN(number)
FROM
miniblocks
WHERE
miniblocks.l1_batch_number = $1
) AND (
SELECT
MAX(number)
FROM
miniblocks
WHERE
miniblocks.l1_batch_number = $1
)
ORDER BY
miniblock_number,
index_in_block
Expand Down
216 changes: 129 additions & 87 deletions core/node/eth_sender/src/eth_tx_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{sync::Arc, time::Duration};
use std::{
cmp::{max, min},
sync::Arc,
time::Duration,
};

use anyhow::Context as _;
use tokio::sync::watch;
Expand All @@ -13,16 +17,17 @@ use zksync_node_fee_model::l1_gas_price::L1TxParamsProvider;
use zksync_shared_metrics::BlockL1Stage;
use zksync_types::{
aggregated_operations::AggregatedActionType,
eth_sender::{EthTx, EthTxBlobSidecar},
eth_sender::{EthTx, EthTxBlobSidecar, TxHistory},
web3::{BlockId, BlockNumber},
Address, L1BlockNumber, Nonce, EIP_1559_TX_TYPE, EIP_4844_TX_TYPE, H256, U256,
};
use zksync_utils::time::seconds_since_epoch;

use super::{metrics::METRICS, EthSenderError};
use crate::metrics::TransactionType;

#[derive(Debug)]
struct EthFee {
struct EthFees {
base_fee_per_gas: u64,
priority_fee_per_gas: u64,
blob_base_fee_per_gas: Option<u64>,
Expand Down Expand Up @@ -120,64 +125,58 @@ impl EthTxManager {
None
}

async fn calculate_fee(
fn calculate_fees_with_blob_sidecar(
&self,
storage: &mut Connection<'_, Core>,
tx: &EthTx,
time_in_mempool: u32,
) -> Result<EthFee, EthSenderError> {
previous_sent_tx: &Option<TxHistory>,
) -> Result<EthFees, EthSenderError> {
let base_fee_per_gas = self.gas_adjuster.get_base_fee(0);
let priority_fee_per_gas = self.gas_adjuster.get_priority_fee();
let blob_base_fee_per_gas = Some(self.gas_adjuster.get_blob_base_fee());

if tx.blob_sidecar.is_some() {
if time_in_mempool != 0 {
// for blob transactions on re-sending need to double all gas prices
let previous_sent_tx = storage
.eth_sender_dal()
.get_last_sent_eth_tx(tx.id)
.await
.unwrap()
.unwrap();
return Ok(EthFee {
base_fee_per_gas: std::cmp::max(
previous_sent_tx.base_fee_per_gas * 2,
base_fee_per_gas,
),
priority_fee_per_gas: std::cmp::max(
previous_sent_tx.priority_fee_per_gas * 2,
priority_fee_per_gas,
),
blob_base_fee_per_gas: std::cmp::max(
previous_sent_tx.blob_base_fee_per_gas.map(|v| v * 2),
blob_base_fee_per_gas,
),
});
}
return Ok(EthFee {
base_fee_per_gas,
priority_fee_per_gas,
blob_base_fee_per_gas,
if let Some(previous_sent_tx) = previous_sent_tx {
// for blob transactions on re-sending need to double all gas prices
return Ok(EthFees {
base_fee_per_gas: max(previous_sent_tx.base_fee_per_gas * 2, base_fee_per_gas),
priority_fee_per_gas: max(
previous_sent_tx.priority_fee_per_gas * 2,
priority_fee_per_gas,
),
blob_base_fee_per_gas: max(
previous_sent_tx.blob_base_fee_per_gas.map(|v| v * 2),
blob_base_fee_per_gas,
),
});
}
Ok(EthFees {
base_fee_per_gas,
priority_fee_per_gas,
blob_base_fee_per_gas,
})
}

fn calculate_fees_no_blob_sidecar(
&self,
previous_sent_tx: &Option<TxHistory>,
time_in_mempool: u32,
) -> Result<EthFees, EthSenderError> {
let base_fee_per_gas = self.gas_adjuster.get_base_fee(time_in_mempool);

let priority_fee_per_gas = if time_in_mempool != 0 {
METRICS.transaction_resent.inc();
let priority_fee_per_gas = self
.increase_priority_fee(storage, tx.id, base_fee_per_gas)
.await?;
tracing::info!(
"Resending operation {} with base fee {:?} and priority fee {:?}",
tx.id,
if let Some(previous_sent_tx) = previous_sent_tx {
self.verify_base_fee_not_too_low_on_resend(
previous_sent_tx.id,
previous_sent_tx.base_fee_per_gas,
base_fee_per_gas,
priority_fee_per_gas
)?;
}

let mut priority_fee_per_gas = self.gas_adjuster.get_priority_fee();

if let Some(previous_sent_tx) = previous_sent_tx {
// Increase `priority_fee_per_gas` by at least 20% to prevent "replacement transaction under-priced" error.
priority_fee_per_gas = max(
priority_fee_per_gas,
(previous_sent_tx.priority_fee_per_gas * 6) / 5 + 1,
);
priority_fee_per_gas
} else {
self.gas_adjuster.get_priority_fee()
};
}

// Extra check to prevent sending transaction will extremely high priority fee.
if priority_fee_per_gas > self.config.max_acceptable_priority_fee_in_gwei {
Expand All @@ -188,52 +187,53 @@ impl EthTxManager {
);
}

Ok(EthFee {
Ok(EthFees {
base_fee_per_gas,
blob_base_fee_per_gas: None,
priority_fee_per_gas,
})
}

async fn increase_priority_fee(
async fn calculate_fees(
&self,
storage: &mut Connection<'_, Core>,
eth_tx_id: u32,
base_fee_per_gas: u64,
) -> Result<u64, EthSenderError> {
let previous_sent_tx = storage
.eth_sender_dal()
.get_last_sent_eth_tx(eth_tx_id)
.await
.unwrap()
.unwrap();
previous_sent_tx: &Option<TxHistory>,
has_blob_sidecar: bool,
time_in_mempool: u32,
) -> Result<EthFees, EthSenderError> {
match has_blob_sidecar {
true => self.calculate_fees_with_blob_sidecar(previous_sent_tx),
false => self.calculate_fees_no_blob_sidecar(previous_sent_tx, time_in_mempool),
}
}

let previous_base_fee = previous_sent_tx.base_fee_per_gas;
let previous_priority_fee = previous_sent_tx.priority_fee_per_gas;
fn verify_base_fee_not_too_low_on_resend(
&self,
tx_id: u32,
previous_base_fee: u64,
base_fee_to_use: u64,
) -> Result<(), EthSenderError> {
let next_block_minimal_base_fee = self.gas_adjuster.get_next_block_minimal_base_fee();

if base_fee_per_gas <= next_block_minimal_base_fee.min(previous_base_fee) {
if base_fee_to_use <= min(next_block_minimal_base_fee, previous_base_fee) {
// If the base fee is lower than the previous used one
// or is lower than the minimal possible value for the next block, sending is skipped.
tracing::info!(
"Skipping gas adjustment for operation {}, \
base_fee_per_gas: suggested for resending {:?}, previously sent {:?}, next block minimum {:?}",
eth_tx_id,
base_fee_per_gas,
"Base fee too low for resend detected for tx {}, \
suggested base_fee_per_gas {:?}, \
previous_base_fee {:?}, \
next_block_minimal_base_fee {:?}",
tx_id,
base_fee_to_use,
previous_base_fee,
next_block_minimal_base_fee
);
let err = ClientError::Custom("base_fee_per_gas is too low".into());
let err = EnrichedClientError::new(err, "increase_priority_fee")
.with_arg("base_fee_per_gas", &base_fee_per_gas)
.with_arg("base_fee_to_use", &base_fee_to_use)
.with_arg("previous_base_fee", &previous_base_fee)
.with_arg("next_block_minimal_base_fee", &next_block_minimal_base_fee);
return Err(err.into());
}

// Increase `priority_fee_per_gas` by at least 20% to prevent "replacement transaction under-priced" error.
Ok((previous_priority_fee + (previous_priority_fee / 5) + 1)
.max(self.gas_adjuster.get_priority_fee()))
Ok(())
}

pub(crate) async fn send_eth_tx(
Expand All @@ -243,18 +243,59 @@ impl EthTxManager {
time_in_mempool: u32,
current_block: L1BlockNumber,
) -> Result<H256, EthSenderError> {
let EthFee {
let previous_sent_tx = storage
.eth_sender_dal()
.get_last_sent_eth_tx(tx.id)
.await
.unwrap();
let has_blob_sidecar = tx.blob_sidecar.is_some();

let EthFees {
base_fee_per_gas,
priority_fee_per_gas,
blob_base_fee_per_gas,
} = self.calculate_fee(storage, tx, time_in_mempool).await?;
} = self
.calculate_fees(&previous_sent_tx, has_blob_sidecar, time_in_mempool)
.await?;

METRICS.used_base_fee_per_gas.observe(base_fee_per_gas);
METRICS
.used_priority_fee_per_gas
.observe(priority_fee_per_gas);
if let Some(previous_sent_tx) = previous_sent_tx {
METRICS.transaction_resent.inc();
tracing::info!(
"Resending tx {} at block {current_block} with \
base_fee_per_gas {base_fee_per_gas:?}, \
priority_fee_per_gas {priority_fee_per_gas:?}, \
blob_fee_per_gas {blob_base_fee_per_gas:?}, \
previously sent with \
base_fee_per_gas {:?}, \
priority_fee_per_gas {:?}, \
blob_fee_per_gas {:?}, \
",
tx.id,
previous_sent_tx.base_fee_per_gas,
previous_sent_tx.priority_fee_per_gas,
previous_sent_tx.blob_base_fee_per_gas
);
} else {
tracing::info!(
"Sending tx {} at block {current_block} with \
base_fee_per_gas {base_fee_per_gas:?}, \
priority_fee_per_gas {priority_fee_per_gas:?}, \
blob_fee_per_gas {blob_base_fee_per_gas:?}",
tx.id
);
}

if let Some(blob_base_fee_per_gas) = blob_base_fee_per_gas {
METRICS.used_blob_fee_per_gas[&TransactionType::Blob].observe(blob_base_fee_per_gas);
METRICS.used_base_fee_per_gas[&TransactionType::Blob].observe(base_fee_per_gas);
METRICS.used_priority_fee_per_gas[&TransactionType::Blob].observe(priority_fee_per_gas);
} else {
METRICS.used_base_fee_per_gas[&TransactionType::Regular].observe(base_fee_per_gas);
METRICS.used_priority_fee_per_gas[&TransactionType::Regular]
.observe(priority_fee_per_gas);
}

let blob_gas_price = if tx.blob_sidecar.is_some() {
let blob_gas_price = if has_blob_sidecar {
Some(
blob_base_fee_per_gas
.expect("always ready to query blob gas price for blob transactions; qed")
Expand Down Expand Up @@ -293,11 +334,12 @@ impl EthTxManager {
.await
{
tracing::warn!(
"Error when sending new signed tx for tx {}, base_fee_per_gas {}, priority_fee_per_gas: {}: {}",
tx.id,
base_fee_per_gas,
priority_fee_per_gas,
error
"Error Sending tx {} at block {current_block} with \
base_fee_per_gas {base_fee_per_gas:?}, \
priority_fee_per_gas {priority_fee_per_gas:?}, \
blob_fee_per_gas {blob_base_fee_per_gas:?},\
error {error}",
tx.id
);
}
}
Expand Down
13 changes: 11 additions & 2 deletions core/node/eth_sender/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ pub(super) enum BlockNumberVariant {
#[metrics(label = "type")]
pub(super) struct ActionTypeLabel(AggregatedActionType);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)]
#[metrics(label = "transaction_type", rename_all = "snake_case")]
pub(super) enum TransactionType {
Blob,
Regular,
}

impl From<AggregatedActionType> for ActionTypeLabel {
fn from(action_type: AggregatedActionType) -> Self {
Self(action_type)
Expand Down Expand Up @@ -83,9 +90,11 @@ pub(super) struct EthSenderMetrics {
/// Number of transactions resent by the Ethereum sender.
pub transaction_resent: Counter,
#[metrics(buckets = FEE_BUCKETS)]
pub used_base_fee_per_gas: Histogram<u64>,
pub used_base_fee_per_gas: Family<TransactionType, Histogram<u64>>,
#[metrics(buckets = FEE_BUCKETS)]
pub used_priority_fee_per_gas: Family<TransactionType, Histogram<u64>>,
#[metrics(buckets = FEE_BUCKETS)]
pub used_priority_fee_per_gas: Histogram<u64>,
pub used_blob_fee_per_gas: Family<TransactionType, Histogram<u64>>,
/// Last L1 block observed by the Ethereum sender.
pub last_known_l1_block: Family<BlockNumberVariant, Gauge<usize>>,
/// Number of in-flight txs produced by the Ethereum sender.
Expand Down

0 comments on commit 60a7123

Please sign in to comment.