Start integrating clickhouse clusters to blueprints #6627

Merged · 20 commits · Oct 2, 2024

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

8 changes: 5 additions & 3 deletions nexus/db-model/src/deployment.rs
@@ -813,18 +813,20 @@ impl BpClickhouseClusterConfig {
.max_used_server_id
.0
.try_into()
.context("more than 2^63 IDs in use")?,
.context("more than 2^63 clickhouse server IDs in use")?,
max_used_keeper_id: config
.max_used_keeper_id
.0
.try_into()
.context("more than 2^63 IDs in use")?,
.context("more than 2^63 clickhouse keeper IDs in use")?,
cluster_name: config.cluster_name.clone(),
cluster_secret: config.cluster_secret.clone(),
highest_seen_keeper_leader_committed_log_index: config
.highest_seen_keeper_leader_committed_log_index
.try_into()
.context("more than 2^63 IDs in use")?,
.context(
"more than 2^63 clickhouse keeper log indexes in use",
)?,
})
}
}
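The sharpened messages above guard a `u64`-to-`i64` narrowing: the ClickHouse ID newtypes wrap `u64`, while the database columns are signed 64-bit integers. A minimal standalone sketch of the pattern — the `to_db_id` helper and the simplified `ServerId` here are illustrative assumptions, not part of this PR:

    use anyhow::{Context, Result};

    // Simplified stand-in for clickhouse_admin_types::ServerId.
    struct ServerId(u64);

    // The database stores these IDs in signed 64-bit (INT8) columns, so
    // anything above i64::MAX (2^63 - 1) cannot be represented and must
    // fail loudly, with a message naming the field being converted.
    fn to_db_id(id: &ServerId) -> Result<i64> {
        id.0
            .try_into()
            .context("more than 2^63 clickhouse server IDs in use")
    }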
1 change: 1 addition & 0 deletions nexus/db-queries/Cargo.toml
@@ -16,6 +16,7 @@ async-bb8-diesel.workspace = true
async-trait.workspace = true
camino.workspace = true
chrono.workspace = true
clickhouse-admin-types.workspace = true
const_format.workspace = true
diesel.workspace = true
diesel-dtrace.workspace = true
256 changes: 256 additions & 0 deletions nexus/db-queries/src/db/datastore/deployment.rs
@@ -20,6 +20,7 @@ use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use chrono::DateTime;
use chrono::Utc;
use clickhouse_admin_types::{KeeperId, ServerId};
use diesel::expression::SelectableHelper;
use diesel::pg::Pg;
use diesel::query_builder::AstPass;
@@ -36,6 +37,9 @@ use diesel::OptionalExtension;
use diesel::QueryDsl;
use diesel::RunQueryDsl;
use nexus_db_model::Blueprint as DbBlueprint;
use nexus_db_model::BpClickhouseClusterConfig;
use nexus_db_model::BpClickhouseKeeperZoneIdToNodeId;
use nexus_db_model::BpClickhouseServerZoneIdToNodeId;
use nexus_db_model::BpOmicronPhysicalDisk;
use nexus_db_model::BpOmicronZone;
use nexus_db_model::BpOmicronZoneNic;
@@ -49,6 +53,7 @@ use nexus_types::deployment::BlueprintPhysicalDisksConfig;
use nexus_types::deployment::BlueprintTarget;
use nexus_types::deployment::BlueprintZoneFilter;
use nexus_types::deployment::BlueprintZonesConfig;
use nexus_types::deployment::ClickhouseClusterConfig;
use nexus_types::deployment::CockroachDbPreserveDowngrade;
use nexus_types::external_api::views::SledState;
use omicron_common::api::external::DataPageParams;
@@ -58,6 +63,7 @@ use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use omicron_common::bail_unless;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::OmicronZoneUuid;
use omicron_uuid_kinds::SledUuid;
use std::collections::BTreeMap;
use uuid::Uuid;
@@ -180,6 +186,42 @@ impl DataStore {
})
.collect::<Result<Vec<BpOmicronZoneNic>, _>>()?;

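// Convert the in-memory ClickHouse cluster config (if any) into its
// database row types up front, so the transaction below only has to
// perform inserts.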
let clickhouse_tables: Option<(_, _, _)> = if let Some(config) =
&blueprint.clickhouse_cluster_config
{
let mut keepers = vec![];
for (zone_id, keeper_id) in &config.keepers {
let keeper = BpClickhouseKeeperZoneIdToNodeId::new(
blueprint_id,
*zone_id,
*keeper_id,
)
.with_context(|| format!("zone {zone_id}, keeper {keeper_id}"))
.map_err(|e| Error::internal_error(&format!("{:#}", e)))?;
keepers.push(keeper)
}

let mut servers = vec![];
for (zone_id, server_id) in &config.servers {
let server = BpClickhouseServerZoneIdToNodeId::new(
blueprint_id,
*zone_id,
*server_id,
)
.with_context(|| format!("zone {zone_id}, server {server_id}"))
.map_err(|e| Error::internal_error(&format!("{:#}", e)))?;
servers.push(server);
}

let cluster_config =
BpClickhouseClusterConfig::new(blueprint_id, config)
.map_err(|e| Error::internal_error(&format!("{:#}", e)))?;

Some((cluster_config, keepers, servers))
} else {
None
};

// This implementation inserts all records associated with the
// blueprint in one transaction. This is required: we don't want
// any planner or executor to see a half-inserted blueprint, nor do we
@@ -258,7 +300,33 @@ impl DataStore {
.await?;
}

// Insert all clickhouse cluster related tables if necessary
if let Some((clickhouse_cluster_config, keepers, servers)) = clickhouse_tables {
{
use db::schema::bp_clickhouse_cluster_config::dsl;
let _ = diesel::insert_into(dsl::bp_clickhouse_cluster_config)
.values(clickhouse_cluster_config)
.execute_async(&conn)
.await?;
}
{
use db::schema::bp_clickhouse_keeper_zone_id_to_node_id::dsl;
let _ = diesel::insert_into(dsl::bp_clickhouse_keeper_zone_id_to_node_id)
.values(keepers)
.execute_async(&conn)
.await?;
}
{
use db::schema::bp_clickhouse_server_zone_id_to_node_id::dsl;
let _ = diesel::insert_into(dsl::bp_clickhouse_server_zone_id_to_node_id)
.values(servers)
.execute_async(&conn)
.await?;
}
}

Ok(())

})
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;
@@ -622,6 +690,156 @@ impl DataStore {
disks_config.disks.sort_unstable_by_key(|d| d.id);
}

// Load our `ClickhouseClusterConfig` if it exists
let clickhouse_cluster_config: Option<ClickhouseClusterConfig> = {
use db::schema::bp_clickhouse_cluster_config::dsl;

let res = dsl::bp_clickhouse_cluster_config
.filter(dsl::blueprint_id.eq(blueprint_id))
.select(BpClickhouseClusterConfig::as_select())
.get_result_async(&*conn)
.await
.optional()
.map_err(|e| {
public_error_from_diesel(e, ErrorHandler::Server)
})?;

match res {
None => None,
Some(bp_config) => {
// Load our clickhouse keeper configs for the given blueprint
let keepers: BTreeMap<OmicronZoneUuid, KeeperId> = {
use db::schema::bp_clickhouse_keeper_zone_id_to_node_id::dsl;
let mut keepers = BTreeMap::new();
let mut paginator = Paginator::new(SQL_BATCH_SIZE);
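// Page through the keeper mapping rows in fixed-size batches,
// keyed on omicron_zone_id, rather than loading them all at once.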
while let Some(p) = paginator.next() {
let batch = paginated(
dsl::bp_clickhouse_keeper_zone_id_to_node_id,
dsl::omicron_zone_id,
&p.current_pagparams(),
)
.filter(dsl::blueprint_id.eq(blueprint_id))
.select(
BpClickhouseKeeperZoneIdToNodeId::as_select(),
)
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::Server,
)
})?;

paginator =
p.found_batch(&batch, &|k| k.omicron_zone_id);

for k in batch {
let keeper_id = KeeperId(
u64::try_from(k.keeper_id).map_err(
|_| {
Error::internal_error(&format!(
"keeper id is negative: {}",
k.keeper_id
))
},
)?,
);
keepers.insert(
k.omicron_zone_id.into(),
keeper_id,
);
}
}
keepers
};

// Load our clickhouse server configs for the given blueprint
let servers: BTreeMap<OmicronZoneUuid, ServerId> = {
use db::schema::bp_clickhouse_server_zone_id_to_node_id::dsl;
let mut servers = BTreeMap::new();
let mut paginator = Paginator::new(SQL_BATCH_SIZE);
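// Same batched pagination as for the keeper mappings above.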
while let Some(p) = paginator.next() {
let batch = paginated(
dsl::bp_clickhouse_server_zone_id_to_node_id,
dsl::omicron_zone_id,
&p.current_pagparams(),
)
.filter(dsl::blueprint_id.eq(blueprint_id))
.select(
BpClickhouseServerZoneIdToNodeId::as_select(),
)
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::Server,
)
})?;

paginator =
p.found_batch(&batch, &|s| s.omicron_zone_id);

for s in batch {
let server_id = ServerId(
u64::try_from(s.server_id).map_err(
|_| {
Error::internal_error(&format!(
"server id is negative: {}",
s.server_id
))
},
)?,
);
servers.insert(
s.omicron_zone_id.into(),
server_id,
);
}
}
servers
};

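// Reassemble the in-memory config, converting the signed database
// columns back into their u64 newtype wrappers.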
Some(ClickhouseClusterConfig {
generation: bp_config.generation.into(),
max_used_server_id: ServerId(
u64::try_from(bp_config.max_used_server_id)
.map_err(|_| {
Error::internal_error(&format!(
"max server id is negative: {}",
bp_config.max_used_server_id
))
})?,
),
max_used_keeper_id: KeeperId(
u64::try_from(bp_config.max_used_keeper_id)
.map_err(|_| {
Error::internal_error(&format!(
"max keeper id is negative: {}",
bp_config.max_used_keeper_id
))
})?,
),
cluster_name: bp_config.cluster_name,
cluster_secret: bp_config.cluster_secret,
highest_seen_keeper_leader_committed_log_index:
u64::try_from(
bp_config.highest_seen_keeper_leader_committed_log_index,
)
.map_err(|_| {
Error::internal_error(&format!(
"max server id is negative: {}",
bp_config.highest_seen_keeper_leader_committed_log_index
))
})?,
keepers,
servers,
})
}
}
};

Ok(Blueprint {
id: blueprint_id,
blueprint_zones,
@@ -632,6 +850,7 @@
external_dns_version,
cockroachdb_fingerprint,
cockroachdb_setting_preserve_downgrade,
clickhouse_cluster_config,
time_created,
creator,
comment,
@@ -663,6 +882,9 @@
nsled_agent_zones,
nzones,
nnics,
nclickhouse_cluster_configs,
nclickhouse_keepers,
nclickhouse_servers,
) = conn
.transaction_async(|conn| async move {
// Ensure that blueprint we're about to delete is not the
@@ -759,6 +981,34 @@
.await?
};

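// Remove the ClickHouse cluster rows alongside the other blueprint
// tables, counting deleted rows per table as above.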
let nclickhouse_cluster_configs = {
use db::schema::bp_clickhouse_cluster_config::dsl;
diesel::delete(
dsl::bp_clickhouse_cluster_config
.filter(dsl::blueprint_id.eq(blueprint_id)),
)
.execute_async(&conn)
.await?
};

let nclickhouse_keepers = {
use db::schema::bp_clickhouse_keeper_zone_id_to_node_id::dsl;
diesel::delete(dsl::bp_clickhouse_keeper_zone_id_to_node_id
.filter(dsl::blueprint_id.eq(blueprint_id)),
)
.execute_async(&conn)
.await?
};

let nclickhouse_servers = {
use db::schema::bp_clickhouse_server_zone_id_to_node_id::dsl;
diesel::delete(dsl::bp_clickhouse_server_zone_id_to_node_id
.filter(dsl::blueprint_id.eq(blueprint_id)),
)
.execute_async(&conn)
.await?
};

Ok((
nblueprints,
nsled_states,
@@ -767,6 +1017,9 @@
nsled_agent_zones,
nzones,
nnics,
nclickhouse_cluster_configs,
nclickhouse_keepers,
nclickhouse_servers,
))
})
.await
@@ -786,6 +1039,9 @@
"nsled_agent_zones" => nsled_agent_zones,
"nzones" => nzones,
"nnics" => nnics,
"nclickhouse_cluster_configs" => nclickhouse_cluster_configs,
"nclickhouse_keepers" => nclickhouse_keepers,
"nclickhouse_servers" => nclickhouse_servers
);

Ok(())
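The insert, read, and delete paths above keep the three ClickHouse tables in lockstep with the rest of the blueprint. A rough sketch of the round-trip this is meant to guarantee, assuming `datastore`, `opctx`, `blueprint`, and `authz_blueprint` bindings that are not part of this diff:

    // Hypothetical round-trip check: a blueprint carrying a ClickHouse
    // cluster config should read back with that config intact.
    datastore.blueprint_insert(&opctx, &blueprint).await?;
    let read_back = datastore.blueprint_read(&opctx, &authz_blueprint).await?;
    assert_eq!(
        read_back.clickhouse_cluster_config,
        blueprint.clickhouse_cluster_config,
    );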