Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Base token): add cbt metrics #2720

Merged
merged 9 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions core/lib/config/src/configs/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const DEFAULT_L1_TX_SENDING_MAX_ATTEMPTS: u32 = 3;
/// Default number of milliseconds to sleep between receipt checking attempts
const DEFAULT_L1_RECEIPT_CHECKING_SLEEP_MS: u64 = 30_000;

/// Default maximum number of attempts to fetch price from a remote API
const DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS: u32 = 3;

/// Default number of milliseconds to sleep between price fetching attempts
const DEFAULT_PRICE_FETCHING_SLEEP_MS: u64 = 5_000;

/// Default number of milliseconds to sleep between transaction sending attempts
const DEFAULT_L1_TX_SENDING_SLEEP_MS: u64 = 30_000;

Expand Down Expand Up @@ -73,6 +79,14 @@ pub struct BaseTokenAdjusterConfig {
#[serde(default = "BaseTokenAdjusterConfig::default_l1_tx_sending_sleep_ms")]
pub l1_tx_sending_sleep_ms: u64,

/// Maximum number of attempts to fetch quote from a remote API before failing over
#[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_max_attempts")]
pub price_fetching_max_attempts: u32,

/// Number of seconds to sleep between price fetching attempts
#[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_sleep_ms")]
pub price_fetching_sleep_ms: u64,

/// Defines whether base_token_adjuster should halt the process if there was an error while
/// fetching or persisting the quote. Generally that should be set to false to not to halt
/// the server process if an external api is not available or if L1 is congested.
Expand All @@ -93,6 +107,8 @@ impl Default for BaseTokenAdjusterConfig {
l1_receipt_checking_sleep_ms: Self::default_l1_receipt_checking_sleep_ms(),
l1_tx_sending_max_attempts: Self::default_l1_tx_sending_max_attempts(),
l1_tx_sending_sleep_ms: Self::default_l1_tx_sending_sleep_ms(),
price_fetching_sleep_ms: Self::default_price_fetching_sleep_ms(),
price_fetching_max_attempts: Self::default_price_fetching_max_attempts(),
halt_on_error: Self::default_halt_on_error(),
}
}
Expand Down Expand Up @@ -135,6 +151,10 @@ impl BaseTokenAdjusterConfig {
Duration::from_millis(self.l1_tx_sending_sleep_ms)
}

pub fn price_fetching_sleep_duration(&self) -> Duration {
Duration::from_millis(self.price_fetching_sleep_ms)
}

pub fn default_l1_receipt_checking_max_attempts() -> u32 {
DEFAULT_L1_RECEIPT_CHECKING_MAX_ATTEMPTS
}
Expand All @@ -151,6 +171,14 @@ impl BaseTokenAdjusterConfig {
DEFAULT_L1_TX_SENDING_SLEEP_MS
}

pub fn default_price_fetching_sleep_ms() -> u64 {
DEFAULT_PRICE_FETCHING_SLEEP_MS
}

pub fn default_price_fetching_max_attempts() -> u32 {
DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS
}

pub fn default_max_tx_gas() -> u64 {
DEFAULT_MAX_TX_GAS
}
Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,8 @@ impl Distribution<configs::base_token_adjuster::BaseTokenAdjusterConfig> for Enc
l1_receipt_checking_sleep_ms: self.sample(rng),
l1_tx_sending_max_attempts: self.sample(rng),
l1_tx_sending_sleep_ms: self.sample(rng),
price_fetching_max_attempts: self.sample(rng),
price_fetching_sleep_ms: self.sample(rng),
halt_on_error: self.sample(rng),
}
}
Expand Down
8 changes: 8 additions & 0 deletions core/lib/env_config/src/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mod tests {
l1_receipt_checking_sleep_ms: 20_000,
l1_tx_sending_max_attempts: 10,
l1_tx_sending_sleep_ms: 30_000,
price_fetching_max_attempts: 20,
price_fetching_sleep_ms: 10_000,
halt_on_error: true,
}
}
Expand All @@ -41,6 +43,8 @@ mod tests {
l1_receipt_checking_sleep_ms: 30_000,
l1_tx_sending_max_attempts: 3,
l1_tx_sending_sleep_ms: 30_000,
price_fetching_max_attempts: 3,
price_fetching_sleep_ms: 5_000,
halt_on_error: false,
}
}
Expand All @@ -58,6 +62,8 @@ mod tests {
BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS=20000
BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS=10
BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS=30000
BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS=20
BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS=10000
BASE_TOKEN_ADJUSTER_HALT_ON_ERROR=true
"#;
lock.set_env(config);
Expand All @@ -79,6 +85,8 @@ mod tests {
"BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS",
"BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS",
"BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_HALT_ON_ERROR",
]);

Expand Down
8 changes: 8 additions & 0 deletions core/lib/protobuf_config/src/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ impl ProtoRepr for proto::BaseTokenAdjuster {
l1_receipt_checking_max_attempts: self
.l1_receipt_checking_max_attempts
.unwrap_or(Self::Type::default_l1_receipt_checking_max_attempts()),
price_fetching_sleep_ms: self
.price_fetching_sleep_ms
.unwrap_or(Self::Type::default_price_fetching_sleep_ms()),
price_fetching_max_attempts: self
.price_fetching_max_attempts
.unwrap_or(Self::Type::default_price_fetching_max_attempts()),
l1_tx_sending_max_attempts: self
.l1_tx_sending_max_attempts
.unwrap_or(Self::Type::default_l1_tx_sending_max_attempts()),
Expand All @@ -47,6 +53,8 @@ impl ProtoRepr for proto::BaseTokenAdjuster {
l1_receipt_checking_max_attempts: Some(this.l1_receipt_checking_max_attempts),
l1_tx_sending_max_attempts: Some(this.l1_tx_sending_max_attempts),
l1_tx_sending_sleep_ms: Some(this.l1_tx_sending_sleep_ms),
price_fetching_max_attempts: Some(this.price_fetching_max_attempts),
price_fetching_sleep_ms: Some(this.price_fetching_sleep_ms),
max_tx_gas: Some(this.max_tx_gas),
default_priority_fee_per_gas: Some(this.default_priority_fee_per_gas),
max_acceptable_priority_fee_in_gwei: Some(this.max_acceptable_priority_fee_in_gwei),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ message BaseTokenAdjuster {
optional uint32 l1_tx_sending_max_attempts = 8;
optional uint64 l1_tx_sending_sleep_ms = 9;
optional bool halt_on_error = 10;
optional uint32 price_fetching_max_attempts = 11;
optional uint64 price_fetching_sleep_ms = 12;
}
3 changes: 2 additions & 1 deletion core/node/base_token_adjuster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ zksync_external_price_api.workspace = true
zksync_contracts.workspace = true
zksync_eth_client.workspace = true
zksync_node_fee_model.workspace = true

zksync_utils.workspace = true
vise.workspace = true

tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
Expand Down
150 changes: 93 additions & 57 deletions core/node/base_token_adjuster/src/base_token_ratio_persister.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cmp::max, fmt::Debug, sync::Arc, time::Duration};
use std::{cmp::max, fmt::Debug, sync::Arc, time::Instant};

use anyhow::Context as _;
use tokio::{sync::watch, time::sleep};
Expand All @@ -14,6 +14,8 @@ use zksync_types::{
Address, U256,
};

use crate::metrics::{OperationResult, OperationResultLabels, METRICS};

#[derive(Debug, Clone)]
pub struct BaseTokenRatioPersisterL1Params {
pub eth_client: Box<dyn BoundEthInterface>,
Expand Down Expand Up @@ -82,47 +84,7 @@ impl BaseTokenRatioPersister {
// TODO(PE-148): Consider shifting retry upon adding external API redundancy.
let new_ratio = self.retry_fetch_ratio().await?;
self.persist_ratio(new_ratio).await?;

let Some(l1_params) = &self.l1_params else {
return Ok(());
};

let max_attempts = self.config.l1_tx_sending_max_attempts;
let sleep_duration = self.config.l1_tx_sending_sleep_duration();
let mut result: anyhow::Result<()> = Ok(());
let mut prev_base_fee_per_gas: Option<u64> = None;
let mut prev_priority_fee_per_gas: Option<u64> = None;

for attempt in 0..max_attempts {
let (base_fee_per_gas, priority_fee_per_gas) =
self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas);

result = self
.send_ratio_to_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas)
.await;
if let Some(err) = result.as_ref().err() {
tracing::info!(
"Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}",
attempt + 1,
base_fee_per_gas,
priority_fee_per_gas,
err
);
tokio::time::sleep(sleep_duration).await;
prev_base_fee_per_gas = Some(base_fee_per_gas);
prev_priority_fee_per_gas = Some(priority_fee_per_gas);
} else {
tracing::info!(
"Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}",
new_ratio.numerator.get(),
new_ratio.denominator.get(),
base_fee_per_gas,
priority_fee_per_gas
);
return result;
}
}
result
self.retry_update_ratio_on_l1(new_ratio).await
}

fn get_eth_fees(
Expand Down Expand Up @@ -157,36 +119,110 @@ impl BaseTokenRatioPersister {
(base_fee_per_gas, priority_fee_per_gas)
}

async fn retry_update_ratio_on_l1(&self, new_ratio: BaseTokenAPIRatio) -> anyhow::Result<()> {
let Some(l1_params) = &self.l1_params else {
return Ok(());
};

let max_attempts = self.config.l1_tx_sending_max_attempts;
let sleep_duration = self.config.l1_tx_sending_sleep_duration();
let mut prev_base_fee_per_gas: Option<u64> = None;
let mut prev_priority_fee_per_gas: Option<u64> = None;
let mut last_error = None;
for attempt in 0..max_attempts {
let (base_fee_per_gas, priority_fee_per_gas) =
self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas);

let start_time = Instant::now();
let result = self
.update_ratio_on_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas)
.await;

match result {
Ok(x) => {
tracing::info!(
"Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}",
new_ratio.numerator.get(),
new_ratio.denominator.get(),
base_fee_per_gas,
priority_fee_per_gas
);
METRICS
.l1_gas_used
.set(x.unwrap_or(U256::zero()).low_u128() as u64);
METRICS.l1_update_latency[&OperationResultLabels {
result: OperationResult::Success,
}]
.observe(start_time.elapsed());

return Ok(());
}
Err(err) => {
tracing::info!(
"Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}",
attempt,
base_fee_per_gas,
priority_fee_per_gas,
err
);
METRICS.l1_update_latency[&OperationResultLabels {
result: OperationResult::Failure,
}]
.observe(start_time.elapsed());

tokio::time::sleep(sleep_duration).await;
prev_base_fee_per_gas = Some(base_fee_per_gas);
prev_priority_fee_per_gas = Some(priority_fee_per_gas);
last_error = Some(err)
}
}
}

let error_message = "Failed to update base token multiplier on L1";
Err(last_error
.map(|x| x.context(error_message))
.unwrap_or_else(|| anyhow::anyhow!(error_message)))
}

async fn retry_fetch_ratio(&self) -> anyhow::Result<BaseTokenAPIRatio> {
let sleep_duration = Duration::from_secs(1);
let max_retries = 5;
let mut attempts = 0;
let sleep_duration = self.config.price_fetching_sleep_duration();
let max_retries = self.config.price_fetching_max_attempts;
let mut last_error = None;

loop {
for attempt in 0..max_retries {
let start_time = Instant::now();
match self
.price_api_client
.fetch_ratio(self.base_token_address)
.await
{
Ok(ratio) => {
METRICS.external_price_api_latency[&OperationResultLabels {
result: OperationResult::Success,
}]
.observe(start_time.elapsed());
return Ok(ratio);
}
Err(err) if attempts < max_retries => {
attempts += 1;
Err(err) => {
tracing::warn!(
"Attempt {}/{} to fetch ratio from coingecko failed with err: {}. Retrying...",
attempts,
"Attempt {}/{} to fetch ratio from external price api failed with err: {}. Retrying...",
attempt,
max_retries,
err
);
last_error = Some(err);
METRICS.external_price_api_latency[&OperationResultLabels {
result: OperationResult::Failure,
}]
.observe(start_time.elapsed());
sleep(sleep_duration).await;
}
Err(err) => {
return Err(err)
.context("Failed to fetch base token ratio after multiple attempts");
}
}
}
let error_message = "Failed to fetch base token ratio after multiple attempts";
Err(last_error
.map(|x| x.context(error_message))
.unwrap_or_else(|| anyhow::anyhow!(error_message)))
}

async fn persist_ratio(&self, api_ratio: BaseTokenAPIRatio) -> anyhow::Result<usize> {
Expand All @@ -209,13 +245,13 @@ impl BaseTokenRatioPersister {
Ok(id)
}

async fn send_ratio_to_l1(
async fn update_ratio_on_l1(
&self,
l1_params: &BaseTokenRatioPersisterL1Params,
api_ratio: BaseTokenAPIRatio,
base_fee_per_gas: u64,
priority_fee_per_gas: u64,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<U256>> {
let fn_set_token_multiplier = l1_params
.chain_admin_contract
.function("setTokenMultiplier")
Expand Down Expand Up @@ -276,7 +312,7 @@ impl BaseTokenRatioPersister {
.context("failed getting receipt for `setTokenMultiplier` transaction")?;
if let Some(receipt) = maybe_receipt {
if receipt.status == Some(1.into()) {
return Ok(());
return Ok(receipt.gas_used);
}
return Err(anyhow::Error::msg(format!(
"`setTokenMultiplier` transaction {:?} failed with status {:?}",
Expand Down
1 change: 1 addition & 0 deletions core/node/base_token_adjuster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub use self::{

mod base_token_ratio_persister;
mod base_token_ratio_provider;
mod metrics;
Loading
Loading