diff --git a/Cargo.lock b/Cargo.lock index 5073188d6321..55bbb4b55828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9666,8 +9666,12 @@ dependencies = [ "secrecy", "serde", "serde_json", + "strum", + "strum_macros", + "time", "tracing", "url", + "vise", "zksync_basic_types", "zksync_concurrency", "zksync_consensus_utils", @@ -10899,6 +10903,7 @@ dependencies = [ "secrecy", "serde_json", "serde_yaml", + "time", "tracing", "zksync_basic_types", "zksync_config", diff --git a/Cargo.toml b/Cargo.toml index 691341f71ba7..5d516e97abaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,85 +1,85 @@ [workspace] members = [ - # Binaries - "core/bin/block_reverter", - "core/bin/contract-verifier", - "core/bin/external_node", - "core/bin/merkle_tree_consistency_checker", - "core/bin/snapshots_creator", - "core/bin/selector_generator", - "core/bin/system-constants-generator", - "core/bin/verified_sources_fetcher", - "core/bin/zksync_server", - "core/bin/genesis_generator", - "core/bin/zksync_tee_prover", - # Node services - "core/node/node_framework", - "core/node/proof_data_handler", - "core/node/block_reverter", - "core/node/commitment_generator", - "core/node/house_keeper", - "core/node/genesis", - "core/node/shared_metrics", - "core/node/db_pruner", - "core/node/fee_model", - "core/node/da_dispatcher", - "core/node/eth_sender", - "core/node/vm_runner", - "core/node/test_utils", - "core/node/state_keeper", - "core/node/reorg_detector", - "core/node/consistency_checker", - "core/node/metadata_calculator", - "core/node/node_sync", - "core/node/node_storage_init", - "core/node/consensus", - "core/node/contract_verification_server", - "core/node/api_server", - "core/node/tee_verifier_input_producer", - "core/node/base_token_adjuster", - "core/node/external_proof_integration_api", - "core/node/logs_bloom_backfill", - "core/node/da_clients", - # Libraries - "core/lib/db_connection", - "core/lib/zksync_core_leftovers", - "core/lib/basic_types", - "core/lib/config", - "core/lib/constants", 
- "core/lib/contract_verifier", - "core/lib/contracts", - "core/lib/circuit_breaker", - "core/lib/dal", - "core/lib/env_config", - "core/lib/da_client", - "core/lib/eth_client", - "core/lib/eth_signer", - "core/lib/l1_contract_interface", - "core/lib/mempool", - "core/lib/merkle_tree", - "core/lib/mini_merkle_tree", - "core/lib/node_framework_derive", - "core/lib/object_store", - "core/lib/prover_interface", - "core/lib/queued_job_processor", - "core/lib/state", - "core/lib/storage", - "core/lib/tee_verifier", - "core/lib/types", - "core/lib/protobuf_config", - "core/lib/utils", - "core/lib/vlog", - "core/lib/multivm", - "core/lib/vm_interface", - "core/lib/vm_executor", - "core/lib/web3_decl", - "core/lib/snapshots_applier", - "core/lib/crypto_primitives", - "core/lib/external_price_api", - # Test infrastructure - "core/tests/test_account", - "core/tests/loadnext", - "core/tests/vm-benchmark", + # Binaries + "core/bin/block_reverter", + "core/bin/contract-verifier", + "core/bin/external_node", + "core/bin/merkle_tree_consistency_checker", + "core/bin/snapshots_creator", + "core/bin/selector_generator", + "core/bin/system-constants-generator", + "core/bin/verified_sources_fetcher", + "core/bin/zksync_server", + "core/bin/genesis_generator", + "core/bin/zksync_tee_prover", + # Node services + "core/node/node_framework", + "core/node/proof_data_handler", + "core/node/block_reverter", + "core/node/commitment_generator", + "core/node/house_keeper", + "core/node/genesis", + "core/node/shared_metrics", + "core/node/db_pruner", + "core/node/fee_model", + "core/node/da_dispatcher", + "core/node/eth_sender", + "core/node/vm_runner", + "core/node/test_utils", + "core/node/state_keeper", + "core/node/reorg_detector", + "core/node/consistency_checker", + "core/node/metadata_calculator", + "core/node/node_sync", + "core/node/node_storage_init", + "core/node/consensus", + "core/node/contract_verification_server", + "core/node/api_server", + 
"core/node/tee_verifier_input_producer", + "core/node/base_token_adjuster", + "core/node/external_proof_integration_api", + "core/node/logs_bloom_backfill", + "core/node/da_clients", + # Libraries + "core/lib/db_connection", + "core/lib/zksync_core_leftovers", + "core/lib/basic_types", + "core/lib/config", + "core/lib/constants", + "core/lib/contract_verifier", + "core/lib/contracts", + "core/lib/circuit_breaker", + "core/lib/dal", + "core/lib/env_config", + "core/lib/da_client", + "core/lib/eth_client", + "core/lib/eth_signer", + "core/lib/l1_contract_interface", + "core/lib/mempool", + "core/lib/merkle_tree", + "core/lib/mini_merkle_tree", + "core/lib/node_framework_derive", + "core/lib/object_store", + "core/lib/prover_interface", + "core/lib/queued_job_processor", + "core/lib/state", + "core/lib/storage", + "core/lib/tee_verifier", + "core/lib/types", + "core/lib/protobuf_config", + "core/lib/utils", + "core/lib/vlog", + "core/lib/multivm", + "core/lib/vm_interface", + "core/lib/vm_executor", + "core/lib/web3_decl", + "core/lib/snapshots_applier", + "core/lib/crypto_primitives", + "core/lib/external_price_api", + # Test infrastructure + "core/tests/test_account", + "core/tests/loadnext", + "core/tests/vm-benchmark", ] resolver = "2" @@ -172,6 +172,7 @@ sqlx = "0.8.1" static_assertions = "1.1" structopt = "0.3.20" strum = "0.26" +strum_macros = "0.26.4" tempfile = "3.0.2" test-casing = "0.1.2" test-log = "0.2.15" @@ -185,7 +186,7 @@ tower-http = "0.5.2" tracing = "0.1" tracing-subscriber = "0.3" tracing-opentelemetry = "0.25.0" -time = "0.3.36" # Has to be same as used by `tracing-subscriber` +time = "0.3.36" # Has to be same as used by `tracing-subscriber` url = "2" web3 = "0.19.0" fraction = "0.15.3" diff --git a/core/lib/config/Cargo.toml b/core/lib/config/Cargo.toml index d1ab5ce8438f..af39e5159ba8 100644 --- a/core/lib/config/Cargo.toml +++ b/core/lib/config/Cargo.toml @@ -23,6 +23,10 @@ anyhow.workspace = true rand.workspace = true secrecy.workspace = true 
serde = { workspace = true, features = ["derive"] } +time = { workspace = true, features = ["serde-human-readable"] } +strum.workspace = true +strum_macros.workspace = true +vise.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/core/lib/config/src/configs/mod.rs b/core/lib/config/src/configs/mod.rs index 1ad503e0687f..a8d136d632ea 100644 --- a/core/lib/config/src/configs/mod.rs +++ b/core/lib/config/src/configs/mod.rs @@ -60,6 +60,7 @@ pub mod house_keeper; pub mod object_store; pub mod observability; pub mod proof_data_handler; +pub mod prover_autoscaler; pub mod prover_job_monitor; pub mod pruning; pub mod secrets; diff --git a/core/lib/config/src/configs/prover_autoscaler.rs b/core/lib/config/src/configs/prover_autoscaler.rs new file mode 100644 index 000000000000..41131fc1b8c7 --- /dev/null +++ b/core/lib/config/src/configs/prover_autoscaler.rs @@ -0,0 +1,117 @@ +use std::collections::HashMap; + +use serde::Deserialize; +use strum::Display; +use strum_macros::EnumString; +use time::Duration; +use vise::EncodeLabelValue; + +use crate::configs::ObservabilityConfig; + +/// Config used for running ProverAutoscaler (both Scaler and Agent). +#[derive(Debug, Clone, PartialEq)] +pub struct ProverAutoscalerConfig { + /// Amount of time ProverAutoscaler will wait for all its tasks to finish. + // TODO: find a way to use #[serde(with = "humantime_serde")] with time::Duration. + pub graceful_shutdown_timeout: Duration, + pub agent_config: Option, + pub scaler_config: Option, + pub observability: Option, +} + +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct ProverAutoscalerAgentConfig { + /// Port for prometheus metrics connection. + pub prometheus_port: u16, + /// HTTP port for global Scaler to connect to the Agent running in a cluster. + pub http_port: u16, + /// List of namespaces to watch. + #[serde(default = "ProverAutoscalerAgentConfig::default_namespaces")] + pub namespaces: Vec, + /// Watched cluster name. 
Also can be set via flag. + pub cluster_name: Option, +} + +#[derive(Debug, Clone, PartialEq, Deserialize, Default)] +pub struct ProverAutoscalerScalerConfig { + /// Port for prometheus metrics connection. + pub prometheus_port: u16, + /// The interval between runs for global Scaler. + #[serde(default = "ProverAutoscalerScalerConfig::default_scaler_run_interval")] + pub scaler_run_interval: Duration, + /// URL to get queue reports from. + /// In production should be "http://prover-job-monitor.stage2.svc.cluster.local:3074/queue_report". + #[serde(default = "ProverAutoscalerScalerConfig::default_prover_job_monitor_url")] + pub prover_job_monitor_url: String, + /// List of ProverAutoscaler Agents to get cluster data from. + pub agents: Vec, + /// Mapping of namespaces to protocol versions. + pub protocol_versions: HashMap, + /// Default priorities, which cluster to prefer when there is no other information. + pub cluster_priorities: HashMap, + /// Prover speed per GPU. Used to calculate desired number of provers for queue size. + pub prover_speed: HashMap, + /// Duration after which pending pod considered long pending. 
+ #[serde(default = "ProverAutoscalerScalerConfig::default_long_pending_duration")] + pub long_pending_duration: Duration, +} + +#[derive( + Default, + Debug, + Display, + Hash, + PartialEq, + Eq, + Clone, + Copy, + Ord, + PartialOrd, + EnumString, + EncodeLabelValue, + Deserialize, +)] +pub enum Gpu { + #[default] + Unknown, + #[strum(ascii_case_insensitive)] + L4, + #[strum(ascii_case_insensitive)] + T4, + #[strum(ascii_case_insensitive)] + V100, + #[strum(ascii_case_insensitive)] + P100, + #[strum(ascii_case_insensitive)] + A100, +} + +impl ProverAutoscalerConfig { + /// Default graceful shutdown timeout -- 5 seconds + pub fn default_graceful_shutdown_timeout() -> Duration { + Duration::seconds(5) + } +} + +impl ProverAutoscalerAgentConfig { + pub fn default_namespaces() -> Vec { + vec!["prover-blue".to_string(), "prover-red".to_string()] + } +} + +impl ProverAutoscalerScalerConfig { + /// Default scaler_run_interval -- 10s + pub fn default_scaler_run_interval() -> Duration { + Duration::seconds(10) + } + + /// Default prover_job_monitor_url -- cluster local URL + pub fn default_prover_job_monitor_url() -> String { + "http://localhost:3074/queue_report".to_string() + } + + /// Default long_pending_duration -- 10m + pub fn default_long_pending_duration() -> Duration { + Duration::minutes(10) + } +} diff --git a/core/lib/protobuf_config/Cargo.toml b/core/lib/protobuf_config/Cargo.toml index 92d9bd53978c..87a0a63567ba 100644 --- a/core/lib/protobuf_config/Cargo.toml +++ b/core/lib/protobuf_config/Cargo.toml @@ -26,6 +26,7 @@ rand.workspace = true hex.workspace = true secrecy.workspace = true tracing.workspace = true +time.workspace = true [build-dependencies] zksync_protobuf_build.workspace = true diff --git a/core/lib/protobuf_config/src/lib.rs b/core/lib/protobuf_config/src/lib.rs index c89199359aaa..7bbe955561b2 100644 --- a/core/lib/protobuf_config/src/lib.rs +++ b/core/lib/protobuf_config/src/lib.rs @@ -28,6 +28,7 @@ mod observability; mod proof_data_handler; 
pub mod proto; mod prover; +mod prover_autoscaler; mod prover_job_monitor; mod pruning; mod secrets; diff --git a/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto new file mode 100644 index 000000000000..e1d11b94d8f1 --- /dev/null +++ b/core/lib/protobuf_config/src/proto/config/prover_autoscaler.proto @@ -0,0 +1,46 @@ +syntax = "proto3"; + +package zksync.config.prover_autoscaler; + +import "zksync/std.proto"; +import "zksync/config/observability.proto"; + +message ProverAutoscalerConfig { + optional std.Duration graceful_shutdown_timeout = 1; // optional + optional ProverAutoscalerAgentConfig agent_config = 2; // optional + optional ProverAutoscalerScalerConfig scaler_config = 3; // optional + optional observability.Observability observability = 4; // optional +} + +message ProverAutoscalerAgentConfig { + optional uint32 prometheus_port = 1; // required + optional uint32 http_port = 2; // required + repeated string namespaces = 3; // optional + optional string cluster_name = 4; // optional +} + +message ProtocolVersion { + optional string namespace = 1; // required + optional string protocol_version = 2; // required +} + +message ClusterPriority { + optional string cluster = 1; // required + optional uint32 priority = 2; // required +} + +message ProverSpeed { + optional string gpu = 1; // required + optional uint32 speed = 2; // required +} + +message ProverAutoscalerScalerConfig { + optional uint32 prometheus_port = 1; // required + optional std.Duration scaler_run_interval = 2; // optional + optional string prover_job_monitor_url = 3; // required + repeated string agents = 4; // required at least one + repeated ProtocolVersion protocol_versions = 5; // repeated at least one + repeated ClusterPriority cluster_priorities = 6; // optional + repeated ProverSpeed prover_speed = 7; // optional + optional uint32 long_pending_duration_s = 8; // optional +} diff --git 
a/core/lib/protobuf_config/src/prover_autoscaler.rs b/core/lib/protobuf_config/src/prover_autoscaler.rs new file mode 100644 index 000000000000..f7da099cb829 --- /dev/null +++ b/core/lib/protobuf_config/src/prover_autoscaler.rs @@ -0,0 +1,172 @@ +use anyhow::Context as _; +use time::Duration; +use zksync_config::configs::{self, prover_autoscaler::Gpu}; +use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt}; + +use crate::{proto::prover_autoscaler as proto, read_optional_repr}; + +impl ProtoRepr for proto::ProverAutoscalerConfig { + type Type = configs::prover_autoscaler::ProverAutoscalerConfig; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + graceful_shutdown_timeout: read_optional(&self.graceful_shutdown_timeout) + .context("graceful_shutdown_timeout")? + .unwrap_or(Self::Type::default_graceful_shutdown_timeout()), + agent_config: read_optional_repr(&self.agent_config), + scaler_config: read_optional_repr(&self.scaler_config), + observability: read_optional_repr(&self.observability), + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + graceful_shutdown_timeout: Some(ProtoFmt::build(&this.graceful_shutdown_timeout)), + agent_config: this.agent_config.as_ref().map(ProtoRepr::build), + scaler_config: this.scaler_config.as_ref().map(ProtoRepr::build), + observability: this.observability.as_ref().map(ProtoRepr::build), + } + } +} + +impl ProtoRepr for proto::ProverAutoscalerAgentConfig { + type Type = configs::prover_autoscaler::ProverAutoscalerAgentConfig; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + prometheus_port: required(&self.prometheus_port) + .and_then(|x| Ok((*x).try_into()?)) + .context("prometheus_port")?, + http_port: required(&self.http_port) + .and_then(|x| Ok((*x).try_into()?)) + .context("http_port")?, + namespaces: self.namespaces.to_vec(), + cluster_name: Some("".to_string()), + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + prometheus_port: Some(this.prometheus_port.into()), + 
http_port: Some(this.http_port.into()), + namespaces: this.namespaces.clone(), + cluster_name: this.cluster_name.clone(), + } + } +} + +impl ProtoRepr for proto::ProverAutoscalerScalerConfig { + type Type = configs::prover_autoscaler::ProverAutoscalerScalerConfig; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + prometheus_port: required(&self.prometheus_port) + .and_then(|x| Ok((*x).try_into()?)) + .context("prometheus_port")?, + scaler_run_interval: read_optional(&self.scaler_run_interval) + .context("scaler_run_interval")? + .unwrap_or(Self::Type::default_scaler_run_interval()), + prover_job_monitor_url: required(&self.prover_job_monitor_url) + .context("prover_job_monitor_url")? + .clone(), + agents: self.agents.to_vec(), + protocol_versions: self + .protocol_versions + .iter() + .enumerate() + .map(|(i, e)| e.read().context(i)) + .collect::>() + .context("protocol_versions")?, + cluster_priorities: self + .cluster_priorities + .iter() + .enumerate() + .map(|(i, e)| e.read().context(i)) + .collect::>() + .context("cluster_priorities")?, + prover_speed: self + .prover_speed + .iter() + .enumerate() + .map(|(i, e)| e.read().context(i)) + .collect::>() + .context("prover_speed")?, + long_pending_duration: match self.long_pending_duration_s { + Some(s) => Duration::seconds(s.into()), + None => Self::Type::default_long_pending_duration(), + }, + }) + } + + fn build(this: &Self::Type) -> Self { + Self { + prometheus_port: Some(this.prometheus_port.into()), + scaler_run_interval: Some(ProtoFmt::build(&this.scaler_run_interval)), + prover_job_monitor_url: Some(this.prover_job_monitor_url.clone()), + agents: this.agents.clone(), + protocol_versions: this + .protocol_versions + .iter() + .map(|(k, v)| proto::ProtocolVersion::build(&(k.clone(), v.clone()))) + .collect(), + cluster_priorities: this + .cluster_priorities + .iter() + .map(|(k, v)| proto::ClusterPriority::build(&(k.clone(), *v))) + .collect(), + prover_speed: this + .prover_speed + .iter() + .map(|(k, 
v)| proto::ProverSpeed::build(&(*k, *v))) + .collect(), + long_pending_duration_s: Some(this.long_pending_duration.whole_seconds() as u32), + } + } +} + +impl ProtoRepr for proto::ProtocolVersion { + type Type = (String, String); + fn read(&self) -> anyhow::Result { + Ok(( + required(&self.namespace).context("namespace")?.clone(), + required(&self.protocol_version) + .context("protocol_version")? + .clone(), + )) + } + fn build(this: &Self::Type) -> Self { + Self { + namespace: Some(this.0.clone()), + protocol_version: Some(this.1.clone()), + } + } +} + +impl ProtoRepr for proto::ClusterPriority { + type Type = (String, u32); + fn read(&self) -> anyhow::Result { + Ok(( + required(&self.cluster).context("cluster")?.clone(), + *required(&self.priority).context("priority")?, + )) + } + fn build(this: &Self::Type) -> Self { + Self { + cluster: Some(this.0.clone()), + priority: Some(this.1), + } + } +} + +impl ProtoRepr for proto::ProverSpeed { + type Type = (Gpu, u32); + fn read(&self) -> anyhow::Result { + Ok(( + required(&self.gpu).context("gpu")?.parse()?, + *required(&self.speed).context("speed")?, + )) + } + fn build(this: &Self::Type) -> Self { + Self { + gpu: Some(this.0.to_string()), + speed: Some(this.1), + } + } +} diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 22ec5c534858..c085c1b5455f 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -46,6 +46,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", "zerocopy", @@ -208,6 +209,18 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-broadcast" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + 
[[package]] name = "async-stream" version = "0.3.5" @@ -275,9 +288,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "aws-lc-rs" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a47f2fb521b70c11ce7369a6c5fa4bd6af7e5d62ec06303875bafe7c6ba245" +checksum = "4ae74d9bd0a7530e8afd1770739ad34b36838829d6ad61818f9230f683f5ad77" dependencies = [ "aws-lc-sys", "mirai-annotations", @@ -287,9 +300,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.19.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2927c7af777b460b7ccd95f8b67acd7b4c04ec8896bf0c8e80ba30523cffc057" +checksum = "0f0e249228c6ad2d240c2dc94b714d711629d52bad946075d8e9b2f5391f0703" dependencies = [ "bindgen 0.69.4", "cc", @@ -355,6 +368,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand 0.8.5", +] + [[package]] name = "backtrace" version = "0.3.72" @@ -1331,8 +1355,18 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core 0.20.10", + "darling_macro 0.20.10", ] [[package]] @@ -1349,17 +1383,51 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2 1.0.85", + "quote 1.0.36", + "strsim 0.11.1", + "syn 2.0.66", +] + [[package]] name = "darling_macro" version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" dependencies = [ - "darling_core", + "darling_core 0.13.4", "quote 1.0.36", "syn 1.0.109", ] +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core 0.20.10", + "quote 1.0.36", + "syn 2.0.66", +] + +[[package]] +name = "debug-map-sorted" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75c7dfa83618734bf9fa07aadaa1166b634e9427bb9bc5a1c2332d04d73fb721" +dependencies = [ + "itertools 0.10.5", +] + [[package]] name = "debugid" version = "0.8.0" @@ -1503,6 +1571,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "ecdsa" version = "0.14.8" @@ -1784,6 +1858,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.1.0" @@ -1862,6 +1946,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fluent-uri" +version = "0.1.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "flume" version = "0.11.0" @@ -2328,6 +2421,30 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.3.3" @@ -2521,6 +2638,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d06dbdfbacf34d996c6fb540a71a684a7aae9056c71951163af8a8a4c07b9a4" +dependencies = [ + "bytes", + "futures-util", + "headers", + "http 1.1.0", + "hyper 1.3.1", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls-native-certs", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-rustls" version = "0.27.2" @@ -2533,6 +2670,7 @@ dependencies = [ "hyper-util", "log", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -2710,6 +2848,15 @@ dependencies = [ "regex", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -2822,6 +2969,44 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-patch" +version = "2.0.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "jsonpath-rust" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d8fe85bd70ff715f31ce8c739194b423d79811a19602115d611a3ec85d6200" +dependencies = [ + "lazy_static", + "once_cell", + "pest", + "pest_derive", + "regex", + "serde_json", + "thiserror", +] + +[[package]] +name = "jsonptr" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +dependencies = [ + "fluent-uri", + "serde", + "serde_json", +] + [[package]] name = "jsonrpsee" version = "0.23.2" @@ -3006,6 +3191,19 @@ dependencies = [ "signature 2.2.0", ] +[[package]] +name = "k8s-openapi" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8847402328d8301354c94d605481f25a6bdc1ed65471fd96af8eca71141b13" +dependencies = [ + "base64 0.22.1", + "chrono", + "serde", + "serde-value", + "serde_json", +] + [[package]] name = "keccak" version = "0.1.5" @@ -3015,6 +3213,116 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kube" +version = "0.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa21063c854820a77c5d7f8deeb7ffa55246d8304e4bcd8cce2956752c6604f8" +dependencies = [ + "k8s-openapi", + "kube-client", + "kube-core", + "kube-derive", + "kube-runtime", +] + +[[package]] +name = "kube-client" +version = "0.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c2355f5c9d8a11900e71a6fe1e47abd5ec45bf971eb4b162ffe97b46db9bb7" +dependencies = [ + "base64 0.22.1", + "bytes", + "chrono", + "either", + "futures 0.3.30", + "home", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + 
"hyper-http-proxy", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "jsonpath-rust", + "k8s-openapi", + "kube-core", + "pem", + "rustls", + "rustls-pemfile 2.1.2", + "secrecy", + "serde", + "serde_json", + "serde_yaml", + "thiserror", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tracing", +] + +[[package]] +name = "kube-core" +version = "0.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3030bd91c9db544a50247e7d48d7db9cf633c172732dce13351854526b1e666" +dependencies = [ + "chrono", + "form_urlencoded", + "http 1.1.0", + "json-patch", + "k8s-openapi", + "schemars", + "serde", + "serde-value", + "serde_json", + "thiserror", +] + +[[package]] +name = "kube-derive" +version = "0.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa98be978eddd70a773aa8e86346075365bfb7eb48783410852dbf7cb57f0c27" +dependencies = [ + "darling 0.20.10", + "proc-macro2 1.0.85", + "quote 1.0.36", + "serde_json", + "syn 2.0.66", +] + +[[package]] +name = "kube-runtime" +version = "0.95.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5895cb8aa641ac922408f128b935652b34c2995f16ad7db0984f6caa50217914" +dependencies = [ + "ahash 0.8.11", + "async-broadcast", + "async-stream", + "async-trait", + "backoff", + "derivative", + "futures 0.3.30", + "hashbrown 0.14.5", + "json-patch", + "jsonptr", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -3812,6 +4120,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" +dependencies = 
[ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2 1.0.85", + "quote 1.0.36", + "syn 2.0.66", +] + +[[package]] +name = "pest_meta" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" +dependencies = [ + "once_cell", + "pest", + "sha2 0.10.8", +] + [[package]] name = "petgraph" version = "0.6.5" @@ -4756,9 +5109,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.10" +version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ "aws-lc-rs", "log", @@ -4837,9 +5190,9 @@ checksum = "84e217e7fdc8466b5b35d30f8c0a30febd29173df4a3a0c2115d306b9c4117ad" [[package]] name = "rustls-webpki" -version = "0.102.4" +version = "0.102.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" dependencies = [ "aws-lc-rs", "ring", @@ -4889,6 +5242,30 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "schemars" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" 
+dependencies = [ + "dyn-clone", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1eee588578aff73f856ab961cd2f79e36bc45d7ded33a7562adba4667aecc0e" +dependencies = [ + "proc-macro2 1.0.85", + "quote 1.0.36", + "serde_derive_internals", + "syn 2.0.66", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -4953,6 +5330,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" dependencies = [ + "serde", "zeroize", ] @@ -5136,13 +5514,25 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2 1.0.85", + "quote 1.0.36", + "syn 2.0.66", +] + [[package]] name = "serde_json" -version = "1.0.117" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -5187,7 +5577,7 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" dependencies = [ - "darling", + "darling 0.13.4", "proc-macro2 1.0.85", "quote 1.0.36", "syn 1.0.109", @@ -6092,6 +6482,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite", + "slab", "tokio", ] @@ -6195,6 +6586,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "base64 0.21.7", + "bitflags 2.6.0", + "bytes", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.2" @@ -6319,6 +6729,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "uint" version = "0.9.5" @@ -6449,9 +6865,9 @@ dependencies = [ [[package]] name = "url" -version = "2.5.0" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", "idna", @@ -7405,8 +7821,12 @@ dependencies = [ "rand 0.8.5", "secrecy", "serde", + "strum", + "strum_macros", + "time", "tracing", "url", + "vise", "zksync_basic_types", "zksync_concurrency", "zksync_consensus_utils", @@ -7836,6 +8256,7 @@ dependencies = [ "secrecy", "serde_json", "serde_yaml", + "time", "tracing", "zksync_basic_types", "zksync_config", @@ -7844,6 +8265,44 @@ dependencies = [ "zksync_types", ] +[[package]] +name = "zksync_prover_autoscaler" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "axum", + "chrono", + "clap 4.5.4", + "ctrlc", + "debug-map-sorted", + "futures 0.3.30", + "k8s-openapi", + "kube", + "once_cell", + "regex", + "reqwest 0.12.5", + "ring", + "rustls", + "serde", + "serde_json", + "structopt", + "strum", + "time", + "tokio", + "tracing", + "tracing-subscriber", + "url", + "vise", + "zksync_config", + 
"zksync_core_leftovers", + "zksync_protobuf_config", + "zksync_prover_job_monitor", + "zksync_types", + "zksync_utils", + "zksync_vlog", +] + [[package]] name = "zksync_prover_dal" version = "0.1.0" diff --git a/prover/Cargo.toml b/prover/Cargo.toml index e95bae3d4c16..742eee649de1 100644 --- a/prover/Cargo.toml +++ b/prover/Cargo.toml @@ -1,8 +1,5 @@ [workspace] -members = [ - "crates/bin/*", - "crates/lib/*", -] +members = ["crates/bin/*", "crates/lib/*"] resolver = "2" @@ -19,20 +16,23 @@ categories = ["cryptography"] [workspace.dependencies] # Common dependencies anyhow = "1.0" -axum = "0.7.5" async-trait = "0.1" +axum = "0.7.5" bincode = "1" chrono = "0.4.38" clap = "4.4.6" colored = "2.0" const-decoder = "0.3.0" ctrlc = "3.1" +debug-map-sorted = "0.1.1" dialoguer = "0.11" futures = "0.3" hex = "0.4" -itertools = "0.10.5" indicatif = "0.16" +itertools = "0.10.5" jemallocator = "0.5" +k8s-openapi = { version = "0.23.0", features = ["v1_30"] } +kube = { version = "0.95.0", features = ["runtime", "derive"] } local-ip-address = "0.5.0" log = "0.4.20" md5 = "0.7.0" @@ -42,6 +42,8 @@ queues = "1.1.0" rand = "0.8" regex = "1.10.4" reqwest = "0.12" +ring = "0.17.8" +rustls = { version = "0.23.12", features = ["ring"] } serde = "1.0" serde_derive = "1.0" serde_json = "1.0" @@ -50,11 +52,13 @@ sqlx = { version = "0.8.1", default-features = false } structopt = "0.3.26" strum = { version = "0.26" } tempfile = "3" +time = "0.3.36" tokio = "1" tokio-util = "0.7.11" toml_edit = "0.14.4" tracing = "0.1" tracing-subscriber = "0.3" +url = "2.5.2" vise = "0.2.0" # Proving dependencies @@ -84,6 +88,7 @@ zksync_eth_client = { path = "../core/lib/eth_client" } zksync_contracts = { path = "../core/lib/contracts" } zksync_core_leftovers = { path = "../core/lib/zksync_core_leftovers" } zksync_periodic_job = { path = "../core/lib/periodic_job" } +zksync_protobuf_config = { path = "../core/lib/protobuf_config" } # Prover workspace dependencies zksync_prover_dal = { path = 
"crates/lib/prover_dal" } @@ -91,6 +96,7 @@ zksync_prover_fri_types = { path = "crates/lib/prover_fri_types" } zksync_prover_fri_utils = { path = "crates/lib/prover_fri_utils" } zksync_prover_keystore = { path = "crates/lib/keystore" } zksync_vk_setup_data_generator_server_fri = { path = "crates/bin/vk_setup_data_generator_server_fri" } +zksync_prover_job_monitor = { path = "crates/bin/prover_job_monitor" } # for `perf` profiling [profile.perf] diff --git a/prover/crates/bin/prover_autoscaler/Cargo.toml b/prover/crates/bin/prover_autoscaler/Cargo.toml new file mode 100644 index 000000000000..9743b45593e7 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "zksync_prover_autoscaler" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true + +[dependencies] +zksync_core_leftovers.workspace = true +zksync_vlog.workspace = true +zksync_utils.workspace = true +zksync_types.workspace = true +zksync_config = { workspace = true, features = ["observability_ext"] } +zksync_prover_job_monitor.workspace = true +zksync_protobuf_config.workspace = true + +debug-map-sorted.workspace = true +anyhow.workspace = true +async-trait.workspace = true +axum.workspace = true +chrono.workspace = true +clap = { workspace = true, features = ["derive"] } +ctrlc = { workspace = true, features = ["termination"] } +futures.workspace = true +k8s-openapi = { workspace = true, features = ["v1_30"] } +kube = { workspace = true, features = ["runtime", "derive"] } +once_cell.workspace = true +regex.workspace = true +reqwest = { workspace = true, features = ["json"] } +ring.workspace = true +rustls = { workspace = true, features = ["ring"] } +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +structopt.workspace = true +strum.workspace = true +time.workspace = true 
+tokio = { workspace = true, features = ["time", "macros"] } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing.workspace = true +url.workspace = true +vise.workspace = true diff --git a/prover/crates/bin/prover_autoscaler/src/agent.rs b/prover/crates/bin/prover_autoscaler/src/agent.rs new file mode 100644 index 000000000000..3269a43815c9 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/agent.rs @@ -0,0 +1,130 @@ +use std::net::SocketAddr; + +use anyhow::Context as _; +use axum::{ + extract::State, + response::IntoResponse, + routing::{get, post}, + Json, Router, +}; +use futures::future; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; +use tokio::sync::watch; + +use crate::{ + cluster_types::Cluster, + k8s::{Scaler, Watcher}, +}; + +struct AppError(anyhow::Error); + +impl IntoResponse for AppError { + fn into_response(self) -> axum::response::Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } +} + +pub async fn run_server( + port: u16, + watcher: Watcher, + scaler: Scaler, + mut stop_receiver: watch::Receiver, +) -> anyhow::Result<()> { + let bind_address = SocketAddr::from(([0, 0, 0, 0], port)); + tracing::debug!("Starting Autoscaler agent on {bind_address}"); + let app = create_agent_router(watcher, scaler); + + let listener = tokio::net::TcpListener::bind(bind_address) + .await + .with_context(|| format!("Failed binding Autoscaler agent to {bind_address}"))?; + axum::serve(listener, app) + .with_graceful_shutdown(async move { + if stop_receiver.changed().await.is_err() { + tracing::warn!( + "Stop signal sender for Autoscaler agent was dropped without sending a signal" + ); + } + tracing::info!("Stop signal received, Autoscaler agent is shutting down"); + }) + .await + .context("Autoscaler agent failed")?; + tracing::info!("Autoscaler agent shut down"); + Ok(()) +} + +fn create_agent_router(watcher: Watcher, scaler: Scaler) -> Router 
{ + let app = App { watcher, scaler }; + Router::new() + .route("/healthz", get(health)) + .route("/cluster", get(get_cluster)) + .route("/scale", post(scale)) + .with_state(app) +} + +// TODO: Use +// https://github.com/matter-labs/zksync-era/blob/9821a20018c367ce246dba656daab5c2e7757973/core/node/api_server/src/healthcheck.rs#L53 +// instead. +async fn health() -> &'static str { + "Ok\n" +} + +#[derive(Clone)] +struct App { + watcher: Watcher, + scaler: Scaler, +} + +async fn get_cluster(State(app): State) -> Result, AppError> { + let cluster = app.watcher.cluster.lock().await.clone(); + Ok(Json(cluster)) +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ScaleDeploymentRequest { + pub namespace: String, + pub name: String, + pub size: i32, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ScaleRequest { + pub deployments: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ScaleResponse { + pub scale_result: Vec, +} + +/// To test or forse scale in particular cluster use: +/// $ curl -X POST -H "Content-Type: application/json" --data '{"deployments": [{"namespace": "prover-red", "name": "witness-vector-generator-spec-9-f", "size":0},{"namespace": "prover-red", "name": "witness-vector-generator-spec-9-c", "size":0}]}' :8081/scale +async fn scale( + State(app): State, + Json(payload): Json, +) -> Result, AppError> { + let handles: Vec<_> = payload + .deployments + .into_iter() + .map(|d| { + let s = app.scaler.clone(); + tokio::spawn(async move { + match s.scale(&d.namespace, &d.name, d.size).await { + Ok(()) => "".to_string(), + Err(err) => err.to_string(), + } + }) + }) + .collect(); + + let scale_result = future::join_all(handles) + .await + .into_iter() + .map(Result::unwrap) + .collect(); + Ok(Json(ScaleResponse { scale_result })) +} diff --git a/prover/crates/bin/prover_autoscaler/src/cluster_types.rs b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs new file mode 100644 index 000000000000..b074e0774c97 --- 
/dev/null +++ b/prover/crates/bin/prover_autoscaler/src/cluster_types.rs @@ -0,0 +1,66 @@ +use std::collections::{BTreeMap, HashMap}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize, Serializer}; +use strum::{Display, EnumString}; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct Pod { + // pub name: String, // TODO: Consider if it's needed. + pub owner: String, + pub status: String, + pub changed: DateTime, +} +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct Deployment { + // pub name: String, // TODO: Consider if it's needed. + pub running: i32, + pub desired: i32, +} + +fn ordered_map( + value: &HashMap, + serializer: S, +) -> Result +where + S: Serializer, +{ + let ordered: BTreeMap<_, _> = value.iter().collect(); + ordered.serialize(serializer) +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct Namespace { + #[serde(serialize_with = "ordered_map")] + pub deployments: HashMap, + pub pods: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Cluster { + pub name: String, + pub namespaces: HashMap, +} +impl Default for Cluster { + fn default() -> Self { + Self { + name: "".to_string(), + namespaces: HashMap::new(), + } + } +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct Clusters { + pub clusters: HashMap, +} + +#[derive(Default, Debug, EnumString, Display, Hash, PartialEq, Eq, Clone, Copy)] +pub enum PodStatus { + #[default] + Unknown, + Running, + Pending, + LongPending, + NeedToMove, +} diff --git a/prover/crates/bin/prover_autoscaler/src/global/mod.rs b/prover/crates/bin/prover_autoscaler/src/global/mod.rs new file mode 100644 index 000000000000..5e4afb938437 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/global/mod.rs @@ -0,0 +1,3 @@ +pub mod queuer; +pub mod scaler; +pub mod watcher; diff --git a/prover/crates/bin/prover_autoscaler/src/global/queuer.rs b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs 
new file mode 100644 index 000000000000..1ef5d96386b5 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/global/queuer.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; + +use anyhow::{Context, Ok}; +use reqwest::Method; +use zksync_prover_job_monitor::autoscaler_queue_reporter::VersionedQueueReport; +use zksync_utils::http_with_retries::send_request_with_retries; + +#[derive(Debug)] +pub struct Queue { + pub queue: HashMap, +} + +#[derive(Default)] +pub struct Queuer { + pub prover_job_monitor_url: String, +} + +impl Queuer { + pub fn new(pjm_url: String) -> Self { + Self { + prover_job_monitor_url: pjm_url, + } + } + + pub async fn get_queue(&self) -> anyhow::Result { + let url = &self.prover_job_monitor_url; + let response = send_request_with_retries(url, 5, Method::GET, None, None).await; + let res = response + .map_err(|err| anyhow::anyhow!("Failed fetching queue from url: {url}: {err:?}"))? + .json::>() + .await + .context("Failed to read response as json")?; + + Ok(Queue { + queue: res + .iter() + .map(|x| (x.version.to_string(), x.report.prover_jobs.queued as u64)) + .collect::>(), + }) + } +} diff --git a/prover/crates/bin/prover_autoscaler/src/global/scaler.rs b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs new file mode 100644 index 000000000000..9f37c4d11675 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/global/scaler.rs @@ -0,0 +1,360 @@ +use std::{collections::HashMap, str::FromStr}; + +use chrono::Utc; +use debug_map_sorted::SortedOutputExt; +use once_cell::sync::Lazy; +use regex::Regex; +use zksync_config::configs::prover_autoscaler::{Gpu, ProverAutoscalerScalerConfig}; + +use super::{queuer, watcher}; +use crate::{ + cluster_types::{Cluster, Clusters, Pod, PodStatus}, + metrics::AUTOSCALER_METRICS, + task_wiring::Task, +}; + +const DEFAULT_SPEED: u32 = 500; + +#[derive(Default, Debug, PartialEq, Eq)] +struct GPUPool { + name: String, + gpu: Gpu, + provers: HashMap, // TODO: consider using i64 everywhere to avoid 
type casts. + preemtions: u64, + max_pool_size: u32, +} + +impl GPUPool { + fn sum_by_pod_status(&self, ps: PodStatus) -> u32 { + self.provers.get(&ps).cloned().unwrap_or(0) + } + + fn to_key(&self) -> GPUPoolKey { + GPUPoolKey { + cluster: self.name.clone(), + gpu: self.gpu, + } + } +} + +#[derive(Debug, Eq, Hash, PartialEq)] +struct GPUPoolKey { + cluster: String, + gpu: Gpu, +} + +static PROVER_DEPLOYMENT_RE: Lazy = + Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?[ltvpa]\d+))?$").unwrap()); +static PROVER_POD_RE: Lazy = + Lazy::new(|| Regex::new(r"^prover-gpu-fri-spec-(\d{1,2})?(-(?[ltvpa]\d+))?").unwrap()); + +pub struct Scaler { + /// namespace to Protocol Version configuration. + namespaces: HashMap, + watcher: watcher::Watcher, + queuer: queuer::Queuer, + + /// Which cluster to use first. + cluster_priorities: HashMap, + prover_speed: HashMap, + long_pending_duration: chrono::Duration, +} + +struct ProverPodGpu<'a> { + name: &'a str, + pod: &'a Pod, + gpu: Gpu, +} + +impl<'a> ProverPodGpu<'a> { + fn new(name: &'a str, pod: &'a Pod) -> Option> { + PROVER_POD_RE.captures(name).map(|caps| Self { + name, + pod, + gpu: Gpu::from_str(caps.name("gpu").map_or("l4", |m| m.as_str())).unwrap_or_default(), + }) + } +} + +impl Scaler { + pub fn new( + watcher: watcher::Watcher, + queuer: queuer::Queuer, + config: ProverAutoscalerScalerConfig, + ) -> Self { + Self { + namespaces: config.protocol_versions, + watcher, + queuer, + cluster_priorities: config.cluster_priorities, + prover_speed: config.prover_speed, + long_pending_duration: chrono::Duration::seconds( + config.long_pending_duration.whole_seconds(), + ), + } + } + + fn convert_to_gpu_pool(&self, namespace: &String, cluster: &Cluster) -> Vec { + let mut gp_map = HashMap::new(); // + let Some(namespace_value) = &cluster.namespaces.get(namespace) else { + // No namespace in config, ignoring. 
+ return vec![]; + }; + + for caps in namespace_value + .deployments + .keys() + .filter_map(|dn| PROVER_DEPLOYMENT_RE.captures(dn)) + { + // Processing only provers. + let gpu = + Gpu::from_str(caps.name("gpu").map_or("l4", |m| m.as_str())).unwrap_or_default(); + let e = gp_map.entry(gpu).or_insert(GPUPool { + name: cluster.name.clone(), + gpu, + max_pool_size: 100, // TODO: get from the agent. + ..Default::default() + }); + + // Initialize pool only if we have ready deployments. + e.provers.insert(PodStatus::Running, 0); + } + + for ppg in namespace_value + .pods + .iter() + .filter_map(|(pn, pv)| ProverPodGpu::new(pn, pv)) + { + let e = gp_map.entry(ppg.gpu).or_insert(GPUPool { + name: cluster.name.clone(), + gpu: ppg.gpu, + ..Default::default() + }); + let mut status = PodStatus::from_str(&ppg.pod.status).unwrap_or_default(); + if status == PodStatus::Pending + && ppg.pod.changed < Utc::now() - self.long_pending_duration + { + status = PodStatus::LongPending; + } + tracing::info!( + "pod {}: status: {}, real status: {}", + ppg.name, + status, + ppg.pod.status + ); + e.provers.entry(status).and_modify(|n| *n += 1).or_insert(1); + } + + tracing::info!("From pods {:?}", gp_map.sorted_debug()); + + gp_map.into_values().collect() + } + + fn sorted_clusters(&self, namespace: &String, clusters: &Clusters) -> Vec { + let mut gpu_pools: Vec = clusters + .clusters + .values() + .flat_map(|c| self.convert_to_gpu_pool(namespace, c)) + .collect(); + + gpu_pools.sort_by(|a, b| { + a.gpu + .cmp(&b.gpu) // Sort by GPU first. + .then( + a.sum_by_pod_status(PodStatus::NeedToMove) + .cmp(&b.sum_by_pod_status(PodStatus::NeedToMove)), + ) // Sort by need to evict. + .then( + a.sum_by_pod_status(PodStatus::LongPending) + .cmp(&b.sum_by_pod_status(PodStatus::LongPending)), + ) // Sort by long Pending pods. + .then(a.preemtions.cmp(&b.preemtions)) // Sort by preemtions in the cluster. 
+ .then( + self.cluster_priorities + .get(&a.name) + .unwrap_or(&1000) + .cmp(self.cluster_priorities.get(&b.name).unwrap_or(&1000)), + ) // Sort by priority. + .then(b.max_pool_size.cmp(&a.max_pool_size)) // Reverse sort by cluster size. + }); + + gpu_pools + } + + fn speed(&self, gpu: Gpu) -> u64 { + self.prover_speed + .get(&gpu) + .cloned() + .unwrap_or(DEFAULT_SPEED) + .into() + } + + fn provers_to_speed(&self, gpu: Gpu, n: u32) -> u64 { + self.speed(gpu) * n as u64 + } + + fn normalize_queue(&self, gpu: Gpu, q: u64) -> u64 { + let speed = self.speed(gpu); + // Divide and round up if there's any remainder. + (q + speed - 1) / speed * speed + } + + fn run(&self, namespace: &String, q: u64, clusters: &Clusters) -> HashMap { + let sc = self.sorted_clusters(namespace, clusters); + tracing::debug!("Sorted clusters for namespace {}: {:?}", namespace, &sc); + + let mut total: i64 = 0; + let mut provers: HashMap = HashMap::new(); + for c in &sc { + for (status, p) in &c.provers { + match status { + PodStatus::Running | PodStatus::Pending => { + total += self.provers_to_speed(c.gpu, *p) as i64; + provers + .entry(c.to_key()) + .and_modify(|x| *x += p) + .or_insert(*p); + } + _ => (), // Ignore LongPending as not running here. + } + } + } + + // Remove unneeded pods. + if (total as u64) > self.normalize_queue(Gpu::L4, q) { + for c in sc.iter().rev() { + let mut excess_queue = total as u64 - self.normalize_queue(c.gpu, q); + let mut excess_provers = (excess_queue / self.speed(c.gpu)) as u32; + let p = provers.entry(c.to_key()).or_default(); + if *p < excess_provers { + excess_provers = *p; + excess_queue = *p as u64 * self.speed(c.gpu); + } + *p -= excess_provers; + total -= excess_queue as i64; + if total <= 0 { + break; + }; + } + } + + // Reduce load in over capacity pools. 
+ for c in &sc { + let p = provers.entry(c.to_key()).or_default(); + if c.max_pool_size < *p { + let excess = *p - c.max_pool_size; + total -= excess as i64 * self.speed(c.gpu) as i64; + *p -= excess; + } + } + + tracing::debug!("Queue coverd with provers: {}", total); + // Add required provers. + if (total as u64) < q { + for c in &sc { + let mut required_queue = q - total as u64; + let mut required_provers = + (self.normalize_queue(c.gpu, required_queue) / self.speed(c.gpu)) as u32; + let p = provers.entry(c.to_key()).or_default(); + if *p + required_provers > c.max_pool_size { + required_provers = c.max_pool_size - *p; + required_queue = required_provers as u64 * self.speed(c.gpu); + } + *p += required_provers; + total += required_queue as i64; + } + } + + tracing::debug!("run result: provers {:?}, total: {}", &provers, total); + + provers + } +} + +#[async_trait::async_trait] +impl Task for Scaler { + async fn invoke(&self) -> anyhow::Result<()> { + let queue = self.queuer.get_queue().await.unwrap(); + + // TODO: Check that clusters data is ready. 
+ let clusters = self.watcher.clusters.lock().await; + for (ns, ppv) in &self.namespaces { + let q = queue.queue.get(ppv).cloned().unwrap_or(0); + if q > 0 { + let provers = self.run(ns, q, &clusters); + for (k, num) in &provers { + AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)] + .set(*num as u64); + } + // TODO: compare before and desired, send commands [cluster,namespace,deployment] -> provers + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tokio::sync::Mutex; + + use super::*; + use crate::{ + cluster_types::{self, Deployment, Namespace, Pod}, + global::{queuer, watcher}, + }; + + #[test] + fn test_run() { + let watcher = watcher::Watcher { + cluster_agents: vec![], + clusters: Arc::new(Mutex::new(cluster_types::Clusters { + ..Default::default() + })), + }; + let queuer = queuer::Queuer { + prover_job_monitor_url: "".to_string(), + }; + let scaler = Scaler::new(watcher, queuer, ProverAutoscalerScalerConfig::default()); + let got = scaler.run( + &"prover".to_string(), + 1499, + &Clusters { + clusters: HashMap::from([( + "foo".to_string(), + Cluster { + name: "foo".to_string(), + namespaces: HashMap::from([( + "prover".to_string(), + Namespace { + deployments: HashMap::from([( + "prover-gpu-fri-spec-1".to_string(), + Deployment { + ..Default::default() + }, + )]), + pods: HashMap::from([( + "prover-gpu-fri-spec-1-c47644679-x9xqp".to_string(), + Pod { + status: "Running".to_string(), + ..Default::default() + }, + )]), + }, + )]), + }, + )]), + }, + ); + let want = HashMap::from([( + GPUPoolKey { + cluster: "foo".to_string(), + gpu: Gpu::L4, + }, + 3, + )]); + assert!(got == want); + } +} diff --git a/prover/crates/bin/prover_autoscaler/src/global/watcher.rs b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs new file mode 100644 index 000000000000..ef3ebd3b8193 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/global/watcher.rs @@ -0,0 +1,89 @@ +use std::{collections::HashMap, sync::Arc}; 
+ +use anyhow::{Context, Ok}; +use futures::future; +use reqwest::Method; +use tokio::sync::Mutex; +use url::Url; +use zksync_utils::http_with_retries::send_request_with_retries; + +use crate::{ + cluster_types::{Cluster, Clusters}, + task_wiring::Task, +}; + +#[derive(Clone)] +pub struct Watcher { + /// List of base URLs of all agents. + pub cluster_agents: Vec>, + pub clusters: Arc>, +} + +impl Watcher { + pub fn new(agent_urls: Vec) -> Self { + Self { + cluster_agents: agent_urls + .into_iter() + .map(|u| { + Arc::new( + Url::parse(&u) + .unwrap_or_else(|e| panic!("Unparsable Agent URL {}: {}", u, e)), + ) + }) + .collect(), + clusters: Arc::new(Mutex::new(Clusters { + clusters: HashMap::new(), + })), + } + } +} + +#[async_trait::async_trait] +impl Task for Watcher { + async fn invoke(&self) -> anyhow::Result<()> { + let handles: Vec<_> = self + .cluster_agents + .clone() + .into_iter() + .map(|a| { + tracing::debug!("Getting cluster data from agent {}.", a); + tokio::spawn(async move { + let url: String = a + .clone() + .join("/cluster") + .context("Failed to join URL with /cluster")? + .to_string(); + let response = + send_request_with_retries(&url, 5, Method::GET, None, None).await; + response + .map_err(|err| { + anyhow::anyhow!("Failed fetching cluster from url: {url}: {err:?}") + })? 
+ .json::() + .await + .context("Failed to read response as json") + }) + }) + .collect(); + + future::try_join_all( + future::join_all(handles) + .await + .into_iter() + .map(|h| async move { + let c = h.unwrap().unwrap(); + self.clusters + .lock() + .await + .clusters + .insert(c.name.clone(), c); + Ok(()) + }) + .collect::>(), + ) + .await + .unwrap(); + + Ok(()) + } +} diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/mod.rs b/prover/crates/bin/prover_autoscaler/src/k8s/mod.rs new file mode 100644 index 000000000000..0804b9eaa404 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/k8s/mod.rs @@ -0,0 +1,5 @@ +pub use scaler::Scaler; +pub use watcher::Watcher; + +mod scaler; +mod watcher; diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs b/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs new file mode 100644 index 000000000000..170b0b106507 --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/k8s/scaler.rs @@ -0,0 +1,27 @@ +use k8s_openapi::api; +use kube::api::{Api, Patch, PatchParams}; + +#[derive(Clone)] +pub struct Scaler { + pub client: kube::Client, +} + +impl Scaler { + pub async fn scale(&self, namespace: &str, name: &str, size: i32) -> anyhow::Result<()> { + let deployments: Api = + Api::namespaced(self.client.clone(), namespace); + + let patch = serde_json::json!({ + "apiVersion": "apps/v1", + "kind": "Deployment", + "spec": { + "replicas": size + } + }); + let pp = PatchParams::default(); + deployments.patch(name, &pp, &Patch::Merge(patch)).await?; + tracing::info!("Scaled deployment/{} to {} replica(s).", name, size); + + Ok(()) + } +} diff --git a/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs new file mode 100644 index 000000000000..8746d17663be --- /dev/null +++ b/prover/crates/bin/prover_autoscaler/src/k8s/watcher.rs @@ -0,0 +1,138 @@ +use std::{collections::HashMap, sync::Arc}; + +use chrono::Utc; +use futures::{stream, StreamExt, 
TryStreamExt}; +use k8s_openapi::api; +use kube::{ + api::{Api, ResourceExt}, + runtime::{watcher, WatchStreamExt}, +}; +use tokio::sync::Mutex; + +use crate::{ + cluster_types::{Cluster, Deployment, Namespace, Pod}, + metrics::AUTOSCALER_METRICS, +}; + +#[derive(Clone)] +pub struct Watcher { + pub client: kube::Client, + pub cluster: Arc>, +} + +impl Watcher { + pub fn new(client: kube::Client, cluster_name: String, namespaces: Vec) -> Self { + let mut ns = HashMap::new(); + namespaces.into_iter().for_each(|n| { + ns.insert(n, Namespace::default()); + }); + + Self { + client, + cluster: Arc::new(Mutex::new(Cluster { + name: cluster_name, + namespaces: ns, + })), + } + } + + pub async fn run(self) -> anyhow::Result<()> { + // TODO: add actual metrics + AUTOSCALER_METRICS.protocol_version.set(1); + AUTOSCALER_METRICS.calls.inc_by(1); + + // TODO: watch for a list of namespaces, get: + // - deployments (name, running, desired) [done] + // - pods (name, parent deployment, statuses, when the last status change) [~done] + // - events (number of scheduling failures in last N seconds, which deployments) + // - events (preemptions, which deployment, when, how many) + // - pool size from GCP (name, size, which GPU) + let mut watchers = vec![]; + for namespace in self.cluster.lock().await.namespaces.keys() { + let deployments: Api = + Api::namespaced(self.client.clone(), namespace); + watchers.push( + watcher(deployments, watcher::Config::default()) + .default_backoff() + .applied_objects() + .map_ok(Watched::Deploy) + .boxed(), + ); + + let pods: Api = Api::namespaced(self.client.clone(), namespace); + watchers.push( + watcher(pods, watcher::Config::default()) + .default_backoff() + .applied_objects() + .map_ok(Watched::Pod) + .boxed(), + ); + } + // select on applied events from all watchers + let mut combo_stream = stream::select_all(watchers); + // SelectAll Stream elements must have the same Item, so all packed in this: + #[allow(clippy::large_enum_variant)] + enum 
Watched { + Deploy(api::apps::v1::Deployment), + Pod(api::core::v1::Pod), + } + while let Some(o) = combo_stream.try_next().await? { + match o { + Watched::Deploy(d) => { + let namespace = match d.namespace() { + Some(n) => n.to_string(), + None => continue, + }; + let mut cluster = self.cluster.lock().await; + let v = cluster.namespaces.get_mut(&namespace).unwrap(); + let dep = v + .deployments + .entry(d.name_any()) + .or_insert(Deployment::default()); + let nums = d.status.clone().unwrap_or_default(); + dep.running = nums.available_replicas.unwrap_or_default(); + dep.desired = nums.replicas.unwrap_or_default(); + + tracing::info!( + "Got deployment: {}, size: {}/{} un {}", + d.name_any(), + nums.available_replicas.unwrap_or_default(), + nums.replicas.unwrap_or_default(), + nums.unavailable_replicas.unwrap_or_default(), + ) + } + Watched::Pod(p) => { + let namespace = match p.namespace() { + Some(n) => n.to_string(), + None => continue, + }; + let mut cluster = self.cluster.lock().await; + let v = cluster.namespaces.get_mut(&namespace).unwrap(); + let pod = v.pods.entry(p.name_any()).or_insert(Pod::default()); + pod.owner = p + .owner_references() + .iter() + .map(|x| format!("{}/{}", x.kind.clone(), x.name.clone())) + .collect::>() + .join(":"); + // TODO: Collect replica sets to match deployments and pods. + let phase = p + .status + .clone() + .unwrap_or_default() + .phase + .unwrap_or_default(); + if phase != pod.status { + // TODO: try to get an idea how to set correct value on restart. 
+                        pod.changed = Utc::now();
+                    }
+                    pod.status = phase;
+
+                    tracing::info!("Got pod: {}", p.name_any())
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/lib.rs b/prover/crates/bin/prover_autoscaler/src/lib.rs
new file mode 100644
index 000000000000..0b0d704c9078
--- /dev/null
+++ b/prover/crates/bin/prover_autoscaler/src/lib.rs
@@ -0,0 +1,6 @@
+pub mod agent;
+pub(crate) mod cluster_types;
+pub mod global;
+pub mod k8s;
+pub(crate) mod metrics;
+pub mod task_wiring;
diff --git a/prover/crates/bin/prover_autoscaler/src/main.rs b/prover/crates/bin/prover_autoscaler/src/main.rs
new file mode 100644
index 000000000000..196bd6deb81e
--- /dev/null
+++ b/prover/crates/bin/prover_autoscaler/src/main.rs
@@ -0,0 +1,148 @@
+use std::time::Duration;
+
+use anyhow::Context;
+use structopt::StructOpt;
+use tokio::{
+    sync::{oneshot, watch},
+    task::JoinHandle,
+};
+use zksync_core_leftovers::temp_config_store::read_yaml_repr;
+use zksync_protobuf_config::proto::prover_autoscaler;
+use zksync_prover_autoscaler::{
+    agent,
+    global,
+    k8s::{Scaler, Watcher},
+    task_wiring::TaskRunner,
+};
+use zksync_utils::wait_for_tasks::ManagedTasks;
+use zksync_vlog::prometheus::PrometheusExporterConfig;
+
+/// Component the Prover Autoscaler binary can run as.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
+pub enum AutoscalerType {
+    /// Global component aggregating data from Agents and making scaling decisions.
+    Scaler,
+    /// Per-cluster component watching pods/deployments and applying scale requests.
+    Agent,
+}
+
+impl std::str::FromStr for AutoscalerType {
+    type Err = String;
+
+    /// Parses `"scaler"` / `"agent"` (used by structopt for the `--job` flag).
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "scaler" => Ok(AutoscalerType::Scaler),
+            "agent" => Ok(AutoscalerType::Agent),
+            other => Err(format!("{} is not a valid AutoscalerType", other)),
+        }
+    }
+}
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "Prover Autoscaler", about = "Run Prover Autoscaler components")]
+struct Opt {
+    /// Prover Autoscaler can run Agent or Scaler type.
+    ///
+    /// Specify `agent` or `scaler`
+    #[structopt(short, long, default_value = "agent")]
+    job: AutoscalerType,
+    /// Name of the cluster Agent is watching.
+    #[structopt(long)]
+    cluster_name: Option<String>,
+    /// Path to the configuration file.
+    #[structopt(long)]
+    config_path: std::path::PathBuf,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let opt = Opt::from_args();
+    let general_config =
+        read_yaml_repr::<prover_autoscaler::ProverAutoscaler>(&opt.config_path)
+            .context("general config")?;
+    let observability_config = general_config
+        .observability
+        .context("observability config")?;
+    let _observability_guard = observability_config.install()?;
+    // That's unfortunate that there are at least 3 different Duration in rust and we use all 3 in this repo.
+    // TODO: Consider updating zksync_protobuf to support std::time::Duration.
+    let graceful_shutdown_timeout = general_config.graceful_shutdown_timeout.unsigned_abs();
+
+    // Ctrl+C is forwarded through a oneshot channel; `take()` makes the handler
+    // idempotent (the sender can only be consumed once).
+    let (stop_signal_sender, stop_signal_receiver) = oneshot::channel();
+    let mut stop_signal_sender = Some(stop_signal_sender);
+    ctrlc::set_handler(move || {
+        if let Some(sender) = stop_signal_sender.take() {
+            sender.send(()).ok();
+        }
+    })
+    .context("Error setting Ctrl+C handler")?;
+
+    // Watch channel broadcasts the stop flag to all spawned tasks.
+    let (stop_sender, stop_receiver) = watch::channel(false);
+
+    let _ = rustls::crypto::ring::default_provider().install_default();
+    let client = kube::Client::try_default().await?;
+
+    tracing::info!("Starting ProverAutoscaler");
+
+    let mut tasks = vec![];
+
+    match opt.job {
+        AutoscalerType::Agent => {
+            let agent_config = general_config.agent_config.context("agent_config")?;
+            let exporter_config = PrometheusExporterConfig::pull(agent_config.prometheus_port);
+            tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
+
+            // TODO: maybe get cluster name from curl -H "Metadata-Flavor: Google"
+            // http://metadata.google.internal/computeMetadata/v1/instance/attributes/cluster-name
+            let watcher = Watcher::new(
+                client.clone(),
+                opt.cluster_name
+                    .context("cluster_name is required for Agent")?,
+                agent_config.namespaces,
+            );
+            let scaler = Scaler { client };
+            tasks.push(tokio::spawn(watcher.clone().run()));
+            tasks.push(tokio::spawn(agent::run_server(
+                agent_config.http_port,
+                watcher,
+                scaler,
+                stop_receiver.clone(),
+            )))
+        }
+        AutoscalerType::Scaler => {
+            let scaler_config = general_config.scaler_config.context("scaler_config")?;
+            let interval = scaler_config.scaler_run_interval.unsigned_abs();
+            let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
+            tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
+            let watcher = global::watcher::Watcher::new(scaler_config.agents.clone());
+            let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone());
+            let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config);
+            tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);
+        }
+    }
+
+    let mut tasks = ManagedTasks::new(tasks);
+
+    tokio::select! {
+        _ = tasks.wait_single() => {},
+        _ = stop_signal_receiver => {
+            tracing::info!("Stop signal received, shutting down");
+        }
+    }
+    stop_sender.send(true).ok();
+    tasks.complete(graceful_shutdown_timeout).await;
+
+    Ok(())
+}
+
+/// Wires the global Watcher and Scaler into periodic tasks and spawns them.
+fn get_tasks(
+    watcher: global::watcher::Watcher,
+    scaler: global::scaler::Scaler,
+    interval: Duration,
+    stop_receiver: watch::Receiver<bool>,
+) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
+    let mut task_runner = TaskRunner::default();
+
+    task_runner.add("Watcher", interval, watcher);
+    task_runner.add("Scaler", interval, scaler);
+
+    Ok(task_runner.spawn(stop_receiver))
+}
diff --git a/prover/crates/bin/prover_autoscaler/src/metrics.rs b/prover/crates/bin/prover_autoscaler/src/metrics.rs
new file mode 100644
index 000000000000..09cbaa6ba00f
--- /dev/null
+++ b/prover/crates/bin/prover_autoscaler/src/metrics.rs
@@ -0,0 +1,14 @@
+use vise::{Counter, Gauge, LabeledFamily, Metrics};
+use zksync_config::configs::prover_autoscaler::Gpu;
+
+#[derive(Debug, Metrics)]
+#[metrics(prefix = "autoscaler")]
+pub(crate) struct AutoscalerMetrics {
+    pub protocol_version: Gauge<u64>,
+    pub calls: Counter,
+    #[metrics(labels = ["target_cluster", "target_namespace", "gpu"])]
+    pub provers: LabeledFamily<(String, String, Gpu), Gauge<u64>, 3>,
+}
+
+#[vise::register]
+pub(crate) static AUTOSCALER_METRICS: vise::Global<AutoscalerMetrics> = vise::Global::new();
diff --git a/prover/crates/bin/prover_autoscaler/src/task_wiring.rs b/prover/crates/bin/prover_autoscaler/src/task_wiring.rs
new file mode 100644
index 000000000000..9b60145ad9ea
--- /dev/null
+++ b/prover/crates/bin/prover_autoscaler/src/task_wiring.rs
@@ -0,0 +1,72 @@
+use std::time::Duration;
+
+use anyhow::Context;
+use tracing::Instrument;
+
+/// Unit of work to be run periodically by the Prover Autoscaler's `TaskRunner`.
+#[async_trait::async_trait]
+pub trait Task {
+    async fn invoke(&self) -> anyhow::Result<()>;
+}
+
+/// Wrapper for a `Task` with a periodic interface. Holds the task's name and run interval.
+struct PeriodicTask {
+    // Boxed trait object so heterogeneous tasks can share one runner;
+    // `Send + Sync` because the task is moved into a spawned tokio task.
+    job: Box<dyn Task + Send + Sync>,
+    name: String,
+    interval: Duration,
+}
+
+impl PeriodicTask {
+    /// Invokes the wrapped job every `interval` until the stop signal flips to `true`.
+    /// Any job error aborts the loop and is propagated to the caller.
+    async fn run(
+        &self,
+        mut stop_receiver: tokio::sync::watch::Receiver<bool>,
+    ) -> anyhow::Result<()> {
+        tracing::info!(
+            "Started Task {} with run interval: {:?}",
+            self.name,
+            self.interval
+        );
+
+        let mut interval = tokio::time::interval(self.interval);
+
+        // `borrow_and_update` marks the current value as seen and returns the flag.
+        while !*stop_receiver.borrow_and_update() {
+            interval.tick().await;
+            self.job
+                .invoke()
+                .instrument(tracing::info_span!("run", service_name = %self.name))
+                .await
+                .context("failed to invoke task")?;
+        }
+        tracing::info!("Stop signal received; Task {} is shut down", self.name);
+        Ok(())
+    }
+}
+
+/// Wrapper on a vector of tasks. Makes adding/spawning tasks and sharing resources ergonomic.
+#[derive(Default)]
+pub struct TaskRunner {
+    tasks: Vec<PeriodicTask>,
+}
+
+impl TaskRunner {
+    /// Registers a task to be run every `interval` under the given `name`.
+    pub fn add<T: Task + Send + Sync + 'static>(&mut self, name: &str, interval: Duration, job: T) {
+        self.tasks.push(PeriodicTask {
+            name: name.into(),
+            interval,
+            job: Box::new(job),
+        });
+    }
+
+    /// Spawns every registered task on the tokio runtime; each gets its own
+    /// clone of the stop receiver and returns its result through the handle.
+    pub fn spawn(
+        self,
+        stop_receiver: tokio::sync::watch::Receiver<bool>,
+    ) -> Vec<tokio::task::JoinHandle<anyhow::Result<()>>> {
+        self.tasks
+            .into_iter()
+            .map(|task| {
+                let receiver = stop_receiver.clone();
+                tokio::spawn(async move { task.run(receiver).await })
+            })
+            .collect()
+    }
+}