Start integrating clickhouse clusters to blueprints (#6627)
This PR integrates clickhouse keeper and server node allocation via
`ClickhouseAllocator` into the `BlueprintBuilder`, and tests that the
allocation works as expected via the planner. There are many more
specific tests for the `ClickhouseAllocator` itself; the tests here
mainly check that everything fits together and that the same results
occur when run through the planner. There is also an additional test,
and supporting code, that allows us to disable the cluster policy and
expunge all clickhouse keeper and server zones in one shot.
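
For orientation, here is a minimal sketch of the kind of decision the
allocation step has to make. This is not the actual `ClickhouseAllocator`
or `BlueprintBuilder` API; every name below is hypothetical and the real
allocator is more involved.

```rust
// Hypothetical sketch only -- it illustrates the shape of the decision,
// not the real ClickhouseAllocator.
#[derive(Debug, PartialEq)]
enum ClickhouseAction {
    AddKeepers(usize),
    AddServers(usize),
    ExpungeAll,
    NoChange,
}

fn plan_clickhouse(
    policy_enabled: bool,
    target_keepers: usize,
    target_servers: usize,
    current_keepers: usize,
    current_servers: usize,
) -> Vec<ClickhouseAction> {
    // Disabling the cluster policy expunges every keeper and server zone
    // in one shot.
    if !policy_enabled {
        return vec![ClickhouseAction::ExpungeAll];
    }
    let mut actions = Vec::new();
    // Otherwise, add zones until the counts reach the policy's targets.
    if current_keepers < target_keepers {
        actions.push(ClickhouseAction::AddKeepers(
            target_keepers - current_keepers,
        ));
    }
    if current_servers < target_servers {
        actions.push(ClickhouseAction::AddServers(
            target_servers - current_servers,
        ));
    }
    if actions.is_empty() {
        actions.push(ClickhouseAction::NoChange);
    }
    actions
}

fn main() {
    // Policy disabled: everything is expunged, regardless of current state.
    assert_eq!(
        plan_clickhouse(false, 5, 2, 5, 2),
        vec![ClickhouseAction::ExpungeAll]
    );
    // Policy enabled, nothing allocated yet: add both kinds of zones.
    println!("{:?}", plan_clickhouse(true, 5, 2, 0, 0));
}
```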

This code is safe to merge because it is currently inert: there is no
way to enable the `ClickhousePolicy` outside of tests yet. That will
come in one or two follow-up PRs, which add an internal nexus endpoint
for enabling the policy and then an OMDB command to trigger that
endpoint. Further OMDB support will be added for monitoring. We expect
that for the foreseeable future we will always deploy with
`ClickhousePolicy::deploy_with_standalone = true`. This is stage 1 of
RFD 468, where we run replicated clickhouse alongside the existing
single-node clickhouse.
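
As a rough illustration of that stage-1 shape: only
`ClickhousePolicy::deploy_with_standalone` is named above, so the struct
layout and target counts below are assumptions for illustration, not the
actual type added by this PR.

```rust
/// Hypothetical sketch of a stage-1 policy; field names other than
/// `deploy_with_standalone` are illustrative assumptions.
#[derive(Debug, Clone)]
struct ClickhousePolicy {
    /// Keep the existing single-node clickhouse zone running while the
    /// replicated cluster is brought up (stage 1 of RFD 468).
    deploy_with_standalone: bool,
    /// Desired number of clickhouse keeper zones (assumed field).
    target_keepers: u8,
    /// Desired number of clickhouse server zones (assumed field).
    target_servers: u8,
}

fn main() {
    // The deployment shape we expect for the foreseeable future: replicated
    // clickhouse runs alongside the existing single-node deployment.
    let policy = ClickhousePolicy {
        deploy_with_standalone: true,
        target_keepers: 5,
        target_servers: 2,
    };
    println!("{policy:?}");
}
```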

Lastly, there will be other PRs to plug in the actual inventory
collection and execution phases for clickhouse cluster reconfiguration.
We shouldn't bother implementing the OMDB policy enablement until all
of that is complete, as it just won't work before then.
andrewjstone authored Oct 2, 2024
1 parent b154c06 commit 6d65e1e
Showing 24 changed files with 1,385 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


8 changes: 5 additions & 3 deletions nexus/db-model/src/deployment.rs
@@ -813,18 +813,20 @@ impl BpClickhouseClusterConfig {
                .max_used_server_id
                .0
                .try_into()
-               .context("more than 2^63 IDs in use")?,
+               .context("more than 2^63 clickhouse server IDs in use")?,
            max_used_keeper_id: config
                .max_used_keeper_id
                .0
                .try_into()
-               .context("more than 2^63 IDs in use")?,
+               .context("more than 2^63 clickhouse keeper IDs in use")?,
            cluster_name: config.cluster_name.clone(),
            cluster_secret: config.cluster_secret.clone(),
            highest_seen_keeper_leader_committed_log_index: config
                .highest_seen_keeper_leader_committed_log_index
                .try_into()
-               .context("more than 2^63 IDs in use")?,
+               .context(
+                   "more than 2^63 clickhouse keeper log indexes in use",
+               )?,
        })
    }
}
1 change: 1 addition & 0 deletions nexus/db-queries/Cargo.toml
@@ -16,6 +16,7 @@ async-bb8-diesel.workspace = true
async-trait.workspace = true
camino.workspace = true
chrono.workspace = true
clickhouse-admin-types.workspace = true
const_format.workspace = true
diesel.workspace = true
diesel-dtrace.workspace = true
256 changes: 256 additions & 0 deletions nexus/db-queries/src/db/datastore/deployment.rs
@@ -20,6 +20,7 @@ use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::DateTime;
use chrono::Utc;
use clickhouse_admin_types::{KeeperId, ServerId};
use diesel::expression::SelectableHelper;
use diesel::pg::Pg;
use diesel::query_builder::AstPass;
@@ -36,6 +37,9 @@ use diesel::OptionalExtension;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
use nexus_db_model::Blueprint as DbBlueprint;
use nexus_db_model::BpClickhouseClusterConfig;
use nexus_db_model::BpClickhouseKeeperZoneIdToNodeId;
use nexus_db_model::BpClickhouseServerZoneIdToNodeId;
use nexus_db_model::BpOmicronPhysicalDisk;
use nexus_db_model::BpOmicronZone;
use nexus_db_model::BpOmicronZoneNic;
@@ -49,6 +53,7 @@ use nexus_types::deployment::BlueprintPhysicalDisksConfig;
use nexus_types::deployment::BlueprintTarget;
use nexus_types::deployment::BlueprintZoneFilter;
use nexus_types::deployment::BlueprintZonesConfig;
use nexus_types::deployment::ClickhouseClusterConfig;
use nexus_types::deployment::CockroachDbPreserveDowngrade;
use nexus_types::external_api::views::SledState;
use omicron_common::api::external::DataPageParams;
@@ -58,6 +63,7 @@ use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use omicron_common::bail_unless;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::OmicronZoneUuid;
use omicron_uuid_kinds::SledUuid;
use std::collections::BTreeMap;
use uuid::Uuid;
@@ -180,6 +186,42 @@ impl DataStore {
})
.collect::<Result<Vec<BpOmicronZoneNic>, _>>()?;

let clickhouse_tables: Option<(_, _, _)> = if let Some(config) =
&blueprint.clickhouse_cluster_config
{
let mut keepers = vec![];
for (zone_id, keeper_id) in &config.keepers {
let keeper = BpClickhouseKeeperZoneIdToNodeId::new(
blueprint_id,
*zone_id,
*keeper_id,
)
.with_context(|| format!("zone {zone_id}, keeper {keeper_id}"))
.map_err(|e| Error::internal_error(&format!("{:#}", e)))?;
keepers.push(keeper)
}

let mut servers = vec![];
for (zone_id, server_id) in &config.servers {
let server = BpClickhouseServerZoneIdToNodeId::new(
blueprint_id,
*zone_id,
*server_id,
)
.with_context(|| format!("zone {zone_id}, server {server_id}"))
.map_err(|e| Error::internal_error(&format!("{:#}", e)))?;
servers.push(server);
}

let cluster_config =
BpClickhouseClusterConfig::new(blueprint_id, config)
.map_err(|e| Error::internal_error(&format!("{:#}", e)))?;

Some((cluster_config, keepers, servers))
} else {
None
};

// This implementation inserts all records associated with the
// blueprint in one transaction. This is required: we don't want
// any planner or executor to see a half-inserted blueprint, nor do we
@@ -258,7 +300,33 @@ impl DataStore {
.await?;
}

// Insert all clickhouse cluster related tables if necessary
if let Some((clickhouse_cluster_config, keepers, servers)) = clickhouse_tables {
{
use db::schema::bp_clickhouse_cluster_config::dsl;
let _ = diesel::insert_into(dsl::bp_clickhouse_cluster_config)
.values(clickhouse_cluster_config)
.execute_async(&conn)
.await?;
}
{
use db::schema::bp_clickhouse_keeper_zone_id_to_node_id::dsl;
let _ = diesel::insert_into(dsl::bp_clickhouse_keeper_zone_id_to_node_id)
.values(keepers)
.execute_async(&conn)
.await?;
}
{
use db::schema::bp_clickhouse_server_zone_id_to_node_id::dsl;
let _ = diesel::insert_into(dsl::bp_clickhouse_server_zone_id_to_node_id)
.values(servers)
.execute_async(&conn)
.await?;
}
}

Ok(())

})
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
@@ -622,6 +690,156 @@ impl DataStore {
disks_config.disks.sort_unstable_by_key(|d| d.id);
}

// Load our `ClickhouseClusterConfig` if it exists
let clickhouse_cluster_config: Option<ClickhouseClusterConfig> = {
use db::schema::bp_clickhouse_cluster_config::dsl;

let res = dsl::bp_clickhouse_cluster_config
.filter(dsl::blueprint_id.eq(blueprint_id))
.select(BpClickhouseClusterConfig::as_select())
.get_result_async(&*conn)
.await
.optional()
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?;

match res {
None => None,
Some(bp_config) => {
// Load our clickhouse keeper configs for the given blueprint
let keepers: BTreeMap<OmicronZoneUuid, KeeperId> = {
use db::schema::bp_clickhouse_keeper_zone_id_to_node_id::dsl;
let mut keepers = BTreeMap::new();
let mut paginator = Paginator::new(SQL_BATCH_SIZE);
while let Some(p) = paginator.next() {
let batch = paginated(
dsl::bp_clickhouse_keeper_zone_id_to_node_id,
dsl::omicron_zone_id,
&p.current_pagparams(),
)
.filter(dsl::blueprint_id.eq(blueprint_id))
.select(
BpClickhouseKeeperZoneIdToNodeId::as_select(),
)
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::Server,
)
})?;

paginator =
p.found_batch(&batch, &|k| k.omicron_zone_id);

for k in batch {
let keeper_id = KeeperId(
u64::try_from(k.keeper_id).map_err(
|_| {
Error::internal_error(&format!(
"keeper id is negative: {}",
k.keeper_id
))
},
)?,
);
keepers.insert(
k.omicron_zone_id.into(),
keeper_id,
);
}
}
keepers
};

// Load our clickhouse server configs for the given blueprint
let servers: BTreeMap<OmicronZoneUuid, ServerId> = {
use db::schema::bp_clickhouse_server_zone_id_to_node_id::dsl;
let mut servers = BTreeMap::new();
let mut paginator = Paginator::new(SQL_BATCH_SIZE);
while let Some(p) = paginator.next() {
let batch = paginated(
dsl::bp_clickhouse_server_zone_id_to_node_id,
dsl::omicron_zone_id,
&p.current_pagparams(),
)
.filter(dsl::blueprint_id.eq(blueprint_id))
.select(
BpClickhouseServerZoneIdToNodeId::as_select(),
)
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::Server,
)
})?;

paginator =
p.found_batch(&batch, &|s| s.omicron_zone_id);

for s in batch {
let server_id = ServerId(
u64::try_from(s.server_id).map_err(
|_| {
Error::internal_error(&format!(
"server id is negative: {}",
s.server_id
))
},
)?,
);
servers.insert(
s.omicron_zone_id.into(),
server_id,
);
}
}
servers
};

Some(ClickhouseClusterConfig {
generation: bp_config.generation.into(),
max_used_server_id: ServerId(
u64::try_from(bp_config.max_used_server_id)
.map_err(|_| {
Error::internal_error(&format!(
"max server id is negative: {}",
bp_config.max_used_server_id
))
})?,
),
max_used_keeper_id: KeeperId(
u64::try_from(bp_config.max_used_keeper_id)
.map_err(|_| {
Error::internal_error(&format!(
"max keeper id is negative: {}",
bp_config.max_used_keeper_id
))
})?,
),
cluster_name: bp_config.cluster_name,
cluster_secret: bp_config.cluster_secret,
highest_seen_keeper_leader_committed_log_index:
u64::try_from(
bp_config.highest_seen_keeper_leader_committed_log_index,
)
.map_err(|_| {
Error::internal_error(&format!(
    "highest seen keeper leader committed log index is negative: {}",
    bp_config.highest_seen_keeper_leader_committed_log_index
))
})?,
keepers,
servers,
})
}
}
};

Ok(Blueprint {
id: blueprint_id,
blueprint_zones,
@@ -632,6 +850,7 @@ impl DataStore {
external_dns_version,
cockroachdb_fingerprint,
cockroachdb_setting_preserve_downgrade,
clickhouse_cluster_config,
time_created,
creator,
comment,
@@ -663,6 +882,9 @@
nsled_agent_zones,
nzones,
nnics,
nclickhouse_cluster_configs,
nclickhouse_keepers,
nclickhouse_servers,
) = conn
.transaction_async(|conn| async move {
// Ensure that blueprint we're about to delete is not the
@@ -759,6 +981,34 @@ impl DataStore {
.await?
};

let nclickhouse_cluster_configs = {
use db::schema::bp_clickhouse_cluster_config::dsl;
diesel::delete(
dsl::bp_clickhouse_cluster_config
.filter(dsl::blueprint_id.eq(blueprint_id)),
)
.execute_async(&conn)
.await?
};

let nclickhouse_keepers = {
use db::schema::bp_clickhouse_keeper_zone_id_to_node_id::dsl;
diesel::delete(dsl::bp_clickhouse_keeper_zone_id_to_node_id
.filter(dsl::blueprint_id.eq(blueprint_id)),
)
.execute_async(&conn)
.await?
};

let nclickhouse_servers = {
use db::schema::bp_clickhouse_server_zone_id_to_node_id::dsl;
diesel::delete(dsl::bp_clickhouse_server_zone_id_to_node_id
.filter(dsl::blueprint_id.eq(blueprint_id)),
)
.execute_async(&conn)
.await?
};

Ok((
nblueprints,
nsled_states,
@@ -767,6 +1017,9 @@ impl DataStore {
nsled_agent_zones,
nzones,
nnics,
nclickhouse_cluster_configs,
nclickhouse_keepers,
nclickhouse_servers,
))
})
.await
@@ -786,6 +1039,9 @@ impl DataStore {
"nsled_agent_zones" => nsled_agent_zones,
"nzones" => nzones,
"nnics" => nnics,
"nclickhouse_cluster_configs" => nclickhouse_cluster_configs,
"nclickhouse_keepers" => nclickhouse_keepers,
"nclickhouse_servers" => nclickhouse_servers
);

Ok(())
