Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Base token): add cbt metrics #2720

Merged
merged 9 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions core/lib/config/src/configs/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const DEFAULT_L1_TX_SENDING_MAX_ATTEMPTS: u32 = 3;
/// Default number of milliseconds to sleep between receipt checking attempts
const DEFAULT_L1_RECEIPT_CHECKING_SLEEP_MS: u64 = 30_000;

/// Default maximum number of attempts to fetch price from a remote API
const DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS: u32 = 3;

/// Default number of milliseconds to sleep between price fetching attempts
const DEFAULT_PRICE_FETCHING_SLEEP_MS: u64 = 5_000;

/// Default number of milliseconds to sleep between transaction sending attempts
const DEFAULT_L1_TX_SENDING_SLEEP_MS: u64 = 30_000;

Expand Down Expand Up @@ -73,6 +79,14 @@ pub struct BaseTokenAdjusterConfig {
#[serde(default = "BaseTokenAdjusterConfig::default_l1_tx_sending_sleep_ms")]
pub l1_tx_sending_sleep_ms: u64,

/// Maximum number of attempts to fetch quote from a remote API before failing over
#[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_max_attempts")]
pub price_fetching_max_attempts: u32,

/// Number of seconds to sleep between price fetching attempts
#[serde(default = "BaseTokenAdjusterConfig::default_price_fetching_sleep_ms")]
pub price_fetching_sleep_ms: u64,

/// Defines whether base_token_adjuster should halt the process if there was an error while
/// fetching or persisting the quote. Generally that should be set to false to not to halt
/// the server process if an external api is not available or if L1 is congested.
Expand All @@ -93,6 +107,8 @@ impl Default for BaseTokenAdjusterConfig {
l1_receipt_checking_sleep_ms: Self::default_l1_receipt_checking_sleep_ms(),
l1_tx_sending_max_attempts: Self::default_l1_tx_sending_max_attempts(),
l1_tx_sending_sleep_ms: Self::default_l1_tx_sending_sleep_ms(),
price_fetching_sleep_ms: Self::default_price_fetching_sleep_ms(),
price_fetching_max_attempts: Self::default_price_fetching_max_attempts(),
halt_on_error: Self::default_halt_on_error(),
}
}
Expand Down Expand Up @@ -135,6 +151,10 @@ impl BaseTokenAdjusterConfig {
Duration::from_millis(self.l1_tx_sending_sleep_ms)
}

pub fn price_fetching_sleep_duration(&self) -> Duration {
Duration::from_millis(self.price_fetching_sleep_ms)
}

pub fn default_l1_receipt_checking_max_attempts() -> u32 {
DEFAULT_L1_RECEIPT_CHECKING_MAX_ATTEMPTS
}
Expand All @@ -151,6 +171,14 @@ impl BaseTokenAdjusterConfig {
DEFAULT_L1_TX_SENDING_SLEEP_MS
}

pub fn default_price_fetching_sleep_ms() -> u64 {
DEFAULT_PRICE_FETCHING_SLEEP_MS
}

pub fn default_price_fetching_max_attempts() -> u32 {
DEFAULT_PRICE_FETCHING_MAX_ATTEMPTS
}

pub fn default_max_tx_gas() -> u64 {
DEFAULT_MAX_TX_GAS
}
Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,8 @@ impl Distribution<configs::base_token_adjuster::BaseTokenAdjusterConfig> for Enc
l1_receipt_checking_sleep_ms: self.sample(rng),
l1_tx_sending_max_attempts: self.sample(rng),
l1_tx_sending_sleep_ms: self.sample(rng),
price_fetching_max_attempts: self.sample(rng),
price_fetching_sleep_ms: self.sample(rng),
halt_on_error: self.sample(rng),
}
}
Expand Down
8 changes: 8 additions & 0 deletions core/lib/env_config/src/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mod tests {
l1_receipt_checking_sleep_ms: 20_000,
l1_tx_sending_max_attempts: 10,
l1_tx_sending_sleep_ms: 30_000,
price_fetching_max_attempts: 20,
price_fetching_sleep_ms: 10_000,
halt_on_error: true,
}
}
Expand All @@ -41,6 +43,8 @@ mod tests {
l1_receipt_checking_sleep_ms: 30_000,
l1_tx_sending_max_attempts: 3,
l1_tx_sending_sleep_ms: 30_000,
price_fetching_max_attempts: 3,
price_fetching_sleep_ms: 5_000,
halt_on_error: false,
}
}
Expand All @@ -58,6 +62,8 @@ mod tests {
BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS=20000
BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS=10
BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS=30000
BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS=20
BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS=10000
BASE_TOKEN_ADJUSTER_HALT_ON_ERROR=true
"#;
lock.set_env(config);
Expand All @@ -79,6 +85,8 @@ mod tests {
"BASE_TOKEN_ADJUSTER_L1_RECEIPT_CHECKING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_L1_TX_SENDING_MAX_ATTEMPTS",
"BASE_TOKEN_ADJUSTER_L1_TX_SENDING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_PRICE_FETCHING_MAX_ATTEMPTS",
"BASE_TOKEN_ADJUSTER_PRICE_FETCHING_SLEEP_MS",
"BASE_TOKEN_ADJUSTER_HALT_ON_ERROR",
]);

Expand Down
8 changes: 8 additions & 0 deletions core/lib/protobuf_config/src/base_token_adjuster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ impl ProtoRepr for proto::BaseTokenAdjuster {
l1_receipt_checking_max_attempts: self
.l1_receipt_checking_max_attempts
.unwrap_or(Self::Type::default_l1_receipt_checking_max_attempts()),
price_fetching_sleep_ms: self
.price_fetching_sleep_ms
.unwrap_or(Self::Type::default_price_fetching_sleep_ms()),
price_fetching_max_attempts: self
.price_fetching_max_attempts
.unwrap_or(Self::Type::default_price_fetching_max_attempts()),
l1_tx_sending_max_attempts: self
.l1_tx_sending_max_attempts
.unwrap_or(Self::Type::default_l1_tx_sending_max_attempts()),
Expand All @@ -47,6 +53,8 @@ impl ProtoRepr for proto::BaseTokenAdjuster {
l1_receipt_checking_max_attempts: Some(this.l1_receipt_checking_max_attempts),
l1_tx_sending_max_attempts: Some(this.l1_tx_sending_max_attempts),
l1_tx_sending_sleep_ms: Some(this.l1_tx_sending_sleep_ms),
price_fetching_max_attempts: Some(this.price_fetching_max_attempts),
price_fetching_sleep_ms: Some(this.price_fetching_sleep_ms),
max_tx_gas: Some(this.max_tx_gas),
default_priority_fee_per_gas: Some(this.default_priority_fee_per_gas),
max_acceptable_priority_fee_in_gwei: Some(this.max_acceptable_priority_fee_in_gwei),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ message BaseTokenAdjuster {
optional uint32 l1_tx_sending_max_attempts = 8;
optional uint64 l1_tx_sending_sleep_ms = 9;
optional bool halt_on_error = 10;
optional uint32 price_fetching_max_attempts = 11;
optional uint64 price_fetching_sleep_ms = 12;
}
3 changes: 2 additions & 1 deletion core/node/base_token_adjuster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ zksync_external_price_api.workspace = true
zksync_contracts.workspace = true
zksync_eth_client.workspace = true
zksync_node_fee_model.workspace = true

zksync_utils.workspace = true
vise.workspace = true

tokio = { workspace = true, features = ["time"] }
anyhow.workspace = true
Expand Down
163 changes: 107 additions & 56 deletions core/node/base_token_adjuster/src/base_token_ratio_persister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use zksync_types::{
web3::{contract::Tokenize, BlockNumber},
Address, U256,
};
use zksync_utils::time::millis_since_epoch;

use crate::metrics::{OperationResult, OperationResultLabels, METRICS};

#[derive(Debug, Clone)]
pub struct BaseTokenRatioPersisterL1Params {
Expand Down Expand Up @@ -82,47 +85,7 @@ impl BaseTokenRatioPersister {
// TODO(PE-148): Consider shifting retry upon adding external API redundancy.
let new_ratio = self.retry_fetch_ratio().await?;
self.persist_ratio(new_ratio).await?;

let Some(l1_params) = &self.l1_params else {
return Ok(());
};

let max_attempts = self.config.l1_tx_sending_max_attempts;
let sleep_duration = self.config.l1_tx_sending_sleep_duration();
let mut result: anyhow::Result<()> = Ok(());
let mut prev_base_fee_per_gas: Option<u64> = None;
let mut prev_priority_fee_per_gas: Option<u64> = None;

for attempt in 0..max_attempts {
let (base_fee_per_gas, priority_fee_per_gas) =
self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas);

result = self
.send_ratio_to_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas)
.await;
if let Some(err) = result.as_ref().err() {
tracing::info!(
"Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}",
attempt + 1,
base_fee_per_gas,
priority_fee_per_gas,
err
);
tokio::time::sleep(sleep_duration).await;
prev_base_fee_per_gas = Some(base_fee_per_gas);
prev_priority_fee_per_gas = Some(priority_fee_per_gas);
} else {
tracing::info!(
"Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}",
new_ratio.numerator.get(),
new_ratio.denominator.get(),
base_fee_per_gas,
priority_fee_per_gas
);
return result;
}
}
result
self.retry_update_ratio_on_l1(new_ratio).await
}

fn get_eth_fees(
Expand Down Expand Up @@ -157,36 +120,120 @@ impl BaseTokenRatioPersister {
(base_fee_per_gas, priority_fee_per_gas)
}

async fn retry_update_ratio_on_l1(&self, new_ratio: BaseTokenAPIRatio) -> anyhow::Result<()> {
let Some(l1_params) = &self.l1_params else {
return Ok(());
};

let start_time = millis_since_epoch();
ischasny marked this conversation as resolved.
Show resolved Hide resolved
let max_attempts = self.config.l1_tx_sending_max_attempts;
let sleep_duration = self.config.l1_tx_sending_sleep_duration();
let mut prev_base_fee_per_gas: Option<u64> = None;
let mut prev_priority_fee_per_gas: Option<u64> = None;
let mut last_error = None;
for attempt in 0..max_attempts {
let (base_fee_per_gas, priority_fee_per_gas) =
self.get_eth_fees(l1_params, prev_base_fee_per_gas, prev_priority_fee_per_gas);

let result = self
.update_ratio_on_l1(l1_params, new_ratio, base_fee_per_gas, priority_fee_per_gas)
.await;

match result {
Ok(x) => {
tracing::info!(
"Updated base token multiplier on L1: numerator {}, denominator {}, base_fee_per_gas {}, priority_fee_per_gas {}",
new_ratio.numerator.get(),
new_ratio.denominator.get(),
base_fee_per_gas,
priority_fee_per_gas
);
ischasny marked this conversation as resolved.
Show resolved Hide resolved
METRICS
.l1_gas_used
.observe(x.unwrap_or(U256::zero()).low_u128() as f64);
METRICS.l1_update_latency[&OperationResultLabels {
result: OperationResult::Success,
attempts: attempt,
}]
.observe(Self::duration_from_millis(
millis_since_epoch() - start_time,
));
return Ok(());
}
Err(err) => {
tracing::info!(
"Failed to update base token multiplier on L1, attempt {}, base_fee_per_gas {}, priority_fee_per_gas {}: {}",
attempt,
base_fee_per_gas,
priority_fee_per_gas,
err
ischasny marked this conversation as resolved.
Show resolved Hide resolved
);
tokio::time::sleep(sleep_duration).await;
prev_base_fee_per_gas = Some(base_fee_per_gas);
prev_priority_fee_per_gas = Some(priority_fee_per_gas);
last_error = Some(err)
}
}
}
METRICS.l1_update_latency[&OperationResultLabels {
result: OperationResult::Failure,
attempts: max_attempts,
}]
.observe(Self::duration_from_millis(
millis_since_epoch() - start_time,
));
let error_message = "Failed to update base token multiplier on L1";
Err(last_error
.map(|x| x.context(error_message))
.unwrap_or_else(|| anyhow::anyhow!(error_message)))
}

async fn retry_fetch_ratio(&self) -> anyhow::Result<BaseTokenAPIRatio> {
let sleep_duration = Duration::from_secs(1);
let max_retries = 5;
let mut attempts = 0;
let start_time = millis_since_epoch();
let sleep_duration = self.config.price_fetching_sleep_duration();
let max_retries = self.config.price_fetching_max_attempts;
let mut last_error = None;

loop {
for attempt in 0..max_retries {
match self
.price_api_client
.fetch_ratio(self.base_token_address)
.await
{
Ok(ratio) => {
METRICS.external_price_api_latency[&OperationResultLabels {
result: OperationResult::Success,
attempts: attempt,
}]
.observe(Self::duration_from_millis(
millis_since_epoch() - start_time,
));
return Ok(ratio);
}
Err(err) if attempts < max_retries => {
attempts += 1;
Err(err) => {
tracing::warn!(
"Attempt {}/{} to fetch ratio from coingecko failed with err: {}. Retrying...",
attempts,
"Attempt {}/{} to fetch ratio from external price api failed with err: {}. Retrying...",
attempt,
max_retries,
err
);
last_error = Some(err);
sleep(sleep_duration).await;
}
Err(err) => {
return Err(err)
.context("Failed to fetch base token ratio after multiple attempts");
}
}
}
METRICS.external_price_api_latency[&OperationResultLabels {
result: OperationResult::Failure,
attempts: max_retries,
}]
.observe(Self::duration_from_millis(
millis_since_epoch() - start_time,
));

let error_message = "Failed to fetch base token ratio after multiple attempts";
Err(last_error
.map(|x| x.context(error_message))
.unwrap_or_else(|| anyhow::anyhow!(error_message)))
}

async fn persist_ratio(&self, api_ratio: BaseTokenAPIRatio) -> anyhow::Result<usize> {
Expand All @@ -209,13 +256,13 @@ impl BaseTokenRatioPersister {
Ok(id)
}

async fn send_ratio_to_l1(
async fn update_ratio_on_l1(
&self,
l1_params: &BaseTokenRatioPersisterL1Params,
api_ratio: BaseTokenAPIRatio,
base_fee_per_gas: u64,
priority_fee_per_gas: u64,
) -> anyhow::Result<()> {
) -> anyhow::Result<Option<U256>> {
let fn_set_token_multiplier = l1_params
.chain_admin_contract
.function("setTokenMultiplier")
Expand Down Expand Up @@ -276,7 +323,7 @@ impl BaseTokenRatioPersister {
.context("failed getting receipt for `setTokenMultiplier` transaction")?;
if let Some(receipt) = maybe_receipt {
if receipt.status == Some(1.into()) {
return Ok(());
return Ok(receipt.gas_used);
}
return Err(anyhow::Error::msg(format!(
"`setTokenMultiplier` transaction {:?} failed with status {:?}",
Expand All @@ -293,4 +340,8 @@ impl BaseTokenRatioPersister {
max_attempts
)))
}

fn duration_from_millis(millis: u128) -> Duration {
Duration::from_millis(millis as u64)
}
}
1 change: 1 addition & 0 deletions core/node/base_token_adjuster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pub use self::{

mod base_token_ratio_persister;
mod base_token_ratio_provider;
mod metrics;
Loading
Loading