Skip to content

Commit

Permalink
Unconditionally (but serially) initialize the database
Browse files Browse the repository at this point in the history
  • Loading branch information
plotnick committed Nov 18, 2024
1 parent 83efab7 commit 012b6a1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 36 deletions.
8 changes: 4 additions & 4 deletions clickhouse-admin/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ impl ServerContext {

pub struct SingleServerContext {
clickhouse_cli: ClickhouseCli,
db_initialized: Arc<Mutex<bool>>,
initialization_lock: Arc<Mutex<()>>,
}

impl SingleServerContext {
pub fn new(clickhouse_cli: ClickhouseCli) -> Self {
Self { clickhouse_cli, db_initialized: Arc::new(Mutex::new(false)) }
Self { clickhouse_cli, initialization_lock: Arc::new(Mutex::new(())) }
}

pub fn clickhouse_cli(&self) -> &ClickhouseCli {
&self.clickhouse_cli
}

pub fn db_initialized(&self) -> Arc<Mutex<bool>> {
self.db_initialized.clone()
pub fn initialization_lock(&self) -> Arc<Mutex<()>> {
self.initialization_lock.clone()
}
}
60 changes: 28 additions & 32 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,38 +130,34 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let log = &rqctx.log;
let ctx = rqctx.context();
let initialized = ctx.db_initialized();
let mut initialized = initialized.lock().await;
if !*initialized {
let http_address = ctx.clickhouse_cli().listen_address;
let native_address = SocketAddrV6::new(
*http_address.ip(),
CLICKHOUSE_TCP_PORT,
0,
0,
);

let client = OximeterClient::new(
http_address.into(),
native_address.into(),
log,
);
debug!(
log,
"initializing single-node ClickHouse \
at {http_address} to version {OXIMETER_VERSION}"
);
client
.initialize_db_with_version(false, OXIMETER_VERSION)
.await
.map_err(|e| {
HttpError::for_internal_error(format!(
"can't initialize single-node ClickHouse \
at {http_address} to version {OXIMETER_VERSION}: {e}",
))
})?;
*initialized = true;
}
let http_address = ctx.clickhouse_cli().listen_address;
let native_address =
SocketAddrV6::new(*http_address.ip(), CLICKHOUSE_TCP_PORT, 0, 0);
let client = OximeterClient::new(
http_address.into(),
native_address.into(),
log,
);
debug!(
log,
"initializing single-node ClickHouse \
at {http_address} to version {OXIMETER_VERSION}"
);

// Database initialization is idempotent, but not concurrency-safe.
// Use a mutex to serialize requests.
let lock = ctx.initialization_lock();
let _guard = lock.lock().await;
client
.initialize_db_with_version(false, OXIMETER_VERSION)
.await
.map_err(|e| {
HttpError::for_internal_error(format!(
"can't initialize single-node ClickHouse \
at {http_address} to version {OXIMETER_VERSION}: {e}",
))
})?;

Ok(HttpResponseUpdatedNoContent())
}
}

0 comments on commit 012b6a1

Please sign in to comment.