diff --git a/.github/buildomat/jobs/package.sh b/.github/buildomat/jobs/package.sh index 77cc547b93..918538071d 100755 --- a/.github/buildomat/jobs/package.sh +++ b/.github/buildomat/jobs/package.sh @@ -104,6 +104,7 @@ ptime -m ./tools/build-global-zone-packages.sh "$tarball_src_dir" /work mkdir -p /work/zones zones=( out/clickhouse.tar.gz + out/clickhouse_keeper.tar.gz out/cockroachdb.tar.gz out/crucible-pantry.tar.gz out/crucible.tar.gz diff --git a/common/src/address.rs b/common/src/address.rs index 97d92ce1dc..a0490b9ae7 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -36,6 +36,7 @@ pub const PROPOLIS_PORT: u16 = 12400; pub const COCKROACH_PORT: u16 = 32221; pub const CRUCIBLE_PORT: u16 = 32345; pub const CLICKHOUSE_PORT: u16 = 8123; +pub const CLICKHOUSE_KEEPER_PORT: u16 = 9181; pub const OXIMETER_PORT: u16 = 12223; pub const DENDRITE_PORT: u16 = 12224; pub const DDMD_PORT: u16 = 8000; diff --git a/dev-tools/src/bin/omicron-dev.rs b/dev-tools/src/bin/omicron-dev.rs index c3a906c302..507e3bc918 100644 --- a/dev-tools/src/bin/omicron-dev.rs +++ b/dev-tools/src/bin/omicron-dev.rs @@ -268,7 +268,7 @@ async fn cmd_clickhouse_run(args: &ChRunArgs) -> Result<(), anyhow::Error> { // Start the database server process, possibly on a specific port let mut db_instance = - dev::clickhouse::ClickHouseInstance::new(args.port).await?; + dev::clickhouse::ClickHouseInstance::new_single_node(args.port).await?; println!( "omicron-dev: running ClickHouse with full command:\n\"clickhouse {}\"", db_instance.cmdline().join(" ") diff --git a/internal-dns-cli/src/bin/dnswait.rs b/internal-dns-cli/src/bin/dnswait.rs index 930f767da4..df4832f346 100644 --- a/internal-dns-cli/src/bin/dnswait.rs +++ b/internal-dns-cli/src/bin/dnswait.rs @@ -23,21 +23,31 @@ struct Opt { #[clap(long, action)] nameserver_addresses: Vec, - /// service name to be resolved (should be the target of a DNS name) + /// Service name to be resolved (should be the target of a DNS name) #[arg(value_enum)] srv_name: ServiceName, + + /// Output service host names only, omitting the port + #[clap(long, short = 'H', action)] + hostname_only: bool, } #[derive(Debug, Clone, Copy, ValueEnum)] #[value(rename_all = "kebab-case")] enum ServiceName { Cockroach, + Clickhouse, + ClickhouseKeeper, } impl From for internal_dns::ServiceName { fn from(value: ServiceName) -> Self { match value { ServiceName::Cockroach => internal_dns::ServiceName::Cockroach, + ServiceName::Clickhouse => internal_dns::ServiceName::Clickhouse, + ServiceName::ClickhouseKeeper => { + internal_dns::ServiceName::ClickhouseKeeper + } } } } @@ -91,7 +101,11 @@ async fn main() -> Result<()> { .context("unexpectedly gave up")?; for (target, port) in result { - println!("{}:{}", target, port) + if opt.hostname_only { + println!("{}", target) + } else { + println!("{}:{}", target, port) + } } Ok(()) diff --git a/internal-dns/src/config.rs b/internal-dns/src/config.rs index 3fcbbb208e..9a174c26b5 100644 --- a/internal-dns/src/config.rs +++ b/internal-dns/src/config.rs @@ -422,6 +422,10 @@ mod test { #[test] fn display_srv_service() { assert_eq!(ServiceName::Clickhouse.dns_name(), "_clickhouse._tcp",); + assert_eq!( + ServiceName::ClickhouseKeeper.dns_name(), + "_clickhouse-keeper._tcp", + ); assert_eq!(ServiceName::Cockroach.dns_name(), "_cockroach._tcp",); assert_eq!(ServiceName::InternalDns.dns_name(), "_nameservice._tcp",); assert_eq!(ServiceName::Nexus.dns_name(), "_nexus._tcp",); diff --git a/internal-dns/src/names.rs b/internal-dns/src/names.rs index 
3f663263e1..663e04bcd9 100644 --- a/internal-dns/src/names.rs +++ b/internal-dns/src/names.rs @@ -17,6 +17,7 @@ pub const DNS_ZONE_EXTERNAL_TESTING: &str = "oxide-dev.test"; #[derive(Clone, Debug, Hash, Eq, Ord, PartialEq, PartialOrd)] pub enum ServiceName { Clickhouse, + ClickhouseKeeper, Cockroach, InternalDns, ExternalDns, @@ -38,6 +39,7 @@ impl ServiceName { fn service_kind(&self) -> &'static str { match self { ServiceName::Clickhouse => "clickhouse", + ServiceName::ClickhouseKeeper => "clickhouse-keeper", ServiceName::Cockroach => "cockroach", ServiceName::ExternalDns => "external-dns", ServiceName::InternalDns => "nameservice", @@ -61,6 +63,7 @@ impl ServiceName { pub(crate) fn dns_name(&self) -> String { match self { ServiceName::Clickhouse + | ServiceName::ClickhouseKeeper | ServiceName::Cockroach | ServiceName::InternalDns | ServiceName::ExternalDns diff --git a/nexus/benches/setup_benchmark.rs b/nexus/benches/setup_benchmark.rs index d9e9577a1f..304ccc8325 100644 --- a/nexus/benches/setup_benchmark.rs +++ b/nexus/benches/setup_benchmark.rs @@ -29,7 +29,7 @@ async fn do_crdb_setup() { // Wraps exclusively the ClickhouseDB portion of setup/teardown. async fn do_clickhouse_setup() { let mut clickhouse = - dev::clickhouse::ClickHouseInstance::new(0).await.unwrap(); + dev::clickhouse::ClickHouseInstance::new_single_node(0).await.unwrap(); clickhouse.cleanup().await.unwrap(); } diff --git a/nexus/db-model/src/dataset_kind.rs b/nexus/db-model/src/dataset_kind.rs index f4c6a5eee6..d068f48fd3 100644 --- a/nexus/db-model/src/dataset_kind.rs +++ b/nexus/db-model/src/dataset_kind.rs @@ -19,6 +19,7 @@ impl_enum_type!( Crucible => b"crucible" Cockroach => b"cockroach" Clickhouse => b"clickhouse" + ClickhouseKeeper => b"clickhouse_keeper" ExternalDns => b"external_dns" InternalDns => b"internal_dns" ); @@ -35,6 +36,9 @@ impl From for DatasetKind { internal_api::params::DatasetKind::Clickhouse => { DatasetKind::Clickhouse } + internal_api::params::DatasetKind::ClickhouseKeeper => { + DatasetKind::ClickhouseKeeper + } internal_api::params::DatasetKind::ExternalDns => { DatasetKind::ExternalDns } diff --git a/nexus/db-model/src/schema.rs b/nexus/db-model/src/schema.rs index 78ebb527cd..3fde9ee715 100644 --- a/nexus/db-model/src/schema.rs +++ b/nexus/db-model/src/schema.rs @@ -1130,7 +1130,7 @@ table! { /// /// This should be updated whenever the schema is changed. 
For more details, /// refer to: schema/crdb/README.adoc -pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(3, 0, 3); +pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(4, 0, 0); allow_tables_to_appear_in_same_query!( system_update, diff --git a/nexus/db-model/src/service_kind.rs b/nexus/db-model/src/service_kind.rs index afb29abaa7..d5a34f07db 100644 --- a/nexus/db-model/src/service_kind.rs +++ b/nexus/db-model/src/service_kind.rs @@ -18,6 +18,7 @@ impl_enum_type!( // Enum values Clickhouse => b"clickhouse" + ClickhouseKeeper => b"clickhouse_keeper" Cockroach => b"cockroach" Crucible => b"crucible" CruciblePantry => b"crucible_pantry" @@ -54,6 +55,9 @@ impl From for ServiceKind { internal_api::params::ServiceKind::Clickhouse => { ServiceKind::Clickhouse } + internal_api::params::ServiceKind::ClickhouseKeeper => { + ServiceKind::ClickhouseKeeper + } internal_api::params::ServiceKind::Cockroach => { ServiceKind::Cockroach } diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 0f303b6b7e..24d9508468 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -321,7 +321,9 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { let log = &self.logctx.log; debug!(log, "Starting Clickhouse"); let clickhouse = - dev::clickhouse::ClickHouseInstance::new(0).await.unwrap(); + dev::clickhouse::ClickHouseInstance::new_single_node(0) + .await + .unwrap(); let port = clickhouse.port(); let zpool_id = Uuid::new_v4(); diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 7d1a4b318a..2cda594e18 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -110,7 +110,10 @@ async fn test_oximeter_reregistration() { ); let client = oximeter_db::Client::new(ch_address.into(), &context.logctx.log); - client.init_db().await.expect("Failed to initialize timeseries database"); + client + .init_single_node_db() + .await + .expect("Failed to initialize timeseries database"); // Helper to retrieve the timeseries from ClickHouse let timeseries_name = "integration_target:integration_metric"; diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index 89c7a88f1f..e2a5e3d094 100644 --- a/nexus/types/src/internal_api/params.rs +++ b/nexus/types/src/internal_api/params.rs @@ -125,6 +125,7 @@ pub enum DatasetKind { Crucible, Cockroach, Clickhouse, + ClickhouseKeeper, ExternalDns, InternalDns, } @@ -136,6 +137,7 @@ impl fmt::Display for DatasetKind { Crucible => "crucible", Cockroach => "cockroach", Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", ExternalDns => "external_dns", InternalDns => "internal_dns", }; @@ -168,6 +170,7 @@ pub struct ServiceNic { #[serde(rename_all = "snake_case", tag = "type", content = "content")] pub enum ServiceKind { Clickhouse, + ClickhouseKeeper, Cockroach, Crucible, CruciblePantry, @@ -186,6 +189,7 @@ impl fmt::Display for ServiceKind { use ServiceKind::*; let s = match self { Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", Cockroach => "cockroach", Crucible => "crucible", ExternalDns { .. 
} => "external_dns", diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 80c5c2dc24..12043d8096 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -922,6 +922,7 @@ "crucible", "cockroach", "clickhouse", + "clickhouse_keeper", "external_dns", "internal_dns" ] @@ -2803,6 +2804,20 @@ "type" ] }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + }, + "required": [ + "type" + ] + }, { "type": "object", "properties": { diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index 748c1a4716..c33653c468 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -1091,6 +1091,20 @@ "type" ] }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + }, + "required": [ + "type" + ] + }, { "type": "object", "properties": { @@ -2524,6 +2538,24 @@ "type" ] }, + { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + }, + "required": [ + "address", + "type" + ] + }, { "type": "object", "properties": { @@ -3115,6 +3147,7 @@ "type": "string", "enum": [ "clickhouse", + "clickhouse_keeper", "cockroach_db", "crucible_pantry", "crucible", diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 5367b05056..bf75b567ea 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -321,7 +321,12 @@ impl OximeterAgent { ) }; let client = Client::new(db_address, &log); - client.init_db().await?; + let replicated = client.is_oximeter_cluster().await?; + if !replicated { + client.init_single_node_db().await?; + } else { + client.init_replicated_db().await?; + } // Spawn the task for aggregating and inserting all metrics tokio::spawn(async move { diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb.rs index 76152d7710..e14fdeb6a8 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb.rs @@ -148,7 +148,7 @@ async fn make_client( let address = SocketAddr::new(address, port); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .context("Failed to initialize timeseries database")?; Ok(client) @@ -261,13 +261,13 @@ async fn populate( Ok(()) } -async fn wipe_db( +async fn wipe_single_node_db( address: IpAddr, port: u16, log: Logger, ) -> Result<(), anyhow::Error> { let client = make_client(address, port, &log).await?; - client.wipe_db().await.context("Failed to wipe database") + client.wipe_single_node_db().await.context("Failed to wipe database") } async fn query( @@ -313,7 +313,7 @@ async fn main() { .unwrap(); } Subcommand::Wipe => { - wipe_db(args.address, args.port, log).await.unwrap() + wipe_single_node_db(args.address, args.port, log).await.unwrap() } Subcommand::Query { timeseries_name, diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index f566903f8f..ca45cef0b7 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -248,6 +248,13 @@ impl Client { .map_err(|e| Error::Database(e.to_string())) } + // Verifies if instance is part of oximeter_cluster + pub async fn is_oximeter_cluster(&self) -> Result { + let sql = String::from("SHOW CLUSTERS FORMAT JSONEachRow;"); + let res = self.execute_with_body(sql).await?; + Ok(res.contains("oximeter_cluster")) + } + // Verifies that the schema for a sample matches the schema in the database. 
// // If the schema exists in the database, and the sample matches that schema, `None` is @@ -432,11 +439,17 @@ pub trait DbWrite { /// Insert the given samples into the database. async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>; - /// Initialize the telemetry database, creating tables as needed. - async fn init_db(&self) -> Result<(), Error>; + /// Initialize the replicated telemetry database, creating tables as needed. + async fn init_replicated_db(&self) -> Result<(), Error>; + + /// Initialize a single node telemetry database, creating tables as needed. + async fn init_single_node_db(&self) -> Result<(), Error>; - /// Wipe the ClickHouse database entirely. - async fn wipe_db(&self) -> Result<(), Error>; + /// Wipe the ClickHouse database entirely from a single node set up. + async fn wipe_single_node_db(&self) -> Result<(), Error>; + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error>; } #[async_trait] @@ -538,22 +551,41 @@ impl DbWrite for Client { Ok(()) } - /// Initialize the telemetry database, creating tables as needed. - async fn init_db(&self) -> Result<(), Error> { + /// Initialize the replicated telemetry database, creating tables as needed. + async fn init_replicated_db(&self) -> Result<(), Error> { // The HTTP client doesn't support multiple statements per query, so we break them out here // manually. debug!(self.log, "initializing ClickHouse database"); - let sql = include_str!("./db-init.sql"); + let sql = include_str!("./db-replicated-init.sql"); for query in sql.split("\n--\n") { self.execute(query.to_string()).await?; } Ok(()) } - /// Wipe the ClickHouse database entirely. - async fn wipe_db(&self) -> Result<(), Error> { + /// Initialize a single node telemetry database, creating tables as needed. + async fn init_single_node_db(&self) -> Result<(), Error> { + // The HTTP client doesn't support multiple statements per query, so we break them out here + // manually. + debug!(self.log, "initializing ClickHouse database"); + let sql = include_str!("./db-single-node-init.sql"); + for query in sql.split("\n--\n") { + self.execute(query.to_string()).await?; + } + Ok(()) + } + + /// Wipe the ClickHouse database entirely from a single node set up. + async fn wipe_single_node_db(&self) -> Result<(), Error> { + debug!(self.log, "wiping ClickHouse database"); + let sql = include_str!("./db-wipe-single-node.sql").to_string(); + self.execute(sql).await + } + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error> { debug!(self.log, "wiping ClickHouse database"); - let sql = include_str!("./db-wipe.sql").to_string(); + let sql = include_str!("./db-wipe-replicated.sql").to_string(); self.execute(sql).await } } @@ -605,6 +637,8 @@ mod tests { use oximeter::test_util; use oximeter::{Metric, Target}; use slog::o; + use std::time::Duration; + use tokio::time::sleep; // NOTE: It's important that each test run the ClickHouse server with different ports. // The tests each require a clean slate.
Previously, we ran the tests in a different thread, @@ -621,28 +655,144 @@ mod tests { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); - Client::new(address, &log).wipe_db().await.unwrap(); + let client = Client::new(address, &log); + assert!(!client.is_oximeter_cluster().await.unwrap()); + + client.wipe_single_node_db().await.unwrap(); db.cleanup().await.expect("Failed to cleanup ClickHouse server"); } + #[tokio::test] + async fn test_build_replicated() { + let log = slog::Logger::root(slog::Discard, o!()); + + // Start all Keeper coordinator nodes + let cur_dir = std::env::current_dir().unwrap(); + let keeper_config = + cur_dir.as_path().join("src/configs/keeper_config.xml"); + + // Start Keeper 1 + let k1_port = 9181; + let k1_id = 1; + + let mut k1 = ClickHouseInstance::new_keeper( + k1_port, + k1_id, + keeper_config.clone(), + ) + .await + .expect("Failed to start ClickHouse keeper 1"); + + // Start Keeper 2 + let k2_port = 9182; + let k2_id = 2; + + let mut k2 = ClickHouseInstance::new_keeper( + k2_port, + k2_id, + keeper_config.clone(), + ) + .await + .expect("Failed to start ClickHouse keeper 2"); + + // Start Keeper 3 + let k3_port = 9183; + let k3_id = 3; + + let mut k3 = + ClickHouseInstance::new_keeper(k3_port, k3_id, keeper_config) + .await + .expect("Failed to start ClickHouse keeper 3"); + + // Start all replica nodes + let cur_dir = std::env::current_dir().unwrap(); + let replica_config = + cur_dir.as_path().join("src/configs/replica_config.xml"); + + // Start Replica 1 + let r1_port = 8123; + let r1_tcp_port = 9000; + let r1_interserver_port = 9009; + let r1_name = String::from("oximeter_cluster node 1"); + let r1_number = String::from("01"); + let mut db_1 = ClickHouseInstance::new_replicated( + r1_port, + r1_tcp_port, + r1_interserver_port, + r1_name, + r1_number, + replica_config.clone(), + ) + .await + .expect("Failed to start ClickHouse node 1"); + let r1_address = + SocketAddr::new("127.0.0.1".parse().unwrap(), db_1.port()); + + // Start Replica 2 + let r2_port = 8124; + let r2_tcp_port = 9001; + let r2_interserver_port = 9010; + let r2_name = String::from("oximeter_cluster node 2"); + let r2_number = String::from("02"); + let mut db_2 = ClickHouseInstance::new_replicated( + r2_port, + r2_tcp_port, + r2_interserver_port, + r2_name, + r2_number, + replica_config, + ) + .await + .expect("Failed to start ClickHouse node 2"); + let r2_address = + SocketAddr::new("127.0.0.1".parse().unwrap(), db_2.port()); + + // Create database in node 1 + let client_1 = Client::new(r1_address, &log); + assert!(client_1.is_oximeter_cluster().await.unwrap()); + client_1 + .init_replicated_db() + .await + .expect("Failed to initialize timeseries database"); + + // Wait to make sure data has been synchronised. + // TODO(https://github.com/oxidecomputer/omicron/issues/4001): Waiting for 5 secs is a bit sloppy, + // come up with a better way to do this. 
+ sleep(Duration::from_secs(5)).await; + + // Verify database exists in node 2 + let client_2 = Client::new(r2_address, &log); + assert!(client_2.is_oximeter_cluster().await.unwrap()); + let sql = String::from("SHOW DATABASES FORMAT JSONEachRow;"); + + let result = client_2.execute_with_body(sql).await.unwrap(); + assert!(result.contains("oximeter")); + + k1.cleanup().await.expect("Failed to cleanup ClickHouse keeper 1"); + k2.cleanup().await.expect("Failed to cleanup ClickHouse keeper 2"); + k3.cleanup().await.expect("Failed to cleanup ClickHouse keeper 3"); + db_1.cleanup().await.expect("Failed to cleanup ClickHouse server 1"); + db_2.cleanup().await.expect("Failed to cleanup ClickHouse server 2"); + } + #[tokio::test] async fn test_client_insert() { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); let samples = { @@ -681,14 +831,14 @@ mod tests { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); let sample = test_util::make_sample(); @@ -715,14 +865,14 @@ mod tests { let log = slog::Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); let sample = test_util::make_sample(); @@ -793,14 +943,14 @@ mod tests { let log = slog::Logger::root(slog_dtrace::Dtrace::new().0, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let db = ClickHouseInstance::new(0) + let db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); @@ -991,14 +1141,14 @@ mod tests { let log = Logger::root(slog::Discard, o!()); // Let the OS assign a port and discover it after ClickHouse starts - let db = ClickHouseInstance::new(0) + let db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); @@ -1090,14 +1240,14 @@ mod tests { test_fn: impl Fn(&Service, &[RequestLatency], &[Sample], &[Timeseries]), ) { let (target, metrics, samples) = setup_select_test(); - let mut db = 
ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client @@ -1347,14 +1497,14 @@ mod tests { #[tokio::test] async fn test_select_timeseries_with_start_time() { let (_, metrics, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client @@ -1391,14 +1541,14 @@ mod tests { #[tokio::test] async fn test_select_timeseries_with_limit() { let (_, _, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client @@ -1509,14 +1659,14 @@ mod tests { #[tokio::test] async fn test_select_timeseries_with_order() { let (_, _, samples) = setup_select_test(); - let mut db = ClickHouseInstance::new(0) + let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); let log = Logger::root(slog::Discard, o!()); let client = Client::new(address, &log); client - .init_db() + .init_single_node_db() .await .expect("Failed to initialize timeseries database"); client diff --git a/oximeter/db/src/configs/keeper_config.xml b/oximeter/db/src/configs/keeper_config.xml new file mode 100644 index 0000000000..19ab99f909 --- /dev/null +++ b/oximeter/db/src/configs/keeper_config.xml @@ -0,0 +1,43 @@ + + + + trace + + + 1000M + 3 + + + + + + + + + + + + + 10000 + 30000 + trace + + + + + + 9234 + + + + + 9235 + + + + + 9236 + + + + diff --git a/oximeter/db/src/configs/replica_config.xml b/oximeter/db/src/configs/replica_config.xml new file mode 100644 index 0000000000..6a2cab5862 --- /dev/null +++ b/oximeter/db/src/configs/replica_config.xml @@ -0,0 +1,487 @@ + + + + + + + + trace + true + + + 1000M + 3 + + + + + + + + + + + + + + + + + + 4096 + + + 3 + + + + + false + + + /path/to/ssl_cert_file + /path/to/ssl_key_file + + + false + + + /path/to/ssl_ca_cert_file + + + none + + + 0 + + + -1 + -1 + + + false + + + + + + + + + + none + true + true + sslv2,sslv3 + true + + + + true + true + sslv2,sslv3 + true + + + + RejectCertificateHandler + + + + + + + + + 0 + + + 100 + + + 0 + + + + 10000 + + + + + + 0.9 + + + 4194304 + + + 0 + + + + + + 8589934592 + + + 5368709120 + + + + 1000 + + + 134217728 + + + 10000 + + + + + + + + + + + + + + false + + + false + + + false + + + false + + + + + random + + + + + + + + + ::/0 + + default + default + + + + + + + 3600 + 0 + 0 + 0 + 0 + 0 + + + + + + default + + + + + + default + + + true + + + false + + + + 01 + + oximeter_cluster + + + + + + mysecretphrase + + true + + + 9000 + + + + 9001 + + + + + + + + + 9181 + + + + 9182 
+ + + + 9183 + + + + + + 3600 + + + 3600 + + + 60 + + + + + + system + query_log
+ toYYYYMM(event_date) + + + 7500 +
+ + + + system + trace_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + system + query_thread_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + query_views_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + part_log
+ toYYYYMM(event_date) + 7500 +
+ + + + system + metric_log
+ 7500 + 1000 +
+ + + + system + asynchronous_metric_log
+ + 7000 +
+ + + + system + crash_log
+ + + 1000 +
+ + + + system + processors_profile_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + + /clickhouse/task_queue/ddl + + + 604800 + + + 60 + + + 1000 + + + + + + + + hide encrypt/decrypt arguments + ((?:aes_)?(?:encrypt|decrypt)(?:_mysql)?)\s*\(\s*(?:'(?:\\'|.)+'|.*?)\s*\) + \1(???) + + + + + false + false + +
diff --git a/oximeter/db/src/db-replicated-init.sql b/oximeter/db/src/db-replicated-init.sql new file mode 100644 index 0000000000..ddd973b825 --- /dev/null +++ b/oximeter/db/src/db-replicated-init.sql @@ -0,0 +1,294 @@ +CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bool_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bool_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_string_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_string_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_string ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_string_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes_local ON CLUSTER oximeter_cluster +( + 
timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bytes_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bytes_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativei64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativei64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativef64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativef64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS 
oximeter.measurements_histogramf64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramf64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramf64_local', timeseries_name) +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt8 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int64 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value IPv6 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value String +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UUID +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.timeseries_schema ON CLUSTER oximeter_cluster +( + timeseries_name String, + fields Nested( + name String, + type Enum( + 'Bool' = 1, + 'I64' = 2, + 'IpAddr' = 3, + 'String' = 4, + 'Uuid' = 6 + ), + source Enum( + 'Target' = 1, + 'Metric' = 2 + ) + ), + datum_type Enum( + 'Bool' = 1, + 'I64' = 2, + 'F64' = 3, + 'String' = 4, + 'Bytes' = 5, + 'CumulativeI64' = 6, + 'CumulativeF64' = 7, + 'HistogramI64' = 8, + 'HistogramF64' = 9 + ), + created DateTime64(9, 'UTC') +) +ENGINE = ReplicatedMergeTree() +ORDER BY (timeseries_name, fields.name); diff --git a/oximeter/db/src/db-init.sql b/oximeter/db/src/db-single-node-init.sql similarity index 100% rename from oximeter/db/src/db-init.sql rename to oximeter/db/src/db-single-node-init.sql diff --git a/oximeter/db/src/db-wipe-replicated.sql b/oximeter/db/src/db-wipe-replicated.sql new file mode 100644 index 0000000000..1ed4d270b7 --- /dev/null +++ b/oximeter/db/src/db-wipe-replicated.sql @@ -0,0 +1 @@ +DROP DATABASE IF EXISTS oximeter ON CLUSTER oximeter_cluster; diff --git a/oximeter/db/src/db-wipe.sql b/oximeter/db/src/db-wipe-single-node.sql similarity 
index 100% rename from oximeter/db/src/db-wipe.sql rename to oximeter/db/src/db-wipe-single-node.sql diff --git a/package-manifest.toml b/package-manifest.toml index 532510058a..728ed80079 100644 --- a/package-manifest.toml +++ b/package-manifest.toml @@ -114,13 +114,43 @@ output.type = "zone" [package.clickhouse] service_name = "clickhouse" only_for_targets.image = "standard" +source.type = "composite" +source.packages = [ "clickhouse_svc.tar.gz", "internal-dns-cli.tar.gz" ] +output.type = "zone" + +[package.clickhouse_svc] +service_name = "clickhouse_svc" +only_for_targets.image = "standard" source.type = "local" source.paths = [ { from = "out/clickhouse", to = "/opt/oxide/clickhouse" }, { from = "smf/clickhouse/manifest.xml", to = "/var/svc/manifest/site/clickhouse/manifest.xml" }, - { from = "smf/clickhouse/method_script.sh", to = "/opt/oxide/lib/svc/manifest/clickhouse.sh" } + { from = "smf/clickhouse/method_script.sh", to = "/opt/oxide/lib/svc/manifest/clickhouse.sh" }, + { from = "smf/clickhouse/config_replica.xml", to = "/opt/oxide/clickhouse/config.d/config_replica.xml" } ] output.type = "zone" +output.intermediate_only = true +setup_hint = "Run `./tools/ci_download_clickhouse` to download the necessary binaries" + +[package.clickhouse_keeper] +service_name = "clickhouse_keeper" +only_for_targets.image = "standard" +source.type = "composite" +source.packages = [ "clickhouse_keeper_svc.tar.gz", "internal-dns-cli.tar.gz" ] +output.type = "zone" + +[package.clickhouse_keeper_svc] +service_name = "clickhouse_keeper_svc" +only_for_targets.image = "standard" +source.type = "local" +source.paths = [ + { from = "out/clickhouse", to = "/opt/oxide/clickhouse_keeper" }, + { from = "smf/clickhouse_keeper/manifest.xml", to = "/var/svc/manifest/site/clickhouse_keeper/manifest.xml" }, + { from = "smf/clickhouse_keeper/method_script.sh", to = "/opt/oxide/lib/svc/manifest/clickhouse_keeper.sh" }, + { from = "smf/clickhouse_keeper/keeper_config.xml", to = "/opt/oxide/clickhouse_keeper/keeper_config.xml" } +] +output.type = "zone" +output.intermediate_only = true setup_hint = "Run `./tools/ci_download_clickhouse` to download the necessary binaries" [package.cockroachdb] diff --git a/schema/all-zone-requests.json b/schema/all-zone-requests.json index 72f60eecac..468f00ee0c 100644 --- a/schema/all-zone-requests.json +++ b/schema/all-zone-requests.json @@ -63,6 +63,20 @@ } } }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -554,6 +568,24 @@ } } }, + { + "type": "object", + "required": [ + "address", + "type" + ], + "properties": { + "address": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -706,6 +738,7 @@ "type": "string", "enum": [ "clickhouse", + "clickhouse_keeper", "cockroach_db", "crucible_pantry", "crucible", diff --git a/schema/crdb/4.0.0/up.sql b/schema/crdb/4.0.0/up.sql new file mode 100644 index 0000000000..f87308395a --- /dev/null +++ b/schema/crdb/4.0.0/up.sql @@ -0,0 +1,37 @@ +-- CRDB documentation recommends the following: +-- "Execute schema changes either as single statements (as an implicit transaction), +-- or in an explicit transaction consisting of the single schema change statement." +-- +-- For each schema change, we transactionally: +-- 1. Check the current version +-- 2. 
Apply the idempotent update + +BEGIN; +SELECT CAST( + IF( + ( + SELECT version = '3.0.3' and target_version = '4.0.0' + FROM omicron.public.db_metadata WHERE singleton = true + ), + 'true', + 'Invalid starting version for schema change' + ) AS BOOL +); + +ALTER TYPE omicron.public.service_kind ADD VALUE IF NOT EXISTS 'clickhouse_keeper'; +COMMIT; + +BEGIN; +SELECT CAST( + IF( + ( + SELECT version = '3.0.3' and target_version = '4.0.0' + FROM omicron.public.db_metadata WHERE singleton = true + ), + 'true', + 'Invalid starting version for schema change' + ) AS BOOL +); + +ALTER TYPE omicron.public.dataset_kind ADD VALUE IF NOT EXISTS 'clickhouse_keeper'; +COMMIT; \ No newline at end of file diff --git a/schema/crdb/dbinit.sql b/schema/crdb/dbinit.sql index 0d612e36ac..4b38b7dfe4 100644 --- a/schema/crdb/dbinit.sql +++ b/schema/crdb/dbinit.sql @@ -188,6 +188,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS lookup_switch_by_rack ON omicron.public.switch CREATE TYPE IF NOT EXISTS omicron.public.service_kind AS ENUM ( 'clickhouse', + 'clickhouse_keeper', 'cockroach', 'crucible', 'crucible_pantry', @@ -391,6 +392,7 @@ CREATE TYPE IF NOT EXISTS omicron.public.dataset_kind AS ENUM ( 'crucible', 'cockroach', 'clickhouse', + 'clickhouse_keeper', 'external_dns', 'internal_dns' ); @@ -2560,7 +2562,7 @@ INSERT INTO omicron.public.db_metadata ( version, target_version ) VALUES - ( TRUE, NOW(), NOW(), '3.0.3', NULL) + ( TRUE, NOW(), NOW(), '4.0.0', NULL) ON CONFLICT DO NOTHING; COMMIT; diff --git a/schema/rss-service-plan.json b/schema/rss-service-plan.json index 2c6f1050da..725caf0900 100644 --- a/schema/rss-service-plan.json +++ b/schema/rss-service-plan.json @@ -63,6 +63,20 @@ } } }, + { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -654,6 +668,24 @@ } } }, + { + "type": "object", + "required": [ + "address", + "type" + ], + "properties": { + "address": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "clickhouse_keeper" + ] + } + } + }, { "type": "object", "required": [ @@ -833,6 +865,7 @@ "type": "string", "enum": [ "clickhouse", + "clickhouse_keeper", "cockroach_db", "crucible_pantry", "crucible", diff --git a/sled-agent/src/params.rs b/sled-agent/src/params.rs index fcd6939413..5ef9594a2a 100644 --- a/sled-agent/src/params.rs +++ b/sled-agent/src/params.rs @@ -220,6 +220,7 @@ pub enum DatasetKind { CockroachDb, Crucible, Clickhouse, + ClickhouseKeeper, ExternalDns, InternalDns, } @@ -231,6 +232,7 @@ impl From for sled_agent_client::types::DatasetKind { CockroachDb => Self::CockroachDb, Crucible => Self::Crucible, Clickhouse => Self::Clickhouse, + ClickhouseKeeper => Self::ClickhouseKeeper, ExternalDns => Self::ExternalDns, InternalDns => Self::InternalDns, } @@ -244,6 +246,7 @@ impl From for nexus_client::types::DatasetKind { CockroachDb => Self::Cockroach, Crucible => Self::Crucible, Clickhouse => Self::Clickhouse, + ClickhouseKeeper => Self::ClickhouseKeeper, ExternalDns => Self::ExternalDns, InternalDns => Self::InternalDns, } @@ -257,6 +260,7 @@ impl std::fmt::Display for DatasetKind { Crucible => "crucible", CockroachDb { .. } => "cockroachdb", Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", ExternalDns { .. } => "external_dns", InternalDns { .. 
} => "internal_dns", }; @@ -356,6 +360,9 @@ pub enum ServiceType { Clickhouse { address: SocketAddrV6, }, + ClickhouseKeeper { + address: SocketAddrV6, + }, CockroachDb { address: SocketAddrV6, }, @@ -382,6 +389,9 @@ impl std::fmt::Display for ServiceType { ServiceType::Maghemite { .. } => write!(f, "mg-ddm"), ServiceType::SpSim => write!(f, "sp-sim"), ServiceType::Clickhouse { .. } => write!(f, "clickhouse"), + ServiceType::ClickhouseKeeper { .. } => { + write!(f, "clickhouse_keeper") + } ServiceType::CockroachDb { .. } => write!(f, "cockroachdb"), ServiceType::Crucible { .. } => write!(f, "crucible"), } @@ -484,6 +494,9 @@ impl TryFrom for sled_agent_client::types::ServiceType { St::Clickhouse { address } => { Ok(AutoSt::Clickhouse { address: address.to_string() }) } + St::ClickhouseKeeper { address } => { + Ok(AutoSt::ClickhouseKeeper { address: address.to_string() }) + } St::CockroachDb { address } => { Ok(AutoSt::CockroachDb { address: address.to_string() }) } @@ -508,6 +521,7 @@ impl TryFrom for sled_agent_client::types::ServiceType { #[serde(rename_all = "snake_case")] pub enum ZoneType { Clickhouse, + ClickhouseKeeper, CockroachDb, CruciblePantry, Crucible, @@ -523,6 +537,7 @@ impl From for sled_agent_client::types::ZoneType { fn from(zt: ZoneType) -> Self { match zt { ZoneType::Clickhouse => Self::Clickhouse, + ZoneType::ClickhouseKeeper => Self::ClickhouseKeeper, ZoneType::CockroachDb => Self::CockroachDb, ZoneType::Crucible => Self::Crucible, ZoneType::CruciblePantry => Self::CruciblePantry, @@ -541,6 +556,7 @@ impl std::fmt::Display for ZoneType { use ZoneType::*; let name = match self { Clickhouse => "clickhouse", + ClickhouseKeeper => "clickhouse_keeper", CockroachDb => "cockroachdb", Crucible => "crucible", CruciblePantry => "crucible_pantry", @@ -611,6 +627,7 @@ impl ServiceZoneRequest { ZoneType::Switch => None, // All other zones should be identified by their zone UUID. ZoneType::Clickhouse + | ZoneType::ClickhouseKeeper | ZoneType::CockroachDb | ZoneType::Crucible | ZoneType::ExternalDns @@ -760,6 +777,15 @@ impl ServiceZoneRequest { kind: NexusTypes::ServiceKind::Clickhouse, }); } + ServiceType::ClickhouseKeeper { address } => { + services.push(NexusTypes::ServicePutRequest { + service_id, + zone_id, + sled_id, + address: address.to_string(), + kind: NexusTypes::ServiceKind::ClickhouseKeeper, + }); + } ServiceType::Crucible { address } => { services.push(NexusTypes::ServicePutRequest { service_id, diff --git a/sled-agent/src/rack_setup/plan/service.rs b/sled-agent/src/rack_setup/plan/service.rs index 782188b2ec..2183aa7b63 100644 --- a/sled-agent/src/rack_setup/plan/service.rs +++ b/sled-agent/src/rack_setup/plan/service.rs @@ -56,7 +56,12 @@ const CRDB_COUNT: usize = 5; const OXIMETER_COUNT: usize = 1; // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove // when Nexus provisions Clickhouse. +// TODO(https://github.com/oxidecomputer/omicron/issues/4000): Set to 2 once we enable replicated ClickHouse const CLICKHOUSE_COUNT: usize = 1; +// TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove +// when Nexus provisions Clickhouse keeper. +// TODO(https://github.com/oxidecomputer/omicron/issues/4000): Set to 3 once we enable replicated ClickHouse +const CLICKHOUSE_KEEPER_COUNT: usize = 0; // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove. // when Nexus provisions Crucible. const MINIMUM_U2_ZPOOL_COUNT: usize = 3; @@ -552,6 +557,46 @@ impl Plan { }); } + // Provision Clickhouse Keeper zones, continuing to stripe across sleds. 
+ // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove + // Temporary linter rule until replicated Clickhouse is enabled + #[allow(clippy::reversed_empty_ranges)] + for _ in 0..CLICKHOUSE_KEEPER_COUNT { + let sled = { + let which_sled = + sled_allocator.next().ok_or(PlanError::NotEnoughSleds)?; + &mut sled_info[which_sled] + }; + let id = Uuid::new_v4(); + let ip = sled.addr_alloc.next().expect("Not enough addrs"); + let port = omicron_common::address::CLICKHOUSE_KEEPER_PORT; + let address = SocketAddrV6::new(ip, port, 0, 0); + let zone = dns_builder.host_zone(id, ip).unwrap(); + dns_builder + .service_backend_zone( + ServiceName::ClickhouseKeeper, + &zone, + port, + ) + .unwrap(); + let dataset_name = + sled.alloc_from_u2_zpool(DatasetKind::ClickhouseKeeper)?; + sled.request.services.push(ServiceZoneRequest { + id, + zone_type: ZoneType::ClickhouseKeeper, + addresses: vec![ip], + dataset: Some(DatasetRequest { + id, + name: dataset_name, + service_address: address, + }), + services: vec![ServiceZoneService { + id, + details: ServiceType::ClickhouseKeeper { address }, + }], + }); + } + // Provision Crucible Pantry zones, continuing to stripe across sleds. // TODO(https://github.com/oxidecomputer/omicron/issues/732): Remove for _ in 0..PANTRY_COUNT { diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index cfa1cdb5ac..a49847fea2 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -65,6 +65,7 @@ use itertools::Itertools; use omicron_common::address::Ipv6Subnet; use omicron_common::address::AZ_PREFIX; use omicron_common::address::BOOTSTRAP_ARTIFACT_PORT; +use omicron_common::address::CLICKHOUSE_KEEPER_PORT; use omicron_common::address::CLICKHOUSE_PORT; use omicron_common::address::COCKROACH_PORT; use omicron_common::address::CRUCIBLE_PANTRY_PORT; @@ -1000,6 +1001,41 @@ impl ServiceManager { Ok(()) } + async fn dns_install( + info: &SledAgentInfo, + ) -> Result<ServiceBuilder, Error> { + // We want to configure the dns/install SMF service inside the + // zone with the list of DNS nameservers. This will cause + // /etc/resolv.conf to be populated inside the zone. To do + // this, we need the full list of nameservers. Fortunately, the + // nameservers provide a DNS name for the full list of + // nameservers. + // + // Note that when we configure the dns/install service, we're + // supplying values for an existing property group on the SMF + // *service*. We're not creating a new property group, nor are + // we configuring a property group on the instance. + let all_nameservers = info + .resolver + .lookup_all_ipv6(internal_dns::ServiceName::InternalDns) + .await?; + let mut dns_config_builder = PropertyGroupBuilder::new("install_props"); + for ns_addr in &all_nameservers { + dns_config_builder = dns_config_builder.add_property( + "nameserver", + "net_address", + &ns_addr.to_string(), + ); + } + Ok(ServiceBuilder::new("network/dns/install") + .add_property_group(dns_config_builder) + // We do need to enable the default instance of the + // dns/install service. It's enough to just mention it + // here, as the ServiceInstanceBuilder always enables the + // instance being added.
+ .add_instance(ServiceInstanceBuilder::new("default"))) + } + async fn initialize_zone( &self, request: &ZoneRequest, @@ -1084,6 +1120,9 @@ impl ServiceManager { let Some(info) = self.inner.sled_info.get() else { return Err(Error::SledAgentNotReady); }; + + let dns_service = Self::dns_install(info).await?; + let datalink = installed_zone.get_control_vnic_name(); let gateway = &info.underlay_address.to_string(); assert_eq!(request.zone.addresses.len(), 1); @@ -1096,13 +1135,15 @@ impl ServiceManager { .add_property("listen_addr", "astring", listen_addr) .add_property("listen_port", "astring", listen_port) .add_property("store", "astring", "/data"); - - let profile = ProfileBuilder::new("omicron").add_service( + let clickhouse_service = ServiceBuilder::new("oxide/clickhouse").add_instance( ServiceInstanceBuilder::new("default") .add_property_group(config), - ), - ); + ); + + let profile = ProfileBuilder::new("omicron") + .add_service(clickhouse_service) + .add_service(dns_service); profile .add_to_zone(&self.inner.log, &installed_zone) .await @@ -1111,42 +1152,51 @@ impl ServiceManager { })?; return Ok(RunningZone::boot(installed_zone).await?); } + ZoneType::ClickhouseKeeper => { + let Some(info) = self.inner.sled_info.get() else { + return Err(Error::SledAgentNotReady); + }; + + let dns_service = Self::dns_install(info).await?; + + let datalink = installed_zone.get_control_vnic_name(); + let gateway = &info.underlay_address.to_string(); + assert_eq!(request.zone.addresses.len(), 1); + let listen_addr = &request.zone.addresses[0].to_string(); + let listen_port = &CLICKHOUSE_KEEPER_PORT.to_string(); + + let config = PropertyGroupBuilder::new("config") + .add_property("datalink", "astring", datalink) + .add_property("gateway", "astring", gateway) + .add_property("listen_addr", "astring", listen_addr) + .add_property("listen_port", "astring", listen_port) + .add_property("store", "astring", "/data"); + let clickhouse_keeper_service = + ServiceBuilder::new("oxide/clickhouse_keeper") + .add_instance( + ServiceInstanceBuilder::new("default") + .add_property_group(config), + ); + let profile = ProfileBuilder::new("omicron") + .add_service(clickhouse_keeper_service) + .add_service(dns_service); + profile + .add_to_zone(&self.inner.log, &installed_zone) + .await + .map_err(|err| { + Error::io( + "Failed to setup clickhouse keeper profile", + err, + ) + })?; + return Ok(RunningZone::boot(installed_zone).await?); + } ZoneType::CockroachDb => { let Some(info) = self.inner.sled_info.get() else { return Err(Error::SledAgentNotReady); }; - // We want to configure the dns/install SMF service inside the - // zone with the list of DNS nameservers. This will cause - // /etc/resolv.conf to be populated inside the zone. To do - // this, we need the full list of nameservers. Fortunately, the - // nameservers provide a DNS name for the full list of - // nameservers. - // - // Note that when we configure the dns/install service, we're - // supplying values for an existing property group on the SMF - // *service*. We're not creating a new property group, nor are - // we configuring a property group on the instance. 
- let all_nameservers = info - .resolver - .lookup_all_ipv6(internal_dns::ServiceName::InternalDns) - .await?; - let mut dns_config_builder = - PropertyGroupBuilder::new("install_props"); - for ns_addr in &all_nameservers { - dns_config_builder = dns_config_builder.add_property( - "nameserver", - "net_address", - &ns_addr.to_string(), - ); - } - let dns_install = ServiceBuilder::new("network/dns/install") - .add_property_group(dns_config_builder) - // We do need to enable the default instance of the - // dns/install service. It's enough to just mention it - // here, as the ServiceInstanceBuilder always enables the - // instance being added. - .add_instance(ServiceInstanceBuilder::new("default")); + let dns_service = Self::dns_install(info).await?; // Configure the CockroachDB service. let datalink = installed_zone.get_control_vnic_name(); @@ -1173,7 +1223,7 @@ impl ServiceManager { let profile = ProfileBuilder::new("omicron") .add_service(cockroachdb_service) - .add_service(dns_install); + .add_service(dns_service); profile .add_to_zone(&self.inner.log, &installed_zone) .await @@ -1572,7 +1622,6 @@ impl ServiceManager { } ServiceType::Oximeter { address } => { info!(self.inner.log, "Setting up oximeter service"); - smfh.setprop("config/id", request.zone.id)?; smfh.setprop("config/address", address.to_string())?; smfh.refresh()?; @@ -1946,7 +1995,8 @@ impl ServiceManager { ServiceType::Crucible { .. } | ServiceType::CruciblePantry { .. } | ServiceType::CockroachDb { .. } - | ServiceType::Clickhouse { .. } => { + | ServiceType::Clickhouse { .. } + | ServiceType::ClickhouseKeeper { .. } => { panic!( "{} is a service which exists as part of a self-assembling zone", service.details, diff --git a/smf/clickhouse/config_replica.xml b/smf/clickhouse/config_replica.xml new file mode 100644 index 0000000000..180b906f64 --- /dev/null +++ b/smf/clickhouse/config_replica.xml @@ -0,0 +1,82 @@ + + + trace + true + + + 1000M + 3 + + + + + + + + + + + + + + + + + + + + + + + + + 01 + + oximeter_cluster + + + + + + mysecretphrase + + true + + + 9000 + + + + 9000 + + + + + + + + 9181 + + + + 9181 + + + + 9181 + + + + + + + + 604800 + + + 60 + + + 1000 + + diff --git a/smf/clickhouse/method_script.sh b/smf/clickhouse/method_script.sh index c278432110..3cc8c585ad 100755 --- a/smf/clickhouse/method_script.sh +++ b/smf/clickhouse/method_script.sh @@ -28,13 +28,136 @@ ipadm show-addr "$DATALINK/ll" || ipadm create-addr -t -T addrconf "$DATALINK/ll ipadm show-addr "$DATALINK/omicron6" || ipadm create-addr -t -T static -a "$LISTEN_ADDR" "$DATALINK/omicron6" route get -inet6 default -inet6 "$GATEWAY" || route add -inet6 default -inet6 "$GATEWAY" -args=( +# TEMPORARY: Racks will be set up with single node ClickHouse until +# Nexus provisions services so there is no divergence between racks +# https://github.com/oxidecomputer/omicron/issues/732 +single_node=true + +command=() +# TODO((https://github.com/oxidecomputer/omicron/issues/4000)): Remove single node mode once all racks are running in replicated mode +if $single_node +then + command+=( + "/opt/oxide/clickhouse/clickhouse" "server" "--log-file" "/var/tmp/clickhouse-server.log" "--errorlog-file" "/var/tmp/clickhouse-server.errlog" "--" "--path" "${DATASTORE}" "--listen_host" "$LISTEN_ADDR" "--http_port" "$LISTEN_PORT" -) + ) +else + # Retrieve hostnames (SRV records in internal DNS) of the clickhouse nodes. 
+ CH_ADDRS="$(/opt/oxide/internal-dns-cli/bin/dnswait clickhouse -H)" + + if [[ -z "$CH_ADDRS" ]]; then + printf 'ERROR: found no hostnames for other ClickHouse nodes\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + declare -a nodes=($CH_ADDRS) + + for i in "${nodes[@]}" + do + if ! grep -q "host.control-plane.oxide.internal" <<< "${i}"; then + printf 'ERROR: retrieved ClickHouse hostname does not match the expected format\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + done + + # Assign hostnames to replicas + REPLICA_HOST_01="${nodes[0]}" + REPLICA_HOST_02="${nodes[1]}" + + # Retrieve hostnames (SRV records in internal DNS) of the keeper nodes. + K_ADDRS="$(/opt/oxide/internal-dns-cli/bin/dnswait clickhouse-keeper -H)" + + if [[ -z "$K_ADDRS" ]]; then + printf 'ERROR: found no hostnames for other ClickHouse Keeper nodes\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + declare -a keepers=($K_ADDRS) + + for i in "${keepers[@]}" + do + if ! grep -q "host.control-plane.oxide.internal" <<< "${i}"; then + printf 'ERROR: retrieved ClickHouse Keeper hostname does not match the expected format\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + done + + if [[ "${#keepers[@]}" != 3 ]] + then + printf "ERROR: expected 3 ClickHouse Keeper hosts, found "${#keepers[@]}" instead\n" >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + # Identify the node type this is as this will influence how the config is constructed + # TODO(https://github.com/oxidecomputer/omicron/issues/3824): There are probably much + # better ways to do this service discovery, but this works for now. + # The services contain the same IDs as the hostnames. + CLICKHOUSE_SVC="$(zonename | tr -dc [:digit:])" + REPLICA_IDENTIFIER_01="$( echo "${REPLICA_HOST_01}" | tr -dc [:digit:])" + REPLICA_IDENTIFIER_02="$( echo "${REPLICA_HOST_02}" | tr -dc [:digit:])" + if [[ $REPLICA_IDENTIFIER_01 == $CLICKHOUSE_SVC ]] + then + REPLICA_DISPLAY_NAME="oximeter_cluster node 1" + REPLICA_NUMBER="01" + elif [[ $REPLICA_IDENTIFIER_02 == $CLICKHOUSE_SVC ]] + then + REPLICA_DISPLAY_NAME="oximeter_cluster node 2" + REPLICA_NUMBER="02" + else + printf 'ERROR: service name does not match any of the identified ClickHouse hostnames\n' >&2 + exit "$SMF_EXIT_ERR_CONFIG" + fi + + # Setting environment variables this way is best practice, but has the downside of + # obscuring the field values to anyone ssh-ing into the zone. 
To mitigate this, + # we will be saving them to ${DATASTORE}/config_env_vars + export CH_LOG="${DATASTORE}/clickhouse-server.log" + export CH_ERROR_LOG="${DATASTORE}/clickhouse-server.errlog" + export CH_REPLICA_DISPLAY_NAME=${REPLICA_DISPLAY_NAME} + export CH_LISTEN_ADDR=${LISTEN_ADDR} + export CH_LISTEN_PORT=${LISTEN_PORT} + export CH_DATASTORE=${DATASTORE} + export CH_TMP_PATH="${DATASTORE}/tmp/" + export CH_USER_FILES_PATH="${DATASTORE}/user_files/" + export CH_USER_LOCAL_DIR="${DATASTORE}/access/" + export CH_FORMAT_SCHEMA_PATH="${DATASTORE}/format_schemas/" + export CH_REPLICA_NUMBER=${REPLICA_NUMBER} + export CH_REPLICA_HOST_01=${REPLICA_HOST_01} + export CH_REPLICA_HOST_02=${REPLICA_HOST_02} + export CH_KEEPER_HOST_01="${keepers[0]}" + export CH_KEEPER_HOST_02="${keepers[1]}" + export CH_KEEPER_HOST_03="${keepers[2]}" + + content="CH_LOG="${CH_LOG}"\n\ + CH_ERROR_LOG="${CH_ERROR_LOG}"\n\ + CH_REPLICA_DISPLAY_NAME="${CH_REPLICA_DISPLAY_NAME}"\n\ + CH_LISTEN_ADDR="${CH_LISTEN_ADDR}"\n\ + CH_LISTEN_PORT="${CH_LISTEN_PORT}"\n\ + CH_DATASTORE="${CH_DATASTORE}"\n\ + CH_TMP_PATH="${CH_TMP_PATH}"\n\ + CH_USER_FILES_PATH="${CH_USER_FILES_PATH}"\n\ + CH_USER_LOCAL_DIR="${CH_USER_LOCAL_DIR}"\n\ + CH_FORMAT_SCHEMA_PATH="${CH_FORMAT_SCHEMA_PATH}"\n\ + CH_REPLICA_NUMBER="${CH_REPLICA_NUMBER}"\n\ + CH_REPLICA_HOST_01="${CH_REPLICA_HOST_01}"\n\ + CH_REPLICA_HOST_02="${CH_REPLICA_HOST_02}"\n\ + CH_KEEPER_HOST_01="${CH_KEEPER_HOST_01}"\n\ + CH_KEEPER_HOST_02="${CH_KEEPER_HOST_02}"\n\ + CH_KEEPER_HOST_03="${CH_KEEPER_HOST_03}"" + + echo $content >> "${DATASTORE}/config_env_vars" + + + # The clickhouse binary must be run from within the directory that contains it. + # Otherwise, it does not automatically detect the configuration files, nor does + # it append them when necessary + cd /opt/oxide/clickhouse/ + command+=("./clickhouse" "server") +fi -exec /opt/oxide/clickhouse/clickhouse server "${args[@]}" & +exec "${command[@]}" & \ No newline at end of file diff --git a/smf/clickhouse_keeper/keeper_config.xml b/smf/clickhouse_keeper/keeper_config.xml new file mode 100644 index 0000000000..ec114694cc --- /dev/null +++ b/smf/clickhouse_keeper/keeper_config.xml @@ -0,0 +1,43 @@ + + + + trace + + + 1000M + 3 + + + + + + + + + + + + + 10000 + 30000 + trace + + + + + + 9234 + + + + + 9234 + + + + + 9234 + + + + diff --git a/smf/clickhouse_keeper/manifest.xml b/smf/clickhouse_keeper/manifest.xml new file mode 100644 index 0000000000..9e79cc131c --- /dev/null +++ b/smf/clickhouse_keeper/manifest.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/smf/clickhouse_keeper/method_script.sh b/smf/clickhouse_keeper/method_script.sh new file mode 100755 index 0000000000..0e785f2aec --- /dev/null +++ b/smf/clickhouse_keeper/method_script.sh @@ -0,0 +1,128 @@ +#!/bin/bash + +set -x +set -o errexit +set -o pipefail + +. 
/lib/svc/share/smf_include.sh
+
+LISTEN_ADDR="$(svcprop -c -p config/listen_addr "${SMF_FMRI}")"
+LISTEN_PORT="$(svcprop -c -p config/listen_port "${SMF_FMRI}")"
+DATASTORE="$(svcprop -c -p config/store "${SMF_FMRI}")"
+DATALINK="$(svcprop -c -p config/datalink "${SMF_FMRI}")"
+GATEWAY="$(svcprop -c -p config/gateway "${SMF_FMRI}")"
+
+if [[ $DATALINK == unknown ]] || [[ $GATEWAY == unknown ]]; then
+  printf 'ERROR: missing datalink or gateway\n' >&2
+  exit "$SMF_EXIT_ERR_CONFIG"
+fi
+
+# TODO remove when https://github.com/oxidecomputer/stlouis/issues/435 is addressed
+ipadm delete-if "$DATALINK" || true
+ipadm create-if -t "$DATALINK"
+
+ipadm set-ifprop -t -p mtu=9000 -m ipv4 "$DATALINK"
+ipadm set-ifprop -t -p mtu=9000 -m ipv6 "$DATALINK"
+
+ipadm show-addr "$DATALINK/ll" || ipadm create-addr -t -T addrconf "$DATALINK/ll"
+ipadm show-addr "$DATALINK/omicron6" || ipadm create-addr -t -T static -a "$LISTEN_ADDR" "$DATALINK/omicron6"
+route get -inet6 default -inet6 "$GATEWAY" || route add -inet6 default -inet6 "$GATEWAY"
+
+# Retrieve hostnames (SRV records in internal DNS) of all keeper nodes.
+K_ADDRS="$(/opt/oxide/internal-dns-cli/bin/dnswait clickhouse-keeper -H)"
+
+if [[ -z "$K_ADDRS" ]]; then
+  printf 'ERROR: found no hostnames for other ClickHouse Keeper nodes\n' >&2
+  exit "$SMF_EXIT_ERR_CONFIG"
+fi
+
+declare -a keepers=($K_ADDRS)
+
+for i in "${keepers[@]}"
+do
+  if ! grep -q "host.control-plane.oxide.internal" <<< "${i}"; then
+    printf 'ERROR: retrieved ClickHouse Keeper hostname does not match the expected format\n' >&2
+    exit "$SMF_EXIT_ERR_CONFIG"
+  fi
+done
+
+if [[ "${#keepers[@]}" != 3 ]]
+then
+  printf 'ERROR: expected 3 ClickHouse Keeper hosts, found %s instead\n' "${#keepers[@]}" >&2
+  exit "$SMF_EXIT_ERR_CONFIG"
+fi
+
+# Assign hostnames to replicas and keeper nodes
+KEEPER_HOST_01="${keepers[0]}"
+KEEPER_HOST_02="${keepers[1]}"
+KEEPER_HOST_03="${keepers[2]}"
+
+# Generate unique, reproducible numeric IDs by removing letters from KEEPER_HOST_*.
+# Keeper IDs must be numbers, and they cannot be reused (i.e. when a keeper node is
+# unrecoverable the ID must be changed to something new).
+# By trimming the hosts we can make sure all keepers will always be up to date when
+# a new keeper is spun up. ClickHouse does not allow very large numbers, so we will
+# be reducing to 7 characters. This should be enough entropy given the small number
+# of keepers we have.
+KEEPER_ID_01="$( echo "${KEEPER_HOST_01}" | tr -dc '[:digit:]' | cut -c1-7)"
+KEEPER_ID_02="$( echo "${KEEPER_HOST_02}" | tr -dc '[:digit:]' | cut -c1-7)"
+KEEPER_ID_03="$( echo "${KEEPER_HOST_03}" | tr -dc '[:digit:]' | cut -c1-7)"
+
+# Identify which keeper node this is, as this will influence how the config is constructed.
+# TODO(https://github.com/oxidecomputer/omicron/issues/3824): There are probably much better ways to do this service name lookup, but this works
+# for now. The services contain the same IDs as the hostnames.
+KEEPER_SVC="$(zonename | tr -dc '[:digit:]' | cut -c1-7)"
+if [[ $KEEPER_ID_01 == $KEEPER_SVC ]]
+then
+  KEEPER_ID_CURRENT=$KEEPER_ID_01
+elif [[ $KEEPER_ID_02 == $KEEPER_SVC ]]
+then
+  KEEPER_ID_CURRENT=$KEEPER_ID_02
+elif [[ $KEEPER_ID_03 == $KEEPER_SVC ]]
+then
+  KEEPER_ID_CURRENT=$KEEPER_ID_03
+else
+  printf 'ERROR: service name does not match any of the identified ClickHouse Keeper hostnames\n' >&2
+  exit "$SMF_EXIT_ERR_CONFIG"
+fi
+
+# Setting environment variables this way is best practice, but has the downside of
+# obscuring the field values to anyone ssh-ing into the zone.
To mitigate this, +# we will be saving them to ${DATASTORE}/config_env_vars +export CH_LOG="${DATASTORE}/clickhouse-keeper.log" +export CH_ERROR_LOG="${DATASTORE}/clickhouse-keeper.err.log" +export CH_LISTEN_ADDR=${LISTEN_ADDR} +export CH_DATASTORE=${DATASTORE} +export CH_LISTEN_PORT=${LISTEN_PORT} +export CH_KEEPER_ID_CURRENT=${KEEPER_ID_CURRENT} +export CH_LOG_STORAGE_PATH="${DATASTORE}/log" +export CH_SNAPSHOT_STORAGE_PATH="${DATASTORE}/snapshots" +export CH_KEEPER_ID_01=${KEEPER_ID_01} +export CH_KEEPER_ID_02=${KEEPER_ID_02} +export CH_KEEPER_ID_03=${KEEPER_ID_03} +export CH_KEEPER_HOST_01=${KEEPER_HOST_01} +export CH_KEEPER_HOST_02=${KEEPER_HOST_02} +export CH_KEEPER_HOST_03=${KEEPER_HOST_03} + +content="CH_LOG="${CH_LOG}"\n\ +CH_ERROR_LOG="${CH_ERROR_LOG}"\n\ +CH_LISTEN_ADDR="${CH_LISTEN_ADDR}"\n\ +CH_DATASTORE="${CH_DATASTORE}"\n\ +CH_LISTEN_PORT="${CH_LISTEN_PORT}"\n\ +CH_KEEPER_ID_CURRENT="${CH_KEEPER_ID_CURRENT}"\n\ +CH_LOG_STORAGE_PATH="${CH_LOG_STORAGE_PATH}"\n\ +CH_SNAPSHOT_STORAGE_PATH="${CH_SNAPSHOT_STORAGE_PATH}"\n\ +CH_KEEPER_ID_01="${CH_KEEPER_ID_01}"\n\ +CH_KEEPER_ID_02="${CH_KEEPER_ID_02}"\n\ +CH_KEEPER_ID_03="${CH_KEEPER_ID_03}"\n\ +CH_KEEPER_HOST_01="${CH_KEEPER_HOST_01}"\n\ +CH_KEEPER_HOST_02="${CH_KEEPER_HOST_02}"\n\ +CH_KEEPER_HOST_03="${CH_KEEPER_HOST_03}"" + +echo $content >> "${DATASTORE}/config_env_vars" + +# The clickhouse binary must be run from within the directory that contains it. +# Otherwise, it does not automatically detect the configuration files, nor does +# it append them when necessary +cd /opt/oxide/clickhouse_keeper/ +exec ./clickhouse keeper & diff --git a/smf/sled-agent/non-gimlet/config.toml b/smf/sled-agent/non-gimlet/config.toml index 63f4418eba..b4cb7e6cff 100644 --- a/smf/sled-agent/non-gimlet/config.toml +++ b/smf/sled-agent/non-gimlet/config.toml @@ -35,6 +35,10 @@ zpools = [ "oxp_f4b4dc87-ab46-49fb-a4b4-d361ae214c03", "oxp_14b4dc87-ab46-49fb-a4b4-d361ae214c03", "oxp_24b4dc87-ab46-49fb-a4b4-d361ae214c03", + "oxp_cd70d7f6-2354-4bf2-8012-55bf9eaf7930", + "oxp_ceb4461c-cf56-4719-ad3c-14430bfdfb60", + "oxp_31bd71cd-4736-4a12-a387-9b74b050396f", + "oxp_616b26df-e62a-4c68-b506-f4a923d8aaf7", ] # Percentage of usable physical DRAM to use for the VMM reservoir, which diff --git a/test-utils/src/dev/clickhouse.rs b/test-utils/src/dev/clickhouse.rs index 51c8f9965f..8e6920f0be 100644 --- a/test-utils/src/dev/clickhouse.rs +++ b/test-utils/src/dev/clickhouse.rs @@ -9,7 +9,7 @@ use std::process::Stdio; use std::time::Duration; use anyhow::Context; -use tempfile::TempDir; +use tempfile::{Builder, TempDir}; use thiserror::Error; use tokio::{ fs::File, @@ -20,9 +20,10 @@ use tokio::{ use crate::dev::poll; // Timeout used when starting up ClickHouse subprocess. -const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(30); +// build-and-test (ubuntu-20.04) needs a little longer to get going +const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(90); -/// A `ClickHouseInstance` is used to start and manage a ClickHouse server process. +/// A `ClickHouseInstance` is used to start and manage a ClickHouse single node server process. #[derive(Debug)] pub struct ClickHouseInstance { // Directory in which all data, logs, etc are stored. 
@@ -47,13 +48,16 @@ pub enum ClickHouseError { #[error("Invalid ClickHouse listening address")] InvalidAddress, + #[error("Invalid ClickHouse Keeper ID")] + InvalidKeeperId, + #[error("Failed to detect ClickHouse subprocess within timeout")] Timeout, } impl ClickHouseInstance { - /// Start a new ClickHouse server on the given IPv6 port. - pub async fn new(port: u16) -> Result { + /// Start a new single node ClickHouse server on the given IPv6 port. + pub async fn new_single_node(port: u16) -> Result { let data_dir = TempDir::new() .context("failed to create tempdir for ClickHouse data")?; let log_path = data_dir.path().join("clickhouse-server.log"); @@ -90,47 +94,8 @@ impl ClickHouseInstance { format!("failed to spawn `clickhouse` (with args: {:?})", &args) })?; - // Wait for the ClickHouse log file to become available, including the - // port number. - // - // We extract the port number from the log-file regardless of whether we - // know it already, as this is a more reliable check that the server is - // up and listening. Previously we only did this in the case we need to - // _learn_ the port, which introduces the possibility that we return - // from this function successfully, but the server itself is not yet - // ready to accept connections. let data_path = data_dir.path().to_path_buf(); - let port = poll::wait_for_condition( - || async { - let result = discover_local_listening_port( - &log_path, - CLICKHOUSE_TIMEOUT, - ) - .await; - match result { - // Successfully extracted the port, return it. - Ok(port) => Ok(port), - Err(e) => { - match e { - ClickHouseError::Io(ref inner) => { - if matches!( - inner.kind(), - std::io::ErrorKind::NotFound - ) { - return Err(poll::CondCheckError::NotYet); - } - } - _ => {} - } - Err(poll::CondCheckError::from(e)) - } - } - }, - &Duration::from_millis(500), - &CLICKHOUSE_TIMEOUT, - ) - .await - .context("waiting to discover ClickHouse port")?; + let port = wait_for_port(log_path).await?; Ok(Self { data_dir: Some(data_dir), @@ -141,6 +106,151 @@ impl ClickHouseInstance { }) } + /// Start a new replicated ClickHouse server on the given IPv6 port. 
+ pub async fn new_replicated( + port: u16, + tcp_port: u16, + interserver_port: u16, + name: String, + r_number: String, + config_path: PathBuf, + ) -> Result { + let data_dir = TempDir::new() + .context("failed to create tempdir for ClickHouse data")?; + let log_path = data_dir.path().join("clickhouse-server.log"); + let err_log_path = data_dir.path().join("clickhouse-server.errlog"); + let tmp_path = data_dir.path().join("tmp/"); + let user_files_path = data_dir.path().join("user_files/"); + let access_path = data_dir.path().join("access/"); + let format_schemas_path = data_dir.path().join("format_schemas/"); + let args = vec![ + "server".to_string(), + "--config-file".to_string(), + format!("{}", config_path.display()), + ]; + + let child = tokio::process::Command::new("clickhouse") + .args(&args) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .env("CLICKHOUSE_WATCHDOG_ENABLE", "0") + .env("CH_LOG", &log_path) + .env("CH_ERROR_LOG", err_log_path) + .env("CH_REPLICA_DISPLAY_NAME", name) + .env("CH_LISTEN_ADDR", "::") + .env("CH_LISTEN_PORT", port.to_string()) + .env("CH_TCP_PORT", tcp_port.to_string()) + .env("CH_INTERSERVER_PORT", interserver_port.to_string()) + .env("CH_DATASTORE", data_dir.path()) + .env("CH_TMP_PATH", tmp_path) + .env("CH_USER_FILES_PATH", user_files_path) + .env("CH_USER_LOCAL_DIR", access_path) + .env("CH_FORMAT_SCHEMA_PATH", format_schemas_path) + .env("CH_REPLICA_NUMBER", r_number) + // There seems to be a bug using ipv6 with a replicated set up + // when installing all servers and coordinator nodes on the same + // server. For this reason we will be using ipv4 for testing. + .env("CH_REPLICA_HOST_01", "127.0.0.1") + .env("CH_REPLICA_HOST_02", "127.0.0.1") + .env("CH_KEEPER_HOST_01", "127.0.0.1") + .env("CH_KEEPER_HOST_02", "127.0.0.1") + .env("CH_KEEPER_HOST_03", "127.0.0.1") + .spawn() + .with_context(|| { + format!("failed to spawn `clickhouse` (with args: {:?})", &args) + })?; + + let data_path = data_dir.path().to_path_buf(); + + let result = wait_for_ready(log_path).await; + match result { + Ok(()) => Ok(Self { + data_dir: Some(data_dir), + data_path, + port, + args, + child: Some(child), + }), + Err(e) => Err(e), + } + } + + /// Start a new ClickHouse keeper on the given IPv6 port. + pub async fn new_keeper( + port: u16, + k_id: u16, + config_path: PathBuf, + ) -> Result { + // We assume that only 3 keepers will be run, and the ID of the keeper can only + // be one of "1", "2" or "3". This is to avoid having to pass the IDs of the + // other keepers as part of the function's parameters. + if ![1, 2, 3].contains(&k_id) { + return Err(ClickHouseError::InvalidKeeperId.into()); + } + // Keepers do not allow a dot in the beginning of the directory, so we must + // use a prefix. 
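A small aside on the `tempfile::Builder` call that follows: the keeper's data directory must not begin with a dot, and the default naming used by `TempDir::new()` starts with one (that default-prefix detail is an assumption on my part, not something stated in this diff), hence the explicit non-dot prefix. A minimal, self-contained sketch of the pattern, with a hypothetical function name:

    use std::io;
    use tempfile::Builder;

    fn keeper_data_dir_example() -> io::Result<()> {
        // Ask tempfile for a directory whose name starts with "k" rather than
        // a dot, so ClickHouse Keeper will accept it as a data path.
        let dir = Builder::new().prefix("k").tempdir()?;
        println!("keeper data dir: {}", dir.path().display());
        // The directory is deleted when `dir` is dropped, which is why
        // ClickHouseInstance holds on to its TempDir for the process lifetime.
        Ok(())
    }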
+ let data_dir = Builder::new() + .prefix("k") + .tempdir() + .context("failed to create tempdir for ClickHouse Keeper data")?; + + let log_path = data_dir.path().join("clickhouse-keeper.log"); + let err_log_path = data_dir.path().join("clickhouse-keeper.err.log"); + let log_storage_path = data_dir.path().join("log"); + let snapshot_storage_path = data_dir.path().join("snapshots"); + let args = vec![ + "keeper".to_string(), + "--config-file".to_string(), + format!("{}", config_path.display()), + ]; + + let child = tokio::process::Command::new("clickhouse") + .args(&args) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .env("CLICKHOUSE_WATCHDOG_ENABLE", "0") + .env("CH_LOG", &log_path) + .env("CH_ERROR_LOG", err_log_path) + .env("CH_LISTEN_ADDR", "::") + .env("CH_LISTEN_PORT", port.to_string()) + .env("CH_KEEPER_ID_CURRENT", k_id.to_string()) + .env("CH_DATASTORE", data_dir.path()) + .env("CH_LOG_STORAGE_PATH", log_storage_path) + .env("CH_SNAPSHOT_STORAGE_PATH", snapshot_storage_path) + .env("CH_KEEPER_ID_01", "1") + .env("CH_KEEPER_ID_02", "2") + .env("CH_KEEPER_ID_03", "3") + // There seems to be a bug using ipv6 and localhost with a replicated + // set up when installing all servers and coordinator nodes on the same + // server. For this reason we will be using ipv4 for testing. + .env("CH_KEEPER_HOST_01", "127.0.0.1") + .env("CH_KEEPER_HOST_02", "127.0.0.1") + .env("CH_KEEPER_HOST_03", "127.0.0.1") + .spawn() + .with_context(|| { + format!( + "failed to spawn `clickhouse keeper` (with args: {:?})", + &args + ) + })?; + + let data_path = data_dir.path().to_path_buf(); + + let result = wait_for_ready(log_path).await; + match result { + Ok(()) => Ok(Self { + data_dir: Some(data_dir), + data_path, + port, + args, + child: Some(child), + }), + Err(e) => Err(e), + } + } + /// Wait for the ClickHouse server process to shutdown, after it's been killed. pub async fn wait_for_shutdown(&mut self) -> Result<(), anyhow::Error> { if let Some(mut child) = self.child.take() { @@ -204,6 +314,48 @@ impl Drop for ClickHouseInstance { } } +// Wait for the ClickHouse log file to become available, including the +// port number. +// +// We extract the port number from the log-file regardless of whether we +// know it already, as this is a more reliable check that the server is +// up and listening. Previously we only did this in the case we need to +// _learn_ the port, which introduces the possibility that we return +// from this function successfully, but the server itself is not yet +// ready to accept connections. +pub async fn wait_for_port(log_path: PathBuf) -> Result { + let p = poll::wait_for_condition( + || async { + let result = + discover_local_listening_port(&log_path, CLICKHOUSE_TIMEOUT) + .await; + match result { + // Successfully extracted the port, return it. + Ok(port) => Ok(port), + Err(e) => { + match e { + ClickHouseError::Io(ref inner) => { + if matches!( + inner.kind(), + std::io::ErrorKind::NotFound + ) { + return Err(poll::CondCheckError::NotYet); + } + } + _ => {} + } + Err(poll::CondCheckError::from(e)) + } + } + }, + &Duration::from_millis(500), + &CLICKHOUSE_TIMEOUT, + ) + .await + .context("waiting to discover ClickHouse port")?; + Ok(p) +} + // Parse the ClickHouse log file at the given path, looking for a line reporting the port number of // the HTTP server. This is only used if the port is chosen by the OS, not the caller. 
async fn discover_local_listening_port( @@ -257,10 +409,83 @@ async fn find_clickhouse_port_in_log( } } +// Wait for the ClickHouse log file to report it is ready to receive connections +pub async fn wait_for_ready(log_path: PathBuf) -> Result<(), anyhow::Error> { + let p = poll::wait_for_condition( + || async { + let result = discover_ready(&log_path, CLICKHOUSE_TIMEOUT).await; + match result { + Ok(ready) => Ok(ready), + Err(e) => { + match e { + ClickHouseError::Io(ref inner) => { + if matches!( + inner.kind(), + std::io::ErrorKind::NotFound + ) { + return Err(poll::CondCheckError::NotYet); + } + } + _ => {} + } + Err(poll::CondCheckError::from(e)) + } + } + }, + &Duration::from_millis(500), + &CLICKHOUSE_TIMEOUT, + ) + .await + .context("waiting to discover if ClickHouse is ready for connections")?; + Ok(p) +} + +// Parse the ClickHouse log file at the given path, looking for a line reporting that the server +// is ready for connections. +async fn discover_ready( + path: &Path, + timeout: Duration, +) -> Result<(), ClickHouseError> { + let timeout = Instant::now() + timeout; + tokio::time::timeout_at(timeout, clickhouse_ready_from_log(path)) + .await + .map_err(|_| ClickHouseError::Timeout)? +} + +// Parse the clickhouse log to know if the server is ready for connections. +// +// NOTE: This function loops forever until the expected line is found. It should be run under a +// timeout, or some other mechanism for cancelling it. +async fn clickhouse_ready_from_log(path: &Path) -> Result<(), ClickHouseError> { + let mut reader = BufReader::new(File::open(path).await?); + const READY: &str = " Application: Ready for connections"; + let mut lines = reader.lines(); + loop { + let line = lines.next_line().await?; + match line { + Some(line) => { + if let Some(_) = line.find(READY) { + return Ok(()); + } + } + None => { + // Reached EOF, just sleep for an interval and check again. + sleep(Duration::from_millis(10)).await; + + // We might have gotten a partial line; close the file, reopen + // it, and start reading again from the beginning. + reader = BufReader::new(File::open(path).await?); + lines = reader.lines(); + } + } + } +} + #[cfg(test)] mod tests { use super::{ - discover_local_listening_port, ClickHouseError, CLICKHOUSE_TIMEOUT, + discover_local_listening_port, discover_ready, ClickHouseError, + CLICKHOUSE_TIMEOUT, }; use std::process::Stdio; use std::{io::Write, sync::Arc, time::Duration}; @@ -302,6 +527,43 @@ mod tests { ); } + #[tokio::test] + async fn test_discover_clickhouse_ready() { + // Write some data to a fake log file + let mut file = NamedTempFile::new().unwrap(); + writeln!(file, "A garbage line").unwrap(); + writeln!( + file, + "2023.07.31 20:12:38.936192 [ 82373 ] Application: Ready for connections.", + ) + .unwrap(); + writeln!(file, "Another garbage line").unwrap(); + file.flush().unwrap(); + + assert!(matches!( + discover_ready(file.path(), CLICKHOUSE_TIMEOUT).await, + Ok(()) + )); + } + + #[tokio::test] + async fn test_discover_clickhouse_not_ready() { + // Write some data to a fake log file + let mut file = NamedTempFile::new().unwrap(); + writeln!(file, "A garbage line").unwrap(); + writeln!( + file, + "2023.07.31 20:12:38.936192 [ 82373 ] Application: Not ready for connections.", + ) + .unwrap(); + writeln!(file, "Another garbage line").unwrap(); + file.flush().unwrap(); + assert!(matches!( + discover_ready(file.path(), Duration::from_secs(1)).await, + Err(ClickHouseError::Timeout {}) + )); + } + // A regression test for #131. 
// // The function `discover_local_listening_port` initially read from the log file until EOF, but