diff --git a/Cargo.lock b/Cargo.lock
index ef7044b212..f8231f085b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1361,6 +1361,7 @@ dependencies = [
  "camino-tempfile",
  "derive_more",
  "expectorate",
+ "itertools 0.13.0",
  "omicron-common",
  "omicron-workspace-hack",
  "schemars",
diff --git a/clickhouse-admin/api/src/lib.rs b/clickhouse-admin/api/src/lib.rs
index 46b3b94df4..7e66c9ca06 100644
--- a/clickhouse-admin/api/src/lib.rs
+++ b/clickhouse-admin/api/src/lib.rs
@@ -4,7 +4,7 @@
 use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig};
 use clickhouse_admin_types::{
-    KeeperSettings, Lgif, RaftConfig, ServerSettings,
+    KeeperConf, KeeperSettings, Lgif, RaftConfig, ServerSettings,
 };
 use dropshot::{
     HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody,
@@ -75,4 +75,13 @@ pub trait ClickhouseAdminApi {
     async fn raft_config(
         rqctx: RequestContext<Self::Context>,
     ) -> Result<HttpResponseOk<RaftConfig>, HttpError>;
+
+    /// Retrieve configuration information from a keeper node.
+    #[endpoint {
+        method = GET,
+        path = "/keeper/conf",
+    }]
+    async fn keeper_conf(
+        rqctx: RequestContext<Self::Context>,
+    ) -> Result<HttpResponseOk<KeeperConf>, HttpError>;
 }
diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs
index 95cc3ead9b..a84e2b3404 100644
--- a/clickhouse-admin/src/clickhouse_cli.rs
+++ b/clickhouse-admin/src/clickhouse_cli.rs
@@ -4,7 +4,7 @@
 use anyhow::Result;
 use camino::Utf8PathBuf;
-use clickhouse_admin_types::{Lgif, RaftConfig};
+use clickhouse_admin_types::{KeeperConf, Lgif, RaftConfig};
 use dropshot::HttpError;
 use illumos_utils::{output_to_exec_error, ExecutionError};
 use slog::Logger;
@@ -92,6 +92,16 @@ impl ClickhouseCli {
         .await
     }
 
+    pub async fn keeper_conf(&self) -> Result<KeeperConf, HttpError> {
+        self.keeper_client_non_interactive(
+            "conf",
+            "Retrieve keeper node configuration information",
+            KeeperConf::parse,
+            self.log.clone().unwrap(),
+        )
+        .await
+    }
+
     async fn keeper_client_non_interactive(
         &self,
         query: &str,
diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs
index d8a961ad47..b1571f26c7 100644
--- a/clickhouse-admin/src/http_entrypoints.rs
+++ b/clickhouse-admin/src/http_entrypoints.rs
@@ -5,7 +5,7 @@
 use crate::context::ServerContext;
 use clickhouse_admin_api::*;
 use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig};
-use clickhouse_admin_types::{Lgif, RaftConfig};
+use clickhouse_admin_types::{KeeperConf, Lgif, RaftConfig};
 use dropshot::{
     HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody,
 };
@@ -63,4 +63,12 @@ impl ClickhouseAdminApi for ClickhouseAdminImpl {
         let output = ctx.clickhouse_cli().raft_config().await?;
         Ok(HttpResponseOk(output))
     }
+
+    async fn keeper_conf(
+        rqctx: RequestContext<Self::Context>,
+    ) -> Result<HttpResponseOk<KeeperConf>, HttpError> {
+        let ctx = rqctx.context();
+        let output = ctx.clickhouse_cli().keeper_conf().await?;
+        Ok(HttpResponseOk(output))
+    }
 }
diff --git a/clickhouse-admin/tests/integration_test.rs b/clickhouse-admin/tests/integration_test.rs
index 0691767d0e..048c92d090 100644
--- a/clickhouse-admin/tests/integration_test.rs
+++ b/clickhouse-admin/tests/integration_test.rs
@@ -6,7 +6,7 @@ use anyhow::Context;
 use camino::Utf8PathBuf;
 use clickhouse_admin_types::config::ClickhouseHost;
 use clickhouse_admin_types::{KeeperServerInfo, KeeperServerType, RaftConfig};
-use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId};
+use clickward::{BasePorts, Deployment, DeploymentConfig};
 use dropshot::test_util::log_prefix_for_test;
 use omicron_clickhouse_admin::ClickhouseCli;
 use omicron_test_utils::dev::test_setup_log;
@@ -51,7 +51,7 @@ async fn test_lgif_parsing() -> anyhow::Result<()> {
     .context("failed to generate config")?;
     deployment.deploy().context("failed to deploy")?;
 
-    wait_for_keepers(&log, &deployment, vec![KeeperId(1)]).await?;
+    wait_for_keepers(&log, &deployment, vec![clickward::KeeperId(1)]).await?;
 
     let clickhouse_cli = ClickhouseCli::new(
         Utf8PathBuf::from_str("clickhouse").unwrap(),
@@ -108,7 +108,7 @@ async fn test_raft_config_parsing() -> anyhow::Result<()> {
     wait_for_keepers(
         &log,
         &deployment,
-        (1..=num_keepers).map(KeeperId).collect(),
+        (1..=num_keepers).map(clickward::KeeperId).collect(),
     )
     .await?;
 
@@ -143,3 +143,57 @@ async fn test_raft_config_parsing() -> anyhow::Result<()> {
     logctx.cleanup_successful();
     Ok(())
 }
+
+#[tokio::test]
+async fn test_keeper_conf_parsing() -> anyhow::Result<()> {
+    let logctx = test_setup_log("test_keeper_conf_parsing");
+    let log = logctx.log.clone();
+
+    let (parent_dir, prefix) = log_prefix_for_test(logctx.test_name());
+    let path = parent_dir.join(format!("{prefix}-oximeter-clickward-test"));
+    std::fs::create_dir(&path)?;
+
+    // We spin up several replicated clusters and must use a
+    // separate set of ports in case the tests run concurrently.
+    let base_ports = BasePorts {
+        keeper: 49000,
+        raft: 49100,
+        clickhouse_tcp: 49200,
+        clickhouse_http: 49300,
+        clickhouse_interserver_http: 49400,
+    };
+
+    let config = DeploymentConfig {
+        path: path.clone(),
+        base_ports,
+        cluster_name: "oximeter_cluster".to_string(),
+    };
+
+    let mut deployment = Deployment::new(config);
+
+    // We only need a single keeper to test the conf command
+    let num_keepers = 1;
+    let num_replicas = 1;
+    deployment
+        .generate_config(num_keepers, num_replicas)
+        .context("failed to generate config")?;
+    deployment.deploy().context("failed to deploy")?;
+
+    wait_for_keepers(&log, &deployment, vec![clickward::KeeperId(1)]).await?;
+
+    let clickhouse_cli = ClickhouseCli::new(
+        Utf8PathBuf::from_str("clickhouse").unwrap(),
+        SocketAddrV6::new(Ipv6Addr::LOCALHOST, 49001, 0, 0),
+    )
+    .with_log(log.clone());
+
+    let conf = clickhouse_cli.keeper_conf().await.unwrap();
+
+    assert_eq!(conf.server_id, clickhouse_admin_types::KeeperId(1));
+
+    info!(&log, "Cleaning up test");
+    deployment.teardown()?;
+    std::fs::remove_dir_all(path)?;
+    logctx.cleanup_successful();
+    Ok(())
+}
diff --git a/clickhouse-admin/types/Cargo.toml b/clickhouse-admin/types/Cargo.toml
index 5b2f532e74..a7f1da1c86 100644
--- a/clickhouse-admin/types/Cargo.toml
+++ b/clickhouse-admin/types/Cargo.toml
@@ -13,6 +13,7 @@ atomicwrites.workspace = true
 camino.workspace = true
 camino-tempfile.workspace = true
 derive_more.workspace = true
+itertools.workspace = true
 omicron-common.workspace = true
 omicron-workspace-hack.workspace = true
 schemars.workspace = true
diff --git a/clickhouse-admin/types/src/config.rs b/clickhouse-admin/types/src/config.rs
index d8157475b0..fca74146f9 100644
--- a/clickhouse-admin/types/src/config.rs
+++ b/clickhouse-admin/types/src/config.rs
@@ -606,3 +606,17 @@ impl Display for LogLevel {
         write!(f, "{s}")
     }
 }
+
+impl FromStr for LogLevel {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if s == "trace" {
+            Ok(LogLevel::Trace)
+        } else if s == "debug" {
+            Ok(LogLevel::Debug)
+        } else {
+            bail!("{s} is not a valid log level")
+        }
+    }
+}
diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs
index 59b6b6a28b..6cad52262e 100644
--- a/clickhouse-admin/types/src/lib.rs
+++ b/clickhouse-admin/types/src/lib.rs
@@ -6,6 +6,7 @@ use anyhow::{bail, Context, Error, Result};
 use atomicwrites::AtomicFile;
 use camino::Utf8PathBuf;
 use derive_more::{Add, AddAssign, Display, From};
+use itertools::Itertools;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use slog::{info, Logger};
@@ -251,7 +252,7 @@ impl Lgif {
         let s = String::from_utf8_lossy(data);
         info!(
             log,
-            "Retrieved data from `clickhouse keeper-config lgif`";
+            "Retrieved data from `clickhouse keeper-client --q lgif`";
             "output" => ?s
         );
@@ -402,7 +403,7 @@ impl RaftConfig {
         let s = String::from_utf8_lossy(data);
         info!(
             log,
-            "Retrieved data from `clickhouse keeper-config --q 'get /keeper/config'`";
+            "Retrieved data from `clickhouse keeper-client --q 'get /keeper/config'`";
             "output" => ?s
         );
@@ -501,6 +502,404 @@ impl RaftConfig {
     }
 }
 
+// While we generally use "Config", in this case we use "Conf" as it matches
+// the four-letter-word command we are invoking:
+// `clickhouse keeper-client --q conf`
+/// Keeper configuration information
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
+#[serde(rename_all = "snake_case")]
+pub struct KeeperConf {
+    /// Unique server id, each participant of the ClickHouse Keeper cluster must
+    /// have a unique number (1, 2, 3, and so on).
+    pub server_id: KeeperId,
+    /// Whether IPv6 is enabled.
+    pub enable_ipv6: bool,
+    /// Port for a client to connect.
+    pub tcp_port: u16,
+    /// Allow list of 4lw commands.
+    pub four_letter_word_allow_list: String,
+    /// Max size of batch in requests count before it will be sent to RAFT.
+    pub max_requests_batch_size: u64,
+    /// Min timeout for client session (ms).
+    pub min_session_timeout_ms: u64,
+    /// Max timeout for client session (ms).
+    pub session_timeout_ms: u64,
+    /// Timeout for a single client operation (ms).
+    pub operation_timeout_ms: u64,
+    /// How often ClickHouse Keeper checks for dead sessions and removes them (ms).
+    pub dead_session_check_period_ms: u64,
+    /// How often a ClickHouse Keeper leader will send heartbeats to followers (ms).
+    pub heart_beat_interval_ms: u64,
+    /// If the follower does not receive a heartbeat from the leader in this interval,
+    /// then it can initiate leader election. Must be less than or equal to
+    /// election_timeout_upper_bound_ms. Ideally they shouldn't be equal.
+    pub election_timeout_lower_bound_ms: u64,
+    /// If the follower does not receive a heartbeat from the leader in this interval,
+    /// then it must initiate leader election.
+    pub election_timeout_upper_bound_ms: u64,
+    /// How many coordination log records to store before compaction.
+    pub reserved_log_items: u64,
+    /// How often ClickHouse Keeper will create new snapshots
+    /// (in the number of records in logs).
+    pub snapshot_distance: u64,
+    /// Allow to forward write requests from followers to the leader.
+    pub auto_forwarding: bool,
+    /// Wait to finish internal connections and shutdown (ms).
+    pub shutdown_timeout: u64,
+    /// If the server doesn't connect to other quorum participants in the specified
+    /// timeout it will terminate (ms).
+    pub startup_timeout: u64,
+    /// Text logging level about coordination (trace, debug, and so on).
+    pub raft_logs_level: LogLevel,
+    /// How many snapshots to keep.
+    pub snapshots_to_keep: u64,
+    /// How many log records to store in a single file.
+    pub rotate_log_storage_interval: u64,
+    /// Threshold when leader considers follower as stale and sends the snapshot
+    /// to it instead of logs.
+    pub stale_log_gap: u64,
+    /// When the node becomes fresh.
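+    /// (This is the counterpart of `stale_log_gap`.)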
+    pub fresh_log_gap: u64,
+    /// Max size in bytes of batch of requests that can be sent to RAFT.
+    pub max_requests_batch_bytes_size: u64,
+    /// Maximum number of requests that can be in queue for processing.
+    pub max_request_queue_size: u64,
+    /// Max size of batch of requests to try to get before proceeding with RAFT.
+    /// Keeper will not wait for requests but take only requests that are already
+    /// in the queue.
+    pub max_requests_quick_batch_size: u64,
+    /// Whether to execute read requests as writes through whole RAFT consensus with
+    /// similar speed.
+    pub quorum_reads: bool,
+    /// Whether to call fsync on each change in RAFT changelog.
+    pub force_sync: bool,
+    /// Whether to write compressed coordination logs in ZSTD format.
+    pub compress_logs: bool,
+    /// Whether to write compressed snapshots in ZSTD format (instead of custom LZ4).
+    pub compress_snapshots_with_zstd_format: bool,
+    /// How many times we will try to apply configuration change (add/remove server)
+    /// to the cluster.
+    pub configuration_change_tries_count: u64,
+    /// If connection to a peer is silent longer than this limit * (heartbeat interval),
+    /// we re-establish the connection.
+    pub raft_limits_reconnect_limit: u64,
+    /// Path to coordination logs; just like ZooKeeper, it is best to store logs
+    /// on non-busy nodes.
+    #[schemars(schema_with = "path_schema")]
+    pub log_storage_path: Utf8PathBuf,
+    /// Name of disk used for logs.
+    pub log_storage_disk: String,
+    /// Path to coordination snapshots.
+    #[schemars(schema_with = "path_schema")]
+    pub snapshot_storage_path: Utf8PathBuf,
+    /// Name of disk used for snapshot storage.
+    pub snapshot_storage_disk: String,
+}
+
+impl KeeperConf {
+    pub fn parse(log: &Logger, data: &[u8]) -> Result<Self> {
+        // Like Lgif, the response we get from running
+        // `clickhouse keeper-client -h {HOST} --q conf` isn't in any known
+        // format (e.g. JSON), but rather a series of lines with key-value
+        // pairs separated by an equals sign.
+        let s = String::from_utf8_lossy(data);
+        info!(
+            log,
+            "Retrieved data from `clickhouse keeper-client --q conf`";
+            "output" => ?s
+        );
+
+        let expected = KeeperConf::expected_keys();
+
+        // Verify the output contains the same number of lines as the expected
+        // keys. This ensures we catch any new key-value pairs appended to the
+        // conf output.
+        let lines = s.trim().lines();
+        if expected.len() != lines.count() {
+            bail!(
+                "Output from the Keeper differs from the expected output keys \
+                Output: {s:?} \
+                Expected output keys: {expected:?}"
+            );
+        }
+
+        let mut vals: Vec<&str> = Vec::new();
+        // The output from the `conf` command contains the
+        // `max_requests_batch_size` field twice. We make sure to only read it
+        // once.
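+        //
+        // For illustration (values here mirror the test fixtures below), the
+        // raw output is a series of `key=value` lines:
+        //
+        //   server_id=1
+        //   enable_ipv6=true
+        //   ...
+        //   max_requests_batch_size=100   <- appears twice in real output
+        //
+        // `unique()` drops the second, identical (line, key) pair.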
+        for (line, expected_key) in s.lines().zip(expected.clone()).unique() {
+            let mut split = line.split('=');
+            let Some(key) = split.next() else {
+                bail!("Returned None while attempting to retrieve key");
+            };
+            if key != expected_key {
+                bail!("Extracted key `{key:?}` from output differs from expected key `{expected_key}`");
+            }
+            let Some(val) = split.next() else {
+                bail!("Command output has a line that does not contain a key-value pair: {key:?}");
+            };
+            vals.push(val);
+        }
+
+        let mut iter = vals.into_iter();
+        let server_id = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => KeeperId(v),
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let enable_ipv6 = match bool::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into bool: {e}"),
+        };
+
+        let tcp_port = match u16::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u16: {e}"),
+        };
+
+        let four_letter_word_allow_list = iter.next().unwrap().to_string();
+
+        let max_requests_batch_size = match u64::from_str(iter.next().unwrap())
+        {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let min_session_timeout_ms = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let session_timeout_ms = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let operation_timeout_ms = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let dead_session_check_period_ms =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let heart_beat_interval_ms = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let election_timeout_lower_bound_ms =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let election_timeout_upper_bound_ms =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let reserved_log_items = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let snapshot_distance = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let auto_forwarding = match bool::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into bool: {e}"),
+        };
+
+        let shutdown_timeout = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let startup_timeout = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let raft_logs_level = match LogLevel::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into LogLevel: {e}"),
+        };
+
+        let snapshots_to_keep = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let rotate_log_storage_interval =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let stale_log_gap = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let fresh_log_gap = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let max_requests_batch_bytes_size =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let max_request_queue_size = match u64::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into u64: {e}"),
+        };
+
+        let max_requests_quick_batch_size =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let quorum_reads = match bool::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into bool: {e}"),
+        };
+
+        let force_sync = match bool::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into bool: {e}"),
+        };
+
+        let compress_logs = match bool::from_str(iter.next().unwrap()) {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into bool: {e}"),
+        };
+
+        let compress_snapshots_with_zstd_format =
+            match bool::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into bool: {e}"),
+            };
+
+        let configuration_change_tries_count =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let raft_limits_reconnect_limit =
+            match u64::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => bail!("Unable to convert value into u64: {e}"),
+            };
+
+        let log_storage_path = match Utf8PathBuf::from_str(iter.next().unwrap())
+        {
+            Ok(v) => v,
+            Err(e) => bail!("Unable to convert value into Utf8PathBuf: {e}"),
+        };
+
+        let log_storage_disk = iter.next().unwrap().to_string();
+
+        let snapshot_storage_path =
+            match Utf8PathBuf::from_str(iter.next().unwrap()) {
+                Ok(v) => v,
+                Err(e) => {
+                    bail!("Unable to convert value into Utf8PathBuf: {e}")
+                }
+            };
+
+        let snapshot_storage_disk = iter.next().unwrap().to_string();
+
+        Ok(Self {
+            server_id,
+            enable_ipv6,
+            tcp_port,
+            four_letter_word_allow_list,
+            max_requests_batch_size,
+            min_session_timeout_ms,
+            session_timeout_ms,
+            operation_timeout_ms,
+            dead_session_check_period_ms,
+            heart_beat_interval_ms,
+            election_timeout_lower_bound_ms,
+            election_timeout_upper_bound_ms,
+            reserved_log_items,
+            snapshot_distance,
+            auto_forwarding,
+            shutdown_timeout,
+            startup_timeout,
+            raft_logs_level,
+            snapshots_to_keep,
+            rotate_log_storage_interval,
+            stale_log_gap,
+            fresh_log_gap,
+            max_requests_batch_bytes_size,
+            max_request_queue_size,
+            max_requests_quick_batch_size,
+            quorum_reads,
+            force_sync,
+            compress_logs,
+            compress_snapshots_with_zstd_format,
+            configuration_change_tries_count,
+            raft_limits_reconnect_limit,
+            log_storage_path,
+            log_storage_disk,
+            snapshot_storage_path,
+            snapshot_storage_disk,
+        })
+    }
+
+    fn expected_keys() -> Vec<&'static str> {
+        vec![
+            "server_id",
+            "enable_ipv6",
+            "tcp_port",
+            "four_letter_word_allow_list",
+            "max_requests_batch_size",
+            "min_session_timeout_ms",
+            "session_timeout_ms",
+            "operation_timeout_ms",
+            "dead_session_check_period_ms",
+            "heart_beat_interval_ms",
+            "election_timeout_lower_bound_ms",
+            "election_timeout_upper_bound_ms",
+            "reserved_log_items",
+            "snapshot_distance",
+            "auto_forwarding",
+            "shutdown_timeout",
+            "startup_timeout",
+            "raft_logs_level",
+            "snapshots_to_keep",
+            "rotate_log_storage_interval",
+            "stale_log_gap",
+            "fresh_log_gap",
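+            // NB: the real `conf` output repeats `max_requests_batch_size`
+            // here; the expected-keys list mirrors that duplication so the
+            // line-count check in `parse` holds.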
"snapshots_to_keep", + "rotate_log_storage_interval", + "stale_log_gap", + "fresh_log_gap", + "max_requests_batch_size", + "max_requests_batch_bytes_size", + "max_request_queue_size", + "max_requests_quick_batch_size", + "quorum_reads", + "force_sync", + "compress_logs", + "compress_snapshots_with_zstd_format", + "configuration_change_tries_count", + "raft_limits_reconnect_limit", + "log_storage_path", + "log_storage_disk", + "snapshot_storage_path", + "snapshot_storage_disk", + ] + } +} + #[cfg(test)] mod tests { use camino::Utf8PathBuf; @@ -511,9 +910,9 @@ mod tests { use std::str::FromStr; use crate::{ - ClickhouseHost, KeeperId, KeeperServerInfo, KeeperServerType, - KeeperSettings, Lgif, RaftConfig, RaftServerSettings, ServerId, - ServerSettings, + ClickhouseHost, KeeperConf, KeeperId, KeeperServerInfo, + KeeperServerType, KeeperSettings, Lgif, LogLevel, RaftConfig, + RaftServerSettings, ServerId, ServerSettings, }; fn log() -> slog::Logger { @@ -960,4 +1359,316 @@ mod tests { "Output is not as expected. Server identifier: '' Expected server identifier: 'server.{SERVER_ID}'", ); } + + #[test] + fn test_full_keeper_conf_parse_success() { + let log = log(); + // This data contains the duplicated "max_requests_batch_size" that occurs in the + // real conf command output + let data = + "server_id=1 +enable_ipv6=true +tcp_port=20001 +four_letter_word_allow_list=conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl +max_requests_batch_size=100 +min_session_timeout_ms=10000 +session_timeout_ms=30000 +operation_timeout_ms=10000 +dead_session_check_period_ms=500 +heart_beat_interval_ms=500 +election_timeout_lower_bound_ms=1000 +election_timeout_upper_bound_ms=2000 +reserved_log_items=100000 +snapshot_distance=100000 +auto_forwarding=true +shutdown_timeout=5000 +startup_timeout=180000 +raft_logs_level=trace +snapshots_to_keep=3 +rotate_log_storage_interval=100000 +stale_log_gap=10000 +fresh_log_gap=200 +max_requests_batch_size=100 +max_requests_batch_bytes_size=102400 +max_request_queue_size=100000 +max_requests_quick_batch_size=100 +quorum_reads=false +force_sync=true +compress_logs=true +compress_snapshots_with_zstd_format=true +configuration_change_tries_count=20 +raft_limits_reconnect_limit=50 +log_storage_path=./deployment/keeper-1/coordination/log +log_storage_disk=LocalLogDisk +snapshot_storage_path=./deployment/keeper-1/coordination/snapshots +snapshot_storage_disk=LocalSnapshotDisk +\n" + .as_bytes(); + let conf = KeeperConf::parse(&log, data).unwrap(); + + assert!(conf.server_id == KeeperId(1)); + assert!(conf.enable_ipv6); + assert!(conf.tcp_port == 20001); + assert!(conf.four_letter_word_allow_list == *"conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl" ); + assert!(conf.max_requests_batch_size == 100); + assert!(conf.min_session_timeout_ms == 10000); + assert!(conf.session_timeout_ms == 30000); + assert!(conf.operation_timeout_ms == 10000); + assert!(conf.dead_session_check_period_ms == 500); + assert!(conf.heart_beat_interval_ms == 500); + assert!(conf.election_timeout_lower_bound_ms == 1000); + assert!(conf.election_timeout_upper_bound_ms == 2000); + assert!(conf.reserved_log_items == 100000); + assert!(conf.snapshot_distance == 100000); + assert!(conf.auto_forwarding); + assert!(conf.shutdown_timeout == 5000); + assert!(conf.startup_timeout == 180000); + assert!(conf.raft_logs_level == LogLevel::Trace); + assert!(conf.snapshots_to_keep == 3); + assert!(conf.rotate_log_storage_interval 
+        assert!(conf.stale_log_gap == 10000);
+        assert!(conf.fresh_log_gap == 200);
+        assert!(conf.max_requests_batch_bytes_size == 102400);
+        assert!(conf.max_request_queue_size == 100000);
+        assert!(conf.max_requests_quick_batch_size == 100);
+        assert!(!conf.quorum_reads);
+        assert!(conf.force_sync);
+        assert!(conf.compress_logs);
+        assert!(conf.compress_snapshots_with_zstd_format);
+        assert!(conf.configuration_change_tries_count == 20);
+        assert!(conf.raft_limits_reconnect_limit == 50);
+        assert!(
+            conf.log_storage_path
+                == Utf8PathBuf::from_str(
+                    "./deployment/keeper-1/coordination/log"
+                )
+                .unwrap()
+        );
+        assert!(conf.log_storage_disk == *"LocalLogDisk");
+        assert!(
+            conf.snapshot_storage_path
+                == Utf8PathBuf::from_str(
+                    "./deployment/keeper-1/coordination/snapshots"
+                )
+                .unwrap()
+        );
+        assert!(conf.snapshot_storage_disk == *"LocalSnapshotDisk")
+    }
+
+    #[test]
+    fn test_missing_value_keeper_conf_parse_fail() {
+        let log = log();
+        // This data contains the duplicated "max_requests_batch_size" that
+        // occurs in the real conf command output
+        let data =
+            "server_id=1
+enable_ipv6=true
+tcp_port=20001
+four_letter_word_allow_list=conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl
+max_requests_batch_size=100
+min_session_timeout_ms=10000
+session_timeout_ms=
+operation_timeout_ms=10000
+dead_session_check_period_ms=500
+heart_beat_interval_ms=500
+election_timeout_lower_bound_ms=1000
+election_timeout_upper_bound_ms=2000
+reserved_log_items=100000
+snapshot_distance=100000
+auto_forwarding=true
+shutdown_timeout=5000
+startup_timeout=180000
+raft_logs_level=trace
+snapshots_to_keep=3
+rotate_log_storage_interval=100000
+stale_log_gap=10000
+fresh_log_gap=200
+max_requests_batch_size=100
+max_requests_batch_bytes_size=102400
+max_request_queue_size=100000
+max_requests_quick_batch_size=100
+quorum_reads=false
+force_sync=true
+compress_logs=true
+compress_snapshots_with_zstd_format=true
+configuration_change_tries_count=20
+raft_limits_reconnect_limit=50
+log_storage_path=./deployment/keeper-1/coordination/log
+log_storage_disk=LocalLogDisk
+snapshot_storage_path=./deployment/keeper-1/coordination/snapshots
+snapshot_storage_disk=LocalSnapshotDisk
+\n"
+        .as_bytes();
+        let result = KeeperConf::parse(&log, data);
+        let error = result.unwrap_err();
+        let root_cause = error.root_cause();
+
+        assert_eq!(
+            format!("{}", root_cause),
+            "Unable to convert value into u64: cannot parse integer from empty string"
+        );
+    }
+
+    #[test]
+    fn test_malformed_output_keeper_conf_parse_fail() {
+        let log = log();
+        // This data contains the duplicated "max_requests_batch_size" that
+        // occurs in the real conf command output
+        let data =
+            "server_id=1
+enable_ipv6=true
+tcp_port=20001
+four_letter_word_allow_list=conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl
+max_requests_batch_size=100
+min_session_timeout_ms=10000
+session_timeout_ms
+operation_timeout_ms=10000
+dead_session_check_period_ms=500
+heart_beat_interval_ms=500
+election_timeout_lower_bound_ms=1000
+election_timeout_upper_bound_ms=2000
+reserved_log_items=100000
+snapshot_distance=100000
+auto_forwarding=true
+shutdown_timeout=5000
+startup_timeout=180000
+raft_logs_level=trace
+snapshots_to_keep=3
+rotate_log_storage_interval=100000
+stale_log_gap=10000
+fresh_log_gap=200
+max_requests_batch_size=100
+max_requests_batch_bytes_size=102400
+max_request_queue_size=100000
+max_requests_quick_batch_size=100
+quorum_reads=false
+force_sync=true
+compress_logs=true
+compress_snapshots_with_zstd_format=true
+configuration_change_tries_count=20
+raft_limits_reconnect_limit=50
+log_storage_path=./deployment/keeper-1/coordination/log
+log_storage_disk=LocalLogDisk
+snapshot_storage_path=./deployment/keeper-1/coordination/snapshots
+snapshot_storage_disk=LocalSnapshotDisk
+\n"
+        .as_bytes();
+        let result = KeeperConf::parse(&log, data);
+        let error = result.unwrap_err();
+        let root_cause = error.root_cause();
+
+        assert_eq!(
+            format!("{}", root_cause),
+            "Command output has a line that does not contain a key-value pair: \"session_timeout_ms\""
+        );
+    }
+
+    #[test]
+    fn test_missing_field_keeper_conf_parse_fail() {
+        let log = log();
+        // This data contains the duplicated "max_requests_batch_size" that
+        // occurs in the real conf command output
+        let data =
+            "server_id=1
+enable_ipv6=true
+tcp_port=20001
+four_letter_word_allow_list=conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl
+max_requests_batch_size=100
+min_session_timeout_ms=10000
+operation_timeout_ms=10000
+dead_session_check_period_ms=500
+heart_beat_interval_ms=500
+election_timeout_lower_bound_ms=1000
+election_timeout_upper_bound_ms=2000
+reserved_log_items=100000
+snapshot_distance=100000
+auto_forwarding=true
+shutdown_timeout=5000
+startup_timeout=180000
+raft_logs_level=trace
+snapshots_to_keep=3
+rotate_log_storage_interval=100000
+stale_log_gap=10000
+fresh_log_gap=200
+max_requests_batch_size=100
+max_requests_batch_bytes_size=102400
+max_request_queue_size=100000
+max_requests_quick_batch_size=100
+quorum_reads=false
+force_sync=true
+compress_logs=true
+compress_snapshots_with_zstd_format=true
+configuration_change_tries_count=20
+raft_limits_reconnect_limit=50
+log_storage_path=./deployment/keeper-1/coordination/log
+log_storage_disk=LocalLogDisk
+snapshot_storage_path=./deployment/keeper-1/coordination/snapshots
+snapshot_storage_disk=LocalSnapshotDisk
+\n"
+        .as_bytes();
+        let result = KeeperConf::parse(&log, data);
+        let error = result.unwrap_err();
+        let root_cause = error.root_cause();
+
+        assert_eq!(
+            format!("{}", root_cause),
+            "Output from the Keeper differs from the expected output keys \
+            Output: \"server_id=1\\nenable_ipv6=true\\ntcp_port=20001\\nfour_letter_word_allow_list=conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl\\nmax_requests_batch_size=100\\nmin_session_timeout_ms=10000\\noperation_timeout_ms=10000\\ndead_session_check_period_ms=500\\nheart_beat_interval_ms=500\\nelection_timeout_lower_bound_ms=1000\\nelection_timeout_upper_bound_ms=2000\\nreserved_log_items=100000\\nsnapshot_distance=100000\\nauto_forwarding=true\\nshutdown_timeout=5000\\nstartup_timeout=180000\\nraft_logs_level=trace\\nsnapshots_to_keep=3\\nrotate_log_storage_interval=100000\\nstale_log_gap=10000\\nfresh_log_gap=200\\nmax_requests_batch_size=100\\nmax_requests_batch_bytes_size=102400\\nmax_request_queue_size=100000\\nmax_requests_quick_batch_size=100\\nquorum_reads=false\\nforce_sync=true\\ncompress_logs=true\\ncompress_snapshots_with_zstd_format=true\\nconfiguration_change_tries_count=20\\nraft_limits_reconnect_limit=50\\nlog_storage_path=./deployment/keeper-1/coordination/log\\nlog_storage_disk=LocalLogDisk\\nsnapshot_storage_path=./deployment/keeper-1/coordination/snapshots\\nsnapshot_storage_disk=LocalSnapshotDisk\\n\\n\" \
+            Expected output keys: [\"server_id\", \"enable_ipv6\", \"tcp_port\", \"four_letter_word_allow_list\",
\"max_requests_batch_size\", \"min_session_timeout_ms\", \"session_timeout_ms\", \"operation_timeout_ms\", \"dead_session_check_period_ms\", \"heart_beat_interval_ms\", \"election_timeout_lower_bound_ms\", \"election_timeout_upper_bound_ms\", \"reserved_log_items\", \"snapshot_distance\", \"auto_forwarding\", \"shutdown_timeout\", \"startup_timeout\", \"raft_logs_level\", \"snapshots_to_keep\", \"rotate_log_storage_interval\", \"stale_log_gap\", \"fresh_log_gap\", \"max_requests_batch_size\", \"max_requests_batch_bytes_size\", \"max_request_queue_size\", \"max_requests_quick_batch_size\", \"quorum_reads\", \"force_sync\", \"compress_logs\", \"compress_snapshots_with_zstd_format\", \"configuration_change_tries_count\", \"raft_limits_reconnect_limit\", \"log_storage_path\", \"log_storage_disk\", \"snapshot_storage_path\", \"snapshot_storage_disk\"]" + ); + } + + #[test] + fn test_non_existent_key_keeper_conf_parse_fail() { + let log = log(); + // This data contains the duplicated "max_requests_batch_size" that occurs in the + // real conf command output + let data = + "server_id=1 +enable_ipv6=true +tcp_port=20001 +four_letter_word_allow_list=conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv,csnp,lgif,rqld,rclc,clrs,ftfl +max_requests_batch_size=100 +min_session_timeout_ms=10000 +session_timeout_fake=100 +operation_timeout_ms=10000 +dead_session_check_period_ms=500 +heart_beat_interval_ms=500 +election_timeout_lower_bound_ms=1000 +election_timeout_upper_bound_ms=2000 +reserved_log_items=100000 +snapshot_distance=100000 +auto_forwarding=true +shutdown_timeout=5000 +startup_timeout=180000 +raft_logs_level=trace +snapshots_to_keep=3 +rotate_log_storage_interval=100000 +stale_log_gap=10000 +fresh_log_gap=200 +max_requests_batch_size=100 +max_requests_batch_bytes_size=102400 +max_request_queue_size=100000 +max_requests_quick_batch_size=100 +quorum_reads=false +force_sync=true +compress_logs=true +compress_snapshots_with_zstd_format=true +configuration_change_tries_count=20 +raft_limits_reconnect_limit=50 +log_storage_path=./deployment/keeper-1/coordination/log +log_storage_disk=LocalLogDisk +snapshot_storage_path=./deployment/keeper-1/coordination/snapshots +snapshot_storage_disk=LocalSnapshotDisk +\n" + .as_bytes(); + let result = KeeperConf::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Extracted key `\"session_timeout_fake\"` from output differs from expected key `session_timeout_ms`" + ); + } } diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index ab67d255a3..57e24eacb1 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -10,6 +10,30 @@ "version": "0.0.1" }, "paths": { + "/keeper/conf": { + "get": { + "summary": "Retrieve configuration information from a keeper node.", + "operationId": "keeper_conf", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/KeeperConf" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/keeper/config": { "put": { "summary": "Generate a ClickHouse configuration file for a keeper node on a specified", @@ -200,6 +224,243 @@ "format": "uint64", "minimum": 0 }, + "KeeperConf": { + "description": "Keeper configuration information", + "type": "object", + "properties": { + "auto_forwarding": { + "description": 
"Allow to forward write requests from followers to the leader.", + "type": "boolean" + }, + "compress_logs": { + "description": "Whether to write compressed coordination logs in ZSTD format.", + "type": "boolean" + }, + "compress_snapshots_with_zstd_format": { + "description": "Whether to write compressed snapshots in ZSTD format (instead of custom LZ4).", + "type": "boolean" + }, + "configuration_change_tries_count": { + "description": "How many times we will try to apply configuration change (add/remove server) to the cluster.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "dead_session_check_period_ms": { + "description": "How often ClickHouse Keeper checks for dead sessions and removes them (ms).", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "election_timeout_lower_bound_ms": { + "description": "If the follower does not receive a heartbeat from the leader in this interval, then it can initiate leader election. Must be less than or equal to election_timeout_upper_bound_ms. Ideally they shouldn't be equal.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "election_timeout_upper_bound_ms": { + "description": "If the follower does not receive a heartbeat from the leader in this interval, then it must initiate leader election.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "enable_ipv6": { + "description": "Whether Ipv6 is enabled.", + "type": "boolean" + }, + "force_sync": { + "description": "Whether to call fsync on each change in RAFT changelog.", + "type": "boolean" + }, + "four_letter_word_allow_list": { + "description": "Allolw list of 4lw commands.", + "type": "string" + }, + "fresh_log_gap": { + "description": "When the node became fresh.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "heart_beat_interval_ms": { + "description": "How often a ClickHouse Keeper leader will send heartbeats to followers (ms).", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "log_storage_disk": { + "description": "Name of disk used for logs.", + "type": "string" + }, + "log_storage_path": { + "description": "Path to coordination logs, just like ZooKeeper it is best to store logs on non-busy nodes.", + "type": "string", + "format": "Utf8PathBuf" + }, + "max_request_queue_size": { + "description": "Maximum number of requests that can be in queue for processing.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "max_requests_batch_bytes_size": { + "description": "Max size in bytes of batch of requests that can be sent to RAFT.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "max_requests_batch_size": { + "description": "Max size of batch in requests count before it will be sent to RAFT.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "max_requests_quick_batch_size": { + "description": "Max size of batch of requests to try to get before proceeding with RAFT. 
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "min_session_timeout_ms": {
+            "description": "Min timeout for client session (ms).",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "operation_timeout_ms": {
+            "description": "Timeout for a single client operation (ms).",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "quorum_reads": {
+            "description": "Whether to execute read requests as writes through whole RAFT consensus with similar speed.",
+            "type": "boolean"
+          },
+          "raft_limits_reconnect_limit": {
+            "description": "If connection to a peer is silent longer than this limit * (heartbeat interval), we re-establish the connection.",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "raft_logs_level": {
+            "description": "Text logging level about coordination (trace, debug, and so on).",
+            "allOf": [
+              {
+                "$ref": "#/components/schemas/LogLevel"
+              }
+            ]
+          },
+          "reserved_log_items": {
+            "description": "How many coordination log records to store before compaction.",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "rotate_log_storage_interval": {
+            "description": "How many log records to store in a single file.",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "server_id": {
+            "description": "Unique server id, each participant of the ClickHouse Keeper cluster must have a unique number (1, 2, 3, and so on).",
+            "allOf": [
+              {
+                "$ref": "#/components/schemas/KeeperId"
+              }
+            ]
+          },
+          "session_timeout_ms": {
+            "description": "Max timeout for client session (ms).",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "shutdown_timeout": {
+            "description": "Wait to finish internal connections and shutdown (ms).",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "snapshot_distance": {
+            "description": "How often ClickHouse Keeper will create new snapshots (in the number of records in logs).",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "snapshot_storage_disk": {
+            "description": "Name of disk used for snapshot storage.",
+            "type": "string"
+          },
+          "snapshot_storage_path": {
+            "description": "Path to coordination snapshots.",
+            "type": "string",
+            "format": "Utf8PathBuf"
+          },
+          "snapshots_to_keep": {
+            "description": "How many snapshots to keep.",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "stale_log_gap": {
+            "description": "Threshold when leader considers follower as stale and sends the snapshot to it instead of logs.",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "startup_timeout": {
+            "description": "If the server doesn't connect to other quorum participants in the specified timeout it will terminate (ms).",
+            "type": "integer",
+            "format": "uint64",
+            "minimum": 0
+          },
+          "tcp_port": {
+            "description": "Port for a client to connect.",
+            "type": "integer",
+            "format": "uint16",
+            "minimum": 0
+          }
+        },
+        "required": [
+          "auto_forwarding",
+          "compress_logs",
+          "compress_snapshots_with_zstd_format",
+          "configuration_change_tries_count",
+          "dead_session_check_period_ms",
+          "election_timeout_lower_bound_ms",
+          "election_timeout_upper_bound_ms",
+          "enable_ipv6",
+          "force_sync",
+          "four_letter_word_allow_list",
+          "fresh_log_gap",
+          "heart_beat_interval_ms",
+          "log_storage_disk",
+          "log_storage_path",
+          "max_request_queue_size",
+          "max_requests_batch_bytes_size",
+          "max_requests_batch_size",
+          "max_requests_quick_batch_size",
"min_session_timeout_ms", + "operation_timeout_ms", + "quorum_reads", + "raft_limits_reconnect_limit", + "raft_logs_level", + "reserved_log_items", + "rotate_log_storage_interval", + "server_id", + "session_timeout_ms", + "shutdown_timeout", + "snapshot_distance", + "snapshot_storage_disk", + "snapshot_storage_path", + "snapshots_to_keep", + "stale_log_gap", + "startup_timeout", + "tcp_port" + ] + }, "KeeperConfig": { "description": "Configuration for a ClickHouse keeper", "type": "object",