diff --git a/Cargo.lock b/Cargo.lock index 3a6bd9081946a..05fc9c8d31ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3089,7 +3089,7 @@ dependencies = [ [[package]] name = "mz-compute" -version = "0.0.0" +version = "0.26.1-dev" dependencies = [ "anyhow", "axum", @@ -3114,6 +3114,7 @@ dependencies = [ "mz-repr", "mz-storage", "mz-timely-util", + "once_cell", "prometheus", "prost-build", "rdkafka", @@ -5685,7 +5686,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "storaged" -version = "0.0.0" +version = "0.26.1-dev" dependencies = [ "anyhow", "axum", @@ -5698,6 +5699,7 @@ dependencies = [ "mz-pid-file", "mz-prof", "mz-storage", + "once_cell", "serde", "tikv-jemallocator", "timely", diff --git a/src/billing-demo/src/bin/billing-demo/main.rs b/src/billing-demo/src/bin/billing-demo/main.rs index 4cfd2b2045a86..c0a326048b3fb 100644 --- a/src/billing-demo/src/bin/billing-demo/main.rs +++ b/src/billing-demo/src/bin/billing-demo/main.rs @@ -27,6 +27,7 @@ use prost::Message; use tokio::time::{self, Duration}; use tracing::{error, info, trace}; +use mz_ore::cli::{self, CliConfig}; use mz_ore::task; use mz_test_util::kafka::kafka_client; use mz_test_util::mz_client; @@ -47,7 +48,7 @@ async fn main() { } async fn run() -> Result<()> { - let config: Args = mz_ore::cli::parse_args(); + let config: Args = cli::parse_args(CliConfig::default()); let k_config = config.kafka_config(); let mz_config = config.mz_config(); diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index be1662d7c437b..c50531f191ed7 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "mz-compute" description = "Materialize's compute layer." -version = "0.0.0" +version = "0.26.1-dev" edition = "2021" rust-version = "1.61.0" publish = false @@ -30,6 +30,7 @@ mz-prof = { path = "../prof" } mz-repr = { path = "../repr" } mz-storage = { path = "../storage", default-features = false } mz-timely-util = { path = "../timely-util" } +once_cell = "1.12.0" prometheus = { version = "0.13.1", default-features = false } rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build", "ssl-vendored", "libz-static", "zstd"] } scopeguard = "1.1.0" diff --git a/src/compute/src/bin/computed.rs b/src/compute/src/bin/computed.rs index 097a75b8dfa3b..e4671531002d7 100644 --- a/src/compute/src/bin/computed.rs +++ b/src/compute/src/bin/computed.rs @@ -14,7 +14,7 @@ use std::process; use anyhow::bail; use futures::sink::SinkExt; use futures::stream::TryStreamExt; -use mz_build_info::{build_info, BuildInfo}; +use once_cell::sync::Lazy; use serde::de::DeserializeOwned; use serde::ser::Serialize; use tokio::net::TcpListener; @@ -22,9 +22,11 @@ use tokio::select; use tracing::info; use tracing_subscriber::filter::Targets; +use mz_build_info::{build_info, BuildInfo}; use mz_dataflow_types::client::{ComputeClient, GenericClient}; use mz_dataflow_types::reconciliation::command::ComputeCommandReconcile; use mz_dataflow_types::ConnectorContext; +use mz_ore::cli::{self, CliConfig}; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::SYSTEM_TIME; @@ -45,34 +47,31 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; const BUILD_INFO: BuildInfo = build_info!(); +pub static VERSION: Lazy = Lazy::new(|| BUILD_INFO.human_version()); + /// Independent dataflow server for Materialize. #[derive(clap::Parser)] +#[clap(version = VERSION.as_str())] struct Args { /// The address on which to listen for a connection from the controller. #[clap( long, - env = "COMPUTED_LISTEN_ADDR", + env = "LISTEN_ADDR", value_name = "HOST:PORT", default_value = "127.0.0.1:2100" )] listen_addr: String, /// Number of dataflow worker threads. - #[clap( - short, - long, - env = "COMPUTED_WORKERS", - value_name = "W", - default_value = "1" - )] + #[clap(short, long, env = "WORKERS", value_name = "W", default_value = "1")] workers: usize, /// Number of this computed process. - #[clap(short = 'p', long, env = "COMPUTED_PROCESS", value_name = "P")] + #[clap(short = 'p', long, env = "PROCESS", value_name = "P")] process: Option, /// Total number of computed processes. #[clap( short = 'n', long, - env = "COMPUTED_PROCESSES", + env = "PROCESSES", value_name = "N", default_value = "1" )] @@ -107,7 +106,7 @@ struct Args { /// The default value for this option is "info". #[clap( long, - env = "COMPUTED_LOG_FILTER", + env = "LOG_FILTER", value_name = "FILTER", default_value = "info" )] @@ -120,7 +119,11 @@ struct Args { #[tokio::main] async fn main() { - if let Err(err) = run(mz_ore::cli::parse_args()).await { + let args = cli::parse_args(CliConfig { + env_prefix: Some("COMPUTED_"), + enable_version_flag: true, + }); + if let Err(err) = run(args).await { eprintln!("computed: fatal: {:#}", err); process::exit(1); } diff --git a/src/dataflow-bin/src/bin/dataflow-logview.rs b/src/dataflow-bin/src/bin/dataflow-logview.rs index 01c7540ae8102..f1326e6d081aa 100644 --- a/src/dataflow-bin/src/bin/dataflow-logview.rs +++ b/src/dataflow-bin/src/bin/dataflow-logview.rs @@ -18,6 +18,8 @@ use timely::dataflow::operators::capture::Replay; use timely::dataflow::operators::Inspect; use timely::logging::{TimelyEvent, WorkerIdentifier}; +use mz_ore::cli::{self, CliConfig}; + /// Views Timely logs from a running dataflow server. /// /// Listens for incoming log connections from a Timely/Differential program running @@ -40,7 +42,7 @@ struct Args { } fn main() { - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); let listener = TcpListener::bind((&*args.listen_addr, args.port)).expect("binding tcp listener"); diff --git a/src/kafka-util/src/bin/kgen.rs b/src/kafka-util/src/bin/kgen.rs index d81c6eb2f2533..f492c838538ca 100644 --- a/src/kafka-util/src/bin/kgen.rs +++ b/src/kafka-util/src/bin/kgen.rs @@ -33,6 +33,7 @@ use mz_avro::schema::{SchemaNode, SchemaPiece, SchemaPieceOrNamed}; use mz_avro::types::{DecimalValue, Value}; use mz_avro::Schema; use mz_ore::cast::CastFrom; +use mz_ore::cli::{self, CliConfig}; use mz_ore::retry::Retry; trait Generator: FnMut(&mut ThreadRng) -> R + Send + Sync { @@ -583,7 +584,7 @@ struct Args { #[tokio::main] async fn main() -> anyhow::Result<()> { - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); let value_gen = match args.value_format { ValueFormat::Bytes => { diff --git a/src/materialized/src/bin/materialized/main.rs b/src/materialized/src/bin/materialized/main.rs index f1639f29b091f..456ecddad3315 100644 --- a/src/materialized/src/bin/materialized/main.rs +++ b/src/materialized/src/bin/materialized/main.rs @@ -51,7 +51,7 @@ use mz_frontegg_auth::{FronteggAuthentication, FronteggConfig}; use mz_orchestrator_kubernetes::{KubernetesImagePullPolicy, KubernetesOrchestratorConfig}; use mz_orchestrator_process::ProcessOrchestratorConfig; use mz_ore::cgroup::{detect_memory_limit, MemoryLimit}; -use mz_ore::cli::KeyValueArg; +use mz_ore::cli::{self, CliConfig, KeyValueArg}; use mz_ore::id_gen::PortAllocator; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::SYSTEM_TIME; @@ -91,13 +91,12 @@ fn parse_optional_duration(s: &str) -> Result { #[derive(Parser, Debug)] #[clap( next_line_help = true, - args_override_self = true, version = VERSION.as_str(), long_version = LONG_VERSION.as_str(), )] pub struct Args { /// [DANGEROUS] Enable experimental features. - #[clap(long, env = "MZ_EXPERIMENTAL")] + #[clap(long, env = "EXPERIMENTAL")] experimental: bool, /// The address on which Prometheus metrics get exposed. @@ -109,7 +108,7 @@ pub struct Args { long, hide = true, value_name = "HOST:PORT", - env = "MZ_THIRD_PARTY_METRICS_ADDR" + env = "THIRD_PARTY_METRICS_ADDR" )] metrics_listen_addr: Option, @@ -137,7 +136,7 @@ pub struct Args { #[clap( long, hide = true, - env = "MZ_POD_NAME", + env = "POD_NAME", required_if_eq("orchestrator", "kubernetes") )] pod_name: Option, @@ -165,7 +164,7 @@ pub struct Args { computed_image: Option, /// The host on which processes spawned by the process orchestrator listen /// for connections. - #[clap(long, hide = true, env = "MZ_PROCESS_LISTEN_HOST")] + #[clap(long, hide = true, env = "PROCESS_LISTEN_HOST")] process_listen_host: Option, /// The image pull policy to use for services created by the Kubernetes /// orchestrator. @@ -183,7 +182,7 @@ pub struct Args { /// Address of a storage process that the controller should connect to. #[clap( long, - env = "MZ_STORAGE_CONTROLLER_ADDR", + env = "STORAGE_CONTROLLER_ADDR", value_name = "HOST:ADDR", conflicts_with = "orchestrator" )] @@ -193,10 +192,10 @@ pub struct Args { /// How much historical detail to maintain in arrangements. /// /// Set to "off" to disable logical compaction. - #[clap(long, env = "MZ_LOGICAL_COMPACTION_WINDOW", parse(try_from_str = parse_optional_duration), value_name = "DURATION", default_value = "1ms")] + #[clap(long, env = "LOGICAL_COMPACTION_WINDOW", parse(try_from_str = parse_optional_duration), value_name = "DURATION", default_value = "1ms")] logical_compaction_window: OptionalDuration, /// Default frequency with which to advance timestamps - #[clap(long, env = "MZ_TIMESTAMP_FREQUENCY", hide = true, parse(try_from_str = mz_repr::util::parse_duration), value_name = "DURATION", default_value = "1s")] + #[clap(long, env = "TIMESTAMP_FREQUENCY", hide = true, parse(try_from_str = mz_repr::util::parse_duration), value_name = "DURATION", default_value = "1s")] timestamp_frequency: Duration, // === Logging options. === @@ -225,7 +224,7 @@ pub struct Args { /// The default value for this option is "info". #[clap( long, - env = "MZ_LOG_FILTER", + env = "LOG_FILTER", value_name = "FILTER", default_value = "info" )] @@ -235,7 +234,7 @@ pub struct Args { /// The address on which to listen for connections. #[clap( long, - env = "MZ_LISTEN_ADDR", + env = "LISTEN_ADDR", value_name = "HOST:PORT", default_value = "127.0.0.1:6875" )] @@ -264,7 +263,7 @@ pub struct Args { /// The most secure mode is "verify-full". This is the default mode when /// the --tls-cert option is specified. Otherwise the default is "disable". #[clap( - long, env = "MZ_TLS_MODE", + long, env = "TLS_MODE", possible_values = &["disable", "require", "verify-ca", "verify-full"], default_value = "disable", default_value_ifs = &[ @@ -276,7 +275,7 @@ pub struct Args { tls_mode: String, #[clap( long, - env = "MZ_TLS_CA", + env = "TLS_CA", required_if_eq("tls-mode", "verify-ca"), required_if_eq("tls-mode", "verify-full"), value_name = "PATH" @@ -285,7 +284,7 @@ pub struct Args { /// Certificate file for TLS connections. #[clap( long, - env = "MZ_TLS_CERT", + env = "TLS_CERT", requires = "tls-key", required_if_eq_any(&[("tls-mode", "allow"), ("tls-mode", "require"), ("tls-mode", "verify-ca"), ("tls-mode", "verify-full")]), value_name = "PATH" @@ -294,7 +293,7 @@ pub struct Args { /// Private key file for TLS connections. #[clap( long, - env = "MZ_TLS_KEY", + env = "TLS_KEY", requires = "tls-cert", required_if_eq_any(&[("tls-mode", "allow"), ("tls-mode", "require"), ("tls-mode", "verify-ca"), ("tls-mode", "verify-full")]), value_name = "PATH" @@ -303,24 +302,19 @@ pub struct Args { /// Specifies the tenant id when authenticating users. Must be a valid UUID. #[clap( long, - env = "MZ_FRONTEGG_TENANT", + env = "FRONTEGG_TENANT", requires_all = &["frontegg-jwk", "frontegg-api-token-url"], hide = true )] frontegg_tenant: Option, /// JWK used to validate JWTs during user authentication as a PEM public /// key. Can optionally be base64 encoded with the URL-safe alphabet. - #[clap( - long, - env = "MZ_FRONTEGG_JWK", - requires = "frontegg-tenant", - hide = true - )] + #[clap(long, env = "FRONTEGG_JWK", requires = "frontegg-tenant", hide = true)] frontegg_jwk: Option, /// The full URL (including path) to the api-token endpoint. #[clap( long, - env = "MZ_FRONTEGG_API_TOKEN_URL", + env = "FRONTEGG_API_TOKEN_URL", requires = "frontegg-tenant", hide = true )] @@ -328,14 +322,14 @@ pub struct Args { /// A common string prefix that is expected to be present at the beginning of passwords. #[clap( long, - env = "MZ_FRONTEGG_PASSWORD_PREFIX", + env = "FRONTEGG_PASSWORD_PREFIX", requires = "frontegg-tenant", hide = true )] frontegg_password_prefix: Option, /// Enable cross-origin resource sharing (CORS) for HTTP requests from the /// specified origin. - #[structopt(long, env = "MZ_CORS_ALLOWED_ORIGIN", hide = true)] + #[structopt(long, env = "CORS_ALLOWED_ORIGIN", hide = true)] cors_allowed_origin: Vec, // === Storage options. === @@ -343,7 +337,7 @@ pub struct Args { #[clap( short = 'D', long, - env = "MZ_DATA_DIRECTORY", + env = "DATA_DIRECTORY", value_name = "PATH", default_value = "mzdata" )] @@ -351,13 +345,13 @@ pub struct Args { /// Where the persist library should store its blob data. /// /// Defaults to the `persist/blob` in the data directory. - #[clap(long, env = "MZ_PERSIST_BLOB_URL")] + #[clap(long, env = "PERSIST_BLOB_URL")] persist_blob_url: Option, /// Where the persist library should perform consensus. - #[clap(long, env = "MZ_PERSIST_CONSENSUS_URL")] + #[clap(long, env = "PERSIST_CONSENSUS_URL")] persist_consensus_url: Url, /// Postgres catalog stash connection string. - #[clap(long, env = "MZ_CATALOG_POSTGRES_STASH", value_name = "POSTGRES_URL")] + #[clap(long, env = "CATALOG_POSTGRES_STASH", value_name = "POSTGRES_URL")] catalog_postgres_stash: String, // === AWS options. === @@ -371,9 +365,9 @@ pub struct Args { /// If not provided, tracing is not sent. /// /// You most likely also need to provide - /// `--opentelemetry-header`/`MZ_OPENTELEMETRY_HEADER` + /// `--opentelemetry-header`/`OPENTELEMETRY_HEADER` /// depending on the collector you are talking to. - #[clap(long, env = "MZ_OPENTELEMETRY_ENDPOINT", hide = true)] + #[clap(long, env = "OPENTELEMETRY_ENDPOINT", hide = true)] opentelemetry_endpoint: Option, /// Headers to pass to the OpenTelemetry collector. @@ -382,7 +376,7 @@ pub struct Args { #[clap( long, value_name = "HEADER", - env = "MZ_OPENTELEMETRY_HEADER", + env = "OPENTELEMETRY_HEADER", requires = "opentelemetry-endpoint", use_value_delimiter = true, hide = true @@ -393,17 +387,17 @@ pub struct Args { /// Defaults to `debug`. #[clap( long, - env = "MZ_OPENTELEMETRY_LOG_FILTER", + env = "OPENTELEMETRY_LOG_FILTER", requires = "opentelemetry-log-filter", hide = true )] opentelemetry_log_filter: Option, - #[clap(long, env = "MZ_CLUSTER_REPLICA_SIZES")] + #[clap(long, env = "CLUSTER_REPLICA_SIZES")] cluster_replica_sizes: Option, /// Availability zones compute resources may be deployed in. - #[clap(long, env = "MZ_AVAILABILITY_ZONE", use_value_delimiter = true)] + #[clap(long, env = "AVAILABILITY_ZONE", use_value_delimiter = true)] availability_zone: Vec, #[cfg(feature = "tokio-console")] @@ -412,7 +406,7 @@ pub struct Args { tokio_console: bool, /// Prefix commands issued by the process orchestrator with the supplied value. - #[clap(long, env = "MZ_PROCESS_ORCHESTRATOR_WRAPPER")] + #[clap(long, env = "PROCESS_ORCHESTRATOR_WRAPPER")] process_orchestrator_wrapper: Option, } @@ -439,7 +433,11 @@ impl Orchestrator { } fn main() { - if let Err(err) = run(Args::parse()) { + let args = cli::parse_args(CliConfig { + env_prefix: Some("MZ_"), + enable_version_flag: true, + }); + if let Err(err) = run(args) { eprintln!("materialized: {:#}", err); process::exit(1); } diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs index 230675e99ed84..910bed0840193 100644 --- a/src/orchestrator/src/lib.rs +++ b/src/orchestrator/src/lib.rs @@ -35,6 +35,7 @@ use serde::{Deserialize, Deserializer, Serialize}; pub trait Orchestrator: fmt::Debug + Send + Sync { // Default host used to bind to. fn listen_host(&self) -> &str; + /// Enter a namespace in the orchestrator. fn namespace(&self, namespace: &str) -> Arc; } diff --git a/src/ore/src/cli.rs b/src/ore/src/cli.rs index 172fc1038f15e..6acc1de809b44 100644 --- a/src/ore/src/cli.rs +++ b/src/ore/src/cli.rs @@ -15,6 +15,7 @@ //! Command-line parsing utilities. +use std::ffi::OsString; use std::fmt::Display; use std::str::FromStr; @@ -29,16 +30,54 @@ USAGE: {all-args}"; -/// Parses command-line arguments according to a `StructOpt` parser after +/// Configures command-line parsing via [`parse_args`]. +#[derive(Debug, Default, Clone)] +pub struct CliConfig<'a> { + /// An optional prefix to apply to the environment variable name for all + /// arguments with an environment variable fallback. + // + // TODO(benesch): switch to the clap-native `env_prefix` option if that + // gets implemented: https://github.com/clap-rs/clap/issues/3221. + pub env_prefix: Option<&'a str>, + /// Enable clap's built-in `--version` flag. + /// + /// We disable this by default because most of our binaries are not + /// meaningfully versioned. + pub enable_version_flag: bool, +} + +/// Parses command-line arguments according to a clap `Parser` after /// applying Materialize-specific customizations. -pub fn parse_args() -> O +pub fn parse_args(config: CliConfig) -> O where O: Parser, { - let clap = O::command() - .disable_version_flag(true) - .args_override_self(true) - .help_template(NO_VERSION_HELP_TEMPLATE); + // Construct the prefixed environment variable names for all + // environment-enabled arguments, if requested. We have to construct these + // names before constructing `clap` below to get the lifetimes to work out. + let arg_envs: Vec<_> = O::command() + .get_arguments() + .filter_map(|arg| match (config.env_prefix, arg.get_env()) { + (Some(prefix), Some(env)) => { + let mut prefixed_env = OsString::from(prefix); + prefixed_env.push(env); + Some((arg.get_id(), prefixed_env)) + } + _ => None, + }) + .collect(); + + let mut clap = O::command().args_override_self(true); + + if !config.enable_version_flag { + clap = clap.disable_version_flag(true); + clap = clap.help_template(NO_VERSION_HELP_TEMPLATE); + } + + for (arg, env) in &arg_envs { + clap = clap.mut_arg(*arg, |arg| arg.env_os(env)); + } + O::from_arg_matches(&clap.get_matches()).unwrap() } diff --git a/src/persist-client/examples/persistcli.rs b/src/persist-client/examples/persistcli.rs index d94d295f1dabc..e55cb0957d10b 100644 --- a/src/persist-client/examples/persistcli.rs +++ b/src/persist-client/examples/persistcli.rs @@ -20,6 +20,8 @@ use std::sync::Once; use tracing_subscriber::{EnvFilter, FmtSubscriber}; +use mz_ore::cli::{self, CliConfig}; + pub mod maelstrom; pub mod open_loop; pub mod source_example; @@ -43,7 +45,7 @@ fn main() { // that all logging goes to stderr. init_logging(); - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); // Mirror the tokio Runtime configuration in our production binaries. let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical())); diff --git a/src/pgtest/src/main.rs b/src/pgtest/src/main.rs index b94d5361d6352..c9511cc3606bd 100644 --- a/src/pgtest/src/main.rs +++ b/src/pgtest/src/main.rs @@ -7,6 +7,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use mz_ore::cli::{self, CliConfig}; + /// Verifies the correctness of a PostgreSQL-like server. #[derive(clap::Parser)] struct Args { @@ -21,7 +23,7 @@ struct Args { } fn main() { - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); mz_pgtest::walk( args.addr, args.user, diff --git a/src/s3-datagen/src/main.rs b/src/s3-datagen/src/main.rs index b108897e94d60..f397da3ca8368 100644 --- a/src/s3-datagen/src/main.rs +++ b/src/s3-datagen/src/main.rs @@ -20,6 +20,7 @@ use tracing::{error, info, Level}; use tracing_subscriber::filter::EnvFilter; use mz_ore::cast::CastFrom; +use mz_ore::cli::{self, CliConfig}; /// Generate meaningless data in S3 to test download speeds #[derive(Parser)] @@ -72,7 +73,7 @@ async fn main() { } async fn run() -> anyhow::Result<()> { - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); tracing_subscriber::fmt() .with_env_filter(args.log_filter) diff --git a/src/sqllogictest/src/bin/sqllogictest.rs b/src/sqllogictest/src/bin/sqllogictest.rs index fed602d01e70c..4b91fe409ddc5 100644 --- a/src/sqllogictest/src/bin/sqllogictest.rs +++ b/src/sqllogictest/src/bin/sqllogictest.rs @@ -18,6 +18,7 @@ use chrono::Utc; use time::Instant; use walkdir::WalkDir; +use mz_ore::cli::{self, CliConfig}; use mz_sqllogictest::runner::{self, Outcomes, RunConfig, WriteFmt}; use mz_sqllogictest::util; @@ -59,7 +60,7 @@ async fn main() { mz_ore::panic::set_abort_on_panic(); mz_ore::test::init_logging_default("warn"); - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); let config = RunConfig { stdout: &OutputStream::new(io::stdout(), args.timestamps), diff --git a/src/storaged/Cargo.toml b/src/storaged/Cargo.toml index 50023900ab4e6..8283fb5863b8f 100644 --- a/src/storaged/Cargo.toml +++ b/src/storaged/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "storaged" description = "Materialize's storage server." -version = "0.0.0" +version = "0.26.1-dev" edition = "2021" rust-version = "1.61.0" publish = false @@ -18,6 +18,7 @@ mz-ore = { path = "../ore", features = ["task", "tracing_"] } mz-pid-file = { path = "../pid-file" } mz-prof = { path = "../prof" } mz-storage = { path = "../storage", features = ["server"] } +once_cell = "1.0.12" serde = "1.0.137" timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] } tokio = { version = "1.18.2", features = ["net"] } diff --git a/src/storaged/src/main.rs b/src/storaged/src/main.rs index 5928a4c4d86fe..be35b0713308c 100644 --- a/src/storaged/src/main.rs +++ b/src/storaged/src/main.rs @@ -15,6 +15,7 @@ use std::process; use anyhow::bail; use futures::sink::SinkExt; use futures::stream::TryStreamExt; +use once_cell::sync::Lazy; use serde::de::DeserializeOwned; use serde::ser::Serialize; use tokio::net::TcpListener; @@ -25,6 +26,7 @@ use tracing_subscriber::filter::Targets; use mz_build_info::{build_info, BuildInfo}; use mz_dataflow_types::client::{GenericClient, StorageClient}; use mz_dataflow_types::ConnectorContext; +use mz_ore::cli::{self, CliConfig}; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::SYSTEM_TIME; use mz_pid_file::PidFile; @@ -44,25 +46,22 @@ static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; const BUILD_INFO: BuildInfo = build_info!(); +pub static VERSION: Lazy = Lazy::new(|| BUILD_INFO.human_version()); + /// Independent storage server for Materialize. #[derive(clap::Parser)] +#[clap(version = VERSION.as_str())] struct Args { /// The address on which to listen for a connection from the controller. #[clap( long, - env = "STORAGED_LISTEN_ADDR", + env = "LISTEN_ADDR", value_name = "HOST:PORT", default_value = "127.0.0.1:2100" )] listen_addr: String, /// Number of dataflow worker threads. - #[clap( - short, - long, - env = "STORAGED_WORKERS", - value_name = "W", - default_value = "1" - )] + #[clap(short, long, env = "WORKERS", value_name = "W", default_value = "1")] workers: usize, /// The hostnames of all storaged processes in the cluster. #[clap()] @@ -95,7 +94,7 @@ struct Args { /// The default value for this option is "info". #[clap( long, - env = "STORAGED_LOG_FILTER", + env = "LOG_FILTER", value_name = "FILTER", default_value = "info" )] @@ -108,7 +107,11 @@ struct Args { #[tokio::main] async fn main() { - if let Err(err) = run(mz_ore::cli::parse_args()).await { + let args = cli::parse_args(CliConfig { + env_prefix: Some("STORAGED_"), + enable_version_flag: true, + }); + if let Err(err) = run(args).await { eprintln!("storaged: fatal: {:#}", err); process::exit(1); } diff --git a/src/testdrive/src/bin/testdrive.rs b/src/testdrive/src/bin/testdrive.rs index 6e6e12989772b..3a262df1744bd 100644 --- a/src/testdrive/src/bin/testdrive.rs +++ b/src/testdrive/src/bin/testdrive.rs @@ -29,6 +29,7 @@ use tracing_subscriber::filter::EnvFilter; use url::Url; use walkdir::WalkDir; +use mz_ore::cli::{self, CliConfig}; use mz_ore::path::PathExt; use mz_testdrive::Config; @@ -177,7 +178,7 @@ struct Args { #[tokio::main] async fn main() { - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); tracing_subscriber::fmt() .with_env_filter(args.log_filter) diff --git a/test/perf-kinesis/src/bin/perf-kinesis/main.rs b/test/perf-kinesis/src/bin/perf-kinesis/main.rs index f1c448a00c629..ad3ec4f9444b0 100644 --- a/test/perf-kinesis/src/bin/perf-kinesis/main.rs +++ b/test/perf-kinesis/src/bin/perf-kinesis/main.rs @@ -32,6 +32,7 @@ use rand::Rng; use tracing::info; use tracing_subscriber::filter::EnvFilter; +use mz_ore::cli::{self, CliConfig}; use mz_ore::task; use mz_test_util::mz_client; @@ -48,7 +49,7 @@ async fn main() { async fn run() -> Result<(), anyhow::Error> { let timer = std::time::Instant::now(); - let args: Args = mz_ore::cli::parse_args(); + let args: Args = cli::parse_args(CliConfig::default()); tracing_subscriber::fmt() .with_env_filter(args.log_filter)