diff --git a/Cargo.lock b/Cargo.lock
index 8af4e90323c5..1e56778ccbec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10589,12 +10589,7 @@ dependencies = [
  "secrecy",
  "serde",
  "serde_json",
- "strum",
- "strum_macros",
- "time",
  "tracing",
- "url",
- "vise",
  "zksync_basic_types",
  "zksync_concurrency",
  "zksync_consensus_utils",
@@ -11853,7 +11848,6 @@ dependencies = [
  "secrecy",
  "serde_json",
  "serde_yaml",
- "time",
  "tracing",
  "zksync_basic_types",
  "zksync_config",
diff --git a/Cargo.toml b/Cargo.toml
index cdbc0d107f1d..2fcfec067d94 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -173,7 +173,6 @@ sqlx = "0.8.1"
 static_assertions = "1.1"
 structopt = "0.3.20"
 strum = "0.26"
-strum_macros = "0.26.4"
 tempfile = "3.0.2"
 test-casing = "0.1.2"
 test-log = "0.2.15"
diff --git a/core/lib/config/Cargo.toml b/core/lib/config/Cargo.toml
index af39e5159ba8..46c0b27d4b03 100644
--- a/core/lib/config/Cargo.toml
+++ b/core/lib/config/Cargo.toml
@@ -18,15 +18,10 @@ zksync_concurrency.workspace = true
 zksync_vlog = { workspace = true, optional = true }
 tracing = { workspace = true, optional = true }
 
-url.workspace = true
 anyhow.workspace = true
 rand.workspace = true
 secrecy.workspace = true
 serde = { workspace = true, features = ["derive"] }
-time = { workspace = true, features = ["serde-human-readable"] }
-strum.workspace = true
-strum_macros.workspace = true
-vise.workspace = true
 
 [dev-dependencies]
 serde_json.workspace = true
diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs
index 2b848030d719..5bf9a49e0acf 100644
--- a/core/lib/config/src/configs/mod.rs
+++ b/core/lib/config/src/configs/mod.rs
@@ -60,7 +60,6 @@ pub mod house_keeper;
 pub mod object_store;
 pub mod observability;
 pub mod proof_data_handler;
-pub mod prover_autoscaler;
 pub mod prover_job_monitor;
 pub mod pruning;
 pub mod secrets;
diff --git a/core/lib/config/src/configs/observability.rs b/core/lib/config/src/configs/observability.rs
index 42363cbcb4ff..f7ad2a97b91c 100644
--- a/core/lib/config/src/configs/observability.rs
+++ b/core/lib/config/src/configs/observability.rs
@@ -1,6 +1,8 @@
+use serde::Deserialize;
+
 /// Configuration for the essential observability stack, like
 /// logging and sentry integration.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Deserialize)]
 pub struct ObservabilityConfig {
     /// URL of the Sentry instance to send events to.
     pub sentry_url: Option<String>,
@@ -15,7 +17,7 @@ pub struct ObservabilityConfig {
     pub log_directives: Option<String>,
 }
 
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Deserialize)]
 pub struct OpentelemetryConfig {
     /// Enables export of span data of specified level (and above) using opentelemetry exporters.
     pub level: String,
diff --git a/core/lib/protobuf_config/Cargo.toml b/core/lib/protobuf_config/Cargo.toml
index 87a0a63567ba..92d9bd53978c 100644
--- a/core/lib/protobuf_config/Cargo.toml
+++ b/core/lib/protobuf_config/Cargo.toml
@@ -26,7 +26,6 @@ rand.workspace = true
 hex.workspace = true
 secrecy.workspace = true
 tracing.workspace = true
-time.workspace = true
 
 [build-dependencies]
 zksync_protobuf_build.workspace = true
diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs
index 09f42422c511..90c0ba071c0b 100644
--- a/core/lib/protobuf_config/src/lib.rs
+++ b/core/lib/protobuf_config/src/lib.rs
@@ -28,7 +28,6 @@ mod observability;
 mod proof_data_handler;
 pub mod proto;
 mod prover;
-mod prover_autoscaler;
 mod prover_job_monitor;
 mod pruning;
 mod secrets;
diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
deleted file mode 100644
index 742181653861..000000000000
--- a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto
+++ /dev/null
@@ -1,74 +0,0 @@
-syntax = "proto3";
-
-package zksync.config.prover_autoscaler;
-
-import "zksync/std.proto";
-import "zksync/config/observability.proto";
-
-message ProverAutoscalerConfig {
-  optional std.Duration graceful_shutdown_timeout = 1; // optional
-  optional ProverAutoscalerAgentConfig agent_config = 2; // optional
-  optional ProverAutoscalerScalerConfig scaler_config = 3; // optional
-  optional observability.Observability observability = 4; // optional
-}
-
-message ProverAutoscalerAgentConfig {
-  optional uint32 prometheus_port = 1; // required
-  optional uint32 http_port = 2; // required
-  repeated string namespaces = 3; // optional
-  optional string cluster_name = 4; // optional
-  optional bool dry_run = 5; // optional
-}
-
-message ProtocolVersion {
-  optional string namespace = 1; // required
-  optional string protocol_version = 2; // required
-}
-
-message ClusterPriority {
-  optional string cluster = 1; // required
-  optional uint32 priority = 2; // required
-}
-
-message ProverSpeed {
-  optional string gpu = 1; // required
-  optional uint32 speed = 2; // required
-}
-
-message MaxProver {
-  optional string cluster_and_gpu = 1; // required, format: <cluster>/<gpu>
-  optional uint32 max = 2; // required
-}
-
-message MinProver {
-  optional string namespace = 1; // required
-  optional uint32 min = 2; // required
-}
-
-message MaxReplica {
-  optional string cluster = 1; // required
-  optional uint64 max = 2; // required
-}
-
-message ScalerTarget {
-  optional string queue_report_field = 1; // required
-  optional string deployment = 5; // required
-  repeated MaxReplica max_replicas = 3; // required at least one
-  optional uint64 speed = 4; // optional
-  reserved 2; reserved "pod_name_prefix";
-}
-
-message ProverAutoscalerScalerConfig {
-  optional uint32 prometheus_port = 1; // required
-  optional std.Duration scaler_run_interval = 2; // optional
-  optional string prover_job_monitor_url = 3; // required
-  repeated string agents = 4; // required at least one
-  repeated ProtocolVersion protocol_versions = 5; // required at least one
-  repeated ClusterPriority cluster_priorities = 6; // optional
-  repeated ProverSpeed prover_speed = 7; // optional
-  optional uint32 long_pending_duration_s = 8; // optional
-  repeated MaxProver max_provers = 9; // optional
-  repeated MinProver min_provers = 10; // optional
-  repeated ScalerTarget scaler_targets = 11; // optional
-  optional bool dry_run = 12; // optional
-}
diff --git a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs
deleted file mode 100644
index 6b67d9f620ff..000000000000
--- a/core/lib/protobuf_config/src/prover_autoscaler.rs
+++ /dev/null
@@ -1,301 +0,0 @@
-use std::collections::HashMap;
-
-use anyhow::Context;
-use time::Duration;
-use zksync_config::configs::{self, prover_autoscaler::Gpu};
-use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt};
-
-use crate::{proto::prover_autoscaler as proto, read_optional_repr};
-
-impl ProtoRepr for proto::ProverAutoscalerConfig {
-    type Type = configs::prover_autoscaler::ProverAutoscalerConfig;
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok(Self::Type {
-            graceful_shutdown_timeout: read_optional(&self.graceful_shutdown_timeout)
-                .context("graceful_shutdown_timeout")?
-                .unwrap_or(Self::Type::default_graceful_shutdown_timeout()),
-            agent_config: read_optional_repr(&self.agent_config),
-            scaler_config: read_optional_repr(&self.scaler_config),
-            observability: read_optional_repr(&self.observability),
-        })
-    }
-
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            graceful_shutdown_timeout: Some(ProtoFmt::build(&this.graceful_shutdown_timeout)),
-            agent_config: this.agent_config.as_ref().map(ProtoRepr::build),
-            scaler_config: this.scaler_config.as_ref().map(ProtoRepr::build),
-            observability: this.observability.as_ref().map(ProtoRepr::build),
-        }
-    }
-}
-
-impl ProtoRepr for proto::ProverAutoscalerAgentConfig {
-    type Type = configs::prover_autoscaler::ProverAutoscalerAgentConfig;
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok(Self::Type {
-            prometheus_port: required(&self.prometheus_port)
-                .and_then(|x| Ok((*x).try_into()?))
-                .context("prometheus_port")?,
-            http_port: required(&self.http_port)
-                .and_then(|x| Ok((*x).try_into()?))
-                .context("http_port")?,
-            namespaces: self.namespaces.to_vec(),
-            cluster_name: Some("".to_string()),
-            dry_run: self.dry_run.unwrap_or(Self::Type::default_dry_run()),
-        })
-    }
-
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            prometheus_port: Some(this.prometheus_port.into()),
-            http_port: Some(this.http_port.into()),
-            namespaces: this.namespaces.clone(),
-            cluster_name: this.cluster_name.clone(),
-            dry_run: Some(this.dry_run),
-        }
-    }
-}
-
-impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
-    type Type = configs::prover_autoscaler::ProverAutoscalerScalerConfig;
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok(Self::Type {
-            prometheus_port: required(&self.prometheus_port)
-                .and_then(|x| Ok((*x).try_into()?))
-                .context("prometheus_port")?,
-            scaler_run_interval: read_optional(&self.scaler_run_interval)
-                .context("scaler_run_interval")?
-                .unwrap_or(Self::Type::default_scaler_run_interval()),
-            prover_job_monitor_url: required(&self.prover_job_monitor_url)
-                .context("prover_job_monitor_url")?
-                .clone(),
-            agents: self.agents.to_vec(),
-            protocol_versions: self
-                .protocol_versions
-                .iter()
-                .enumerate()
-                .map(|(i, e)| e.read().context(i))
-                .collect::<Result<_, _>>()
-                .context("protocol_versions")?,
-            cluster_priorities: self
-                .cluster_priorities
-                .iter()
-                .enumerate()
-                .map(|(i, e)| e.read().context(i))
-                .collect::<Result<_, _>>()
-                .context("cluster_priorities")?,
-            prover_speed: self
-                .prover_speed
-                .iter()
-                .enumerate()
-                .map(|(i, e)| e.read().context(i))
-                .collect::<Result<_, _>>()
-                .context("prover_speed")?,
-            long_pending_duration: match self.long_pending_duration_s {
-                Some(s) => Duration::seconds(s.into()),
-                None => Self::Type::default_long_pending_duration(),
-            },
-            max_provers: self.max_provers.iter().fold(HashMap::new(), |mut acc, e| {
-                let (cluster_and_gpu, max) = e.read().expect("max_provers");
-                if let Some((cluster, gpu)) = cluster_and_gpu.split_once('/') {
-                    acc.entry(cluster.to_string())
-                        .or_default()
-                        .insert(gpu.parse().expect("max_provers/gpu"), max);
-                }
-                acc
-            }),
-            min_provers: self
-                .min_provers
-                .iter()
-                .enumerate()
-                .map(|(i, e)| e.read().context(i))
-                .collect::<Result<_, _>>()
-                .context("min_provers")?,
-            scaler_targets: self
-                .scaler_targets
-                .iter()
-                .enumerate()
-                .map(|(i, x)| x.read().context(i).unwrap())
-                .collect::<Vec<_>>(),
-            dry_run: self.dry_run.unwrap_or_default(),
-        })
-    }
-
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            prometheus_port: Some(this.prometheus_port.into()),
-            scaler_run_interval: Some(ProtoFmt::build(&this.scaler_run_interval)),
-            prover_job_monitor_url: Some(this.prover_job_monitor_url.clone()),
-            agents: this.agents.clone(),
-            protocol_versions: this
-                .protocol_versions
-                .iter()
-                .map(|(k, v)| proto::ProtocolVersion::build(&(k.clone(), v.clone())))
-                .collect(),
-            cluster_priorities: this
-                .cluster_priorities
-                .iter()
-                .map(|(k, v)| proto::ClusterPriority::build(&(k.clone(), *v)))
-                .collect(),
-            prover_speed: this
-                .prover_speed
-                .iter()
-                .map(|(k, v)| proto::ProverSpeed::build(&(*k, *v)))
-                .collect(),
-            long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32),
-            max_provers: this
-                .max_provers
-                .iter()
-                .flat_map(|(cluster, inner_map)| {
-                    inner_map.iter().map(move |(gpu, max)| {
-                        proto::MaxProver::build(&(format!("{}/{}", cluster, gpu), *max))
-                    })
-                })
-                .collect(),
-            min_provers: this
-                .min_provers
-                .iter()
-                .map(|(k, v)| proto::MinProver::build(&(k.clone(), *v)))
-                .collect(),
-            scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(),
-            dry_run: Some(this.dry_run),
-        }
-    }
-}
-
-impl ProtoRepr for proto::ProtocolVersion {
-    type Type = (String, String);
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok((
-            required(&self.namespace).context("namespace")?.clone(),
-            required(&self.protocol_version)
-                .context("protocol_version")?
-                .clone(),
-        ))
-    }
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            namespace: Some(this.0.clone()),
-            protocol_version: Some(this.1.clone()),
-        }
-    }
-}
-
-impl ProtoRepr for proto::ClusterPriority {
-    type Type = (String, u32);
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok((
-            required(&self.cluster).context("cluster")?.clone(),
-            *required(&self.priority).context("priority")?,
-        ))
-    }
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            cluster: Some(this.0.clone()),
-            priority: Some(this.1),
-        }
-    }
-}
-
-impl ProtoRepr for proto::ProverSpeed {
-    type Type = (Gpu, u32);
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok((
-            required(&self.gpu).context("gpu")?.parse()?,
-            *required(&self.speed).context("speed")?,
-        ))
-    }
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            gpu: Some(this.0.to_string()),
-            speed: Some(this.1),
-        }
-    }
-}
-
-impl ProtoRepr for proto::MaxProver {
-    type Type = (String, u32);
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok((
-            required(&self.cluster_and_gpu)
-                .context("cluster_and_gpu")?
-                .parse()?,
-            *required(&self.max).context("max")?,
-        ))
-    }
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            cluster_and_gpu: Some(this.0.to_string()),
-            max: Some(this.1),
-        }
-    }
-}
-
-impl ProtoRepr for proto::MinProver {
-    type Type = (String, u32);
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok((
-            required(&self.namespace).context("namespace")?.clone(),
-            *required(&self.min).context("min")?,
-        ))
-    }
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            namespace: Some(this.0.to_string()),
-            min: Some(this.1),
-        }
-    }
-}
-
-impl ProtoRepr for proto::MaxReplica {
-    type Type = (String, usize);
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok((
-            required(&self.cluster).context("cluster")?.parse()?,
-            *required(&self.max).context("max")? as usize,
-        ))
-    }
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            cluster: Some(this.0.to_string()),
-            max: Some(this.1 as u64),
-        }
-    }
-}
-
-impl ProtoRepr for proto::ScalerTarget {
-    type Type = configs::prover_autoscaler::ScalerTarget;
-    fn read(&self) -> anyhow::Result<Self::Type> {
-        Ok(Self::Type {
-            queue_report_field: required(&self.queue_report_field)
-                .and_then(|x| Ok((*x).parse()?))
-                .context("queue_report_field")?,
-            deployment: required(&self.deployment).context("deployment")?.clone(),
-            max_replicas: self
-                .max_replicas
-                .iter()
-                .enumerate()
-                .map(|(i, e)| e.read().context(i))
-                .collect::<Result<_, _>>()
-                .context("max_replicas")?,
-            speed: match self.speed {
-                Some(x) => x as usize,
-                None => Self::Type::default_speed(),
-            },
-        })
-    }
-
-    fn build(this: &Self::Type) -> Self {
-        Self {
-            queue_report_field: Some(this.queue_report_field.to_string()),
-            deployment: Some(this.deployment.clone()),
-            max_replicas: this
-                .max_replicas
-                .iter()
-                .map(|(k, v)| proto::MaxReplica::build(&(k.clone(), *v)))
-                .collect(),
-            speed: Some(this.speed as u64),
-        }
-    }
-}
diff --git a/prover/Cargo.lock b/prover/Cargo.lock
index 809ed9b3a66d..4c30f9d1aa5c 100644
--- a/prover/Cargo.lock
+++ b/prover/Cargo.lock
@@ -2616,6 +2616,16 @@ version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
 
+[[package]]
+name = "humantime-serde"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c"
+dependencies = [
+ "humantime",
+ "serde",
+]
+
 [[package]]
 name = "hyper"
 version = "0.14.29"
@@ -7887,12 +7897,7 @@ dependencies = [
  "rand 0.8.5",
  "secrecy 0.8.0",
  "serde",
- "strum",
"strum_macros", - "time", "tracing", - "url", - "vise", "zksync_basic_types", "zksync_concurrency", "zksync_consensus_utils", @@ -8341,7 +8346,6 @@ dependencies = [ "secrecy 0.8.0", "serde_json", "serde_yaml", - "time", "tracing", "zksync_basic_types", "zksync_config", @@ -8358,10 +8362,10 @@ dependencies = [ "async-trait", "axum", "chrono", - "clap 4.5.4", "ctrlc", "debug-map-sorted", "futures 0.3.30", + "humantime-serde", "k8s-openapi", "kube", "once_cell", @@ -8371,9 +8375,10 @@ dependencies = [ "rustls", "serde", "serde_json", + "serde_yaml", "structopt", "strum", - "time", + "strum_macros", "tokio", "tracing", "tracing-subscriber", @@ -8382,7 +8387,6 @@ dependencies = [ "vise", "zksync_config", "zksync_core_leftovers", - "zksync_protobuf_config", "zksync_prover_job_monitor", "zksync_types", "zksync_utils", diff --git a/prover/Cargo.toml b/prover/Cargo.toml index 32c3185f64c3..e53efaae1968 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -28,6 +28,8 @@ debug-map-sorted = "0.1.1" dialoguer = "0.11" futures = "0.3" hex = "0.4" +humantime = "2.1" +humantime-serde = "1.1" indicatif = "0.16" itertools = "0.10.5" jemallocator = "0.5" @@ -47,12 +49,13 @@ rustls = { version = "0.23.12", features = ["ring"] } serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +serde_yaml = "0.9" sha3 = "0.10.8" sqlx = { version = "0.8.1", default-features = false } structopt = "0.3.26" strum = { version = "0.26" } +strum_macros = "0.26" tempfile = "3" -time = "0.3.36" tokio = "1" tokio-util = "0.7.11" toml_edit = "0.14.4" diff --git a/prover/crates/bin/prover_autoscaler/Cargo.toml b/prover/crates/bin/prover_autoscaler/Cargo.toml index fbf3ecae9098..88569aa87e94 100644 --- a/prover/crates/bin/prover_autoscaler/Cargo.toml +++ b/prover/crates/bin/prover_autoscaler/Cargo.toml @@ -16,16 +16,15 @@ zksync_utils.workspace = true zksync_types.workspace = true zksync_config = { workspace = true, features = ["observability_ext"] } zksync_prover_job_monitor.workspace = true -zksync_protobuf_config.workspace = true debug-map-sorted.workspace = true anyhow.workspace = true async-trait.workspace = true axum.workspace = true chrono.workspace = true -clap = { workspace = true, features = ["derive"] } ctrlc = { workspace = true, features = ["termination"] } futures.workspace = true +humantime-serde.workspace = true k8s-openapi = { workspace = true, features = ["v1_30"] } kube = { workspace = true, features = ["runtime", "derive"] } once_cell.workspace = true @@ -35,9 +34,10 @@ ring.workspace = true rustls = { workspace = true, features = ["ring"] } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +serde_yaml.workspace = true structopt.workspace = true strum.workspace = true -time.workspace = true +strum_macros.workspace = true tokio = { workspace = true, features = ["time", "macros"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing.workspace = true diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/prover/crates/bin/prover_autoscaler/src/config.rs similarity index 86% rename from core/lib/config/src/configs/prover_autoscaler.rs rename to prover/crates/bin/prover_autoscaler/src/config.rs index 4191208b96e3..c34cff15951a 100644 --- a/core/lib/config/src/configs/prover_autoscaler.rs +++ b/prover/crates/bin/prover_autoscaler/src/config.rs @@ -1,18 +1,17 @@ -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf, time::Duration}; +use anyhow::Context; use serde::Deserialize; use strum::Display; use strum_macros::EnumString; 
-use time::Duration;
 use vise::EncodeLabelValue;
-
-use crate::configs::ObservabilityConfig;
+use zksync_config::configs::ObservabilityConfig;
 
 /// Config used for running ProverAutoscaler (both Scaler and Agent).
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Deserialize)]
 pub struct ProverAutoscalerConfig {
     /// Amount of time ProverJobMonitor will wait all it's tasks to finish.
-    // TODO: find a way to use #[serde(with = "humantime_serde")] with time::Duration.
+    #[serde(with = "humantime_serde")]
     pub graceful_shutdown_timeout: Duration,
     pub agent_config: Option<ProverAutoscalerAgentConfig>,
     pub scaler_config: Option<ProverAutoscalerScalerConfig>,
@@ -40,7 +39,10 @@ pub struct ProverAutoscalerScalerConfig {
     /// Port for prometheus metrics connection.
     pub prometheus_port: u16,
     /// The interval between runs for global Scaler.
-    #[serde(default = "ProverAutoscalerScalerConfig::default_scaler_run_interval")]
+    #[serde(
+        with = "humantime_serde",
+        default = "ProverAutoscalerScalerConfig::default_scaler_run_interval"
+    )]
     pub scaler_run_interval: Duration,
     /// URL to get queue reports from.
     /// In production should be "http://prover-job-monitor.stage2.svc.cluster.local:3074/queue_report".
@@ -59,7 +61,10 @@ pub struct ProverAutoscalerScalerConfig {
     /// Minimum number of provers per namespace.
     pub min_provers: HashMap<String, u32>,
     /// Duration after which pending pod considered long pending.
-    #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")]
+    #[serde(
+        with = "humantime_serde",
+        default = "ProverAutoscalerScalerConfig::default_long_pending_duration"
+    )]
     pub long_pending_duration: Duration,
     /// List of simple autoscaler targets.
     pub scaler_targets: Vec<ScalerTarget>,
@@ -136,7 +141,7 @@ pub struct ScalerTarget {
 impl ProverAutoscalerConfig {
     /// Default graceful shutdown timeout -- 5 seconds
     pub fn default_graceful_shutdown_timeout() -> Duration {
-        Duration::seconds(5)
+        Duration::from_secs(5)
     }
 }
 
@@ -153,7 +158,7 @@ impl ProverAutoscalerAgentConfig {
 impl ProverAutoscalerScalerConfig {
     /// Default scaler_run_interval -- 10s
     pub fn default_scaler_run_interval() -> Duration {
-        Duration::seconds(10)
+        Duration::from_secs(10)
     }
 
     /// Default prover_job_monitor_url -- cluster local URL
@@ -163,7 +168,7 @@ impl ProverAutoscalerScalerConfig {
 
     /// Default long_pending_duration -- 10m
     pub fn default_long_pending_duration() -> Duration {
-        Duration::minutes(10)
+        Duration::from_secs(600)
     }
 }
 
@@ -172,3 +177,9 @@ impl ScalerTarget {
         1
     }
 }
+
+pub fn config_from_yaml<T: serde::de::DeserializeOwned>(path: &PathBuf) -> anyhow::Result<T> {
+    let yaml = std::fs::read_to_string(path)
+        .with_context(|| format!("failed to read {}", path.display()))?;
+    Ok(serde_yaml::from_str(&yaml)?)
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
index 7255f479647e..baeb5b70a4ef 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs
@@ -2,11 +2,13 @@ use std::{collections::HashMap, ops::Deref};
 
 use anyhow::{Context, Ok};
 use reqwest::Method;
-use zksync_config::configs::prover_autoscaler::QueueReportFields;
 use zksync_prover_job_monitor::autoscaler_queue_reporter::{QueueReport, VersionedQueueReport};
 use zksync_utils::http_with_retries::send_request_with_retries;
 
-use crate::metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE};
+use crate::{
+    config::QueueReportFields,
+    metrics::{AUTOSCALER_METRICS, DEFAULT_ERROR_CODE},
+};
 
 const MAX_RETRIES: usize = 5;
 
diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
index dc652999da5f..829b95dd7514 100644
--- a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
+++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -4,14 +4,12 @@ use chrono::Utc;
 use debug_map_sorted::SortedOutputExt;
 use once_cell::sync::Lazy;
 use regex::Regex;
-use zksync_config::configs::prover_autoscaler::{
-    Gpu, ProverAutoscalerScalerConfig, QueueReportFields, ScalerTarget,
-};
 
 use super::{queuer, watcher};
 use crate::{
     agent::{ScaleDeploymentRequest, ScaleRequest},
     cluster_types::{Cluster, Clusters, Pod, PodStatus},
+    config::{Gpu, ProverAutoscalerScalerConfig, QueueReportFields, ScalerTarget},
     metrics::AUTOSCALER_METRICS,
     task_wiring::Task,
 };
@@ -128,7 +126,7 @@ impl Scaler {
             simple_scalers.push(SimpleScaler::new(
                 c,
                 config.cluster_priorities.clone(),
-                chrono::Duration::seconds(config.long_pending_duration.whole_seconds()),
+                chrono::Duration::seconds(config.long_pending_duration.as_secs() as i64),
             ))
         }
         Self {
@@ -150,7 +148,7 @@ impl GpuScaler {
             max_provers: config.max_provers,
             prover_speed: config.prover_speed,
             long_pending_duration: chrono::Duration::seconds(
-                config.long_pending_duration.whole_seconds(),
+                config.long_pending_duration.as_secs() as i64,
             ),
         }
     }
diff --git a/prover/crates/bin/prover_autoscaler/src/lib.rs b/prover/crates/bin/prover_autoscaler/src/lib.rs
index 0b0d704c9078..019fe2b7fb4d 100644
--- a/prover/crates/bin/prover_autoscaler/src/lib.rs
+++ b/prover/crates/bin/prover_autoscaler/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod agent;
 pub(crate) mod cluster_types;
+pub mod config;
 pub mod global;
 pub mod k8s;
 pub(crate) mod metrics;
diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs
index ac5121dccd9c..b153cb9c4d59 100644
--- a/prover/crates/bin/prover_autoscaler/src/main.rs
+++ b/prover/crates/bin/prover_autoscaler/src/main.rs
@@ -6,10 +6,9 @@ use tokio::{
     sync::{oneshot, watch},
     task::JoinHandle,
 };
-use zksync_core_leftovers::temp_config_store::read_yaml_repr;
-use zksync_protobuf_config::proto::prover_autoscaler;
 use zksync_prover_autoscaler::{
     agent,
+    config::{config_from_yaml, ProverAutoscalerConfig},
     global::{self},
     k8s::{Scaler, Watcher},
     task_wiring::TaskRunner,
@@ -56,15 +55,11 @@ async fn main() -> anyhow::Result<()> {
     let opt = Opt::from_args();
 
     let general_config =
-        read_yaml_repr::<prover_autoscaler::ProverAutoscalerConfig>(&opt.config_path)
-            .context("general config")?;
+        config_from_yaml::<ProverAutoscalerConfig>(&opt.config_path).context("general config")?;
     let observability_config = general_config
         .observability
         .context("observability config")?;
     let _observability_guard = observability_config.install()?;
-    // That's unfortunate that there are at least 3 different Duration in rust and we use all 3 in this repo.
-    // TODO: Consider updating zksync_protobuf to support std::time::Duration.
-    let graceful_shutdown_timeout = general_config.graceful_shutdown_timeout.unsigned_abs();
 
     let (stop_signal_sender, stop_signal_receiver) = oneshot::channel();
     let mut stop_signal_sender = Some(stop_signal_sender);
@@ -77,9 +72,6 @@ async fn main() -> anyhow::Result<()> {
 
     let (stop_sender, stop_receiver) = watch::channel(false);
 
-    let _ = rustls::crypto::ring::default_provider().install_default();
-    let client = kube::Client::try_default().await?;
-
     let mut tasks = vec![];
 
     match opt.job {
@@ -92,6 +84,9 @@ async fn main() -> anyhow::Result<()> {
             let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port);
             tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
 
+            let _ = rustls::crypto::ring::default_provider().install_default();
+            let client = kube::Client::try_default().await?;
+
             // TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google"
             // http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name
             let watcher = Watcher::new(client.clone(), cluster, agent_config.namespaces);
@@ -107,7 +102,7 @@ async fn main() -> anyhow::Result<()> {
         AutoscalerType::Scaler => {
            tracing::info!("Starting ProverAutoscaler Scaler");
             let scaler_config = general_config.scaler_config.context("scaler_config")?;
-            let interval = scaler_config.scaler_run_interval.unsigned_abs();
+            let interval = scaler_config.scaler_run_interval;
             let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
             tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
             let watcher =
@@ -127,7 +122,9 @@ async fn main() -> anyhow::Result<()> {
         }
     }
     stop_sender.send(true).ok();
-    tasks.complete(graceful_shutdown_timeout).await;
+    tasks
+        .complete(general_config.graceful_shutdown_timeout)
+        .await;
 
     Ok(())
 }
diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs
index 39860a9e8f09..115ae3b74259 100644
--- a/prover/crates/bin/prover_autoscaler/src/metrics.rs
+++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs
@@ -1,5 +1,6 @@
 use vise::{Counter, Gauge, LabeledFamily, Metrics};
-use zksync_config::configs::prover_autoscaler::Gpu;
+
+use crate::config::Gpu;
 
 pub const DEFAULT_ERROR_CODE: u16 = 500;
 
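
Not part of the patch — a minimal, self-contained sketch of the deserialization pattern the relocated config.rs relies on after this change: `serde_yaml` plus `humantime_serde` parsing human-readable durations directly into `std::time::Duration`, which is what lets the `time` crate and the protobuf config plumbing be dropped. The struct name `ShutdownConfig` and the inline YAML are illustrative only; the real types live in prover/crates/bin/prover_autoscaler/src/config.rs.

```rust
// Illustrative sketch (not from the patch): parse a human-readable duration from
// YAML into std::time::Duration, mirroring the `#[serde(with = "humantime_serde")]`
// fields of ProverAutoscalerConfig. `ShutdownConfig` is a hypothetical stand-in.
use std::time::Duration;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ShutdownConfig {
    // Accepts values such as "5s", "10m" or "1h 30m".
    #[serde(with = "humantime_serde")]
    graceful_shutdown_timeout: Duration,
}

fn main() -> anyhow::Result<()> {
    let yaml = "graceful_shutdown_timeout: 10m\n";
    let config: ShutdownConfig = serde_yaml::from_str(yaml)?;
    assert_eq!(config.graceful_shutdown_timeout, Duration::from_secs(600));
    println!("{config:?}");
    Ok(())
}
```

All crates used above (`serde`, `serde_yaml`, `humantime-serde`, `anyhow`) are already declared in the prover workspace by this diff.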