Skip to content

Commit

Permalink
[clickhouse] Internal monitoring endpoint (oxidecomputer#7114)
Browse files Browse the repository at this point in the history
## Overview

This commit adds an endpoint to retrieve timeseries from the `system`
database. For the time being we will only add support for the
`metric_log` and `asynchronous_metric_log` tables.

This endpoint is still a bit bare bones, but will be a good start to
begin monitoring the ClickHouse servers.

## Examples

### Queries per second

```console
$ curl "http://[::1]:8888/timeseries/metric_log/ProfileEvent_Query/avg" | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2883  100  2883    0     0   5912      0 --:--:-- --:--:-- --:--:--  5907
[
  {
    "time": "2024-11-20T04:09:00Z",
    "value": 0.0
  },
  {
    "time": "2024-11-20T04:10:00Z",
    "value": 0.0
  },
  {
    "time": "2024-11-20T04:11:00Z",
    "value": 0.06666666666666667
  }
]
```

### Disk usage

```console
$ curl "http://[::1]:8888/timeseries/asynchronous_metric_log/DiskUsed_default/avg?interval=120&time_range=3600&timestamp_format=unix_epoch" | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1786  100  1786    0     0   3728      0 --:--:-- --:--:-- --:--:--  3728
[
 {
    "time": "1732513320",
    "value": 120491427254.85716
  },
  {
    "time": "1732513440",
    "value": 120382774033.06668
  },
  {
    "time": "1732513560",
    "value": 120364752622.93332
  }
]
```

Related: oxidecomputer#6953
  • Loading branch information
karencfv authored Nov 26, 2024
1 parent e87ac05 commit 1f83f07
Show file tree
Hide file tree
Showing 8 changed files with 658 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-clien
clickhouse-admin-single-client = { path = "clients/clickhouse-admin-single-client" }
clickhouse-admin-types = { path = "clickhouse-admin/types" }
clickhouse-admin-test-utils = { path = "clickhouse-admin/test-utils" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "242fd812aaeafec99ba01b5505ffbb2bd2370917" }
cockroach-admin-api = { path = "cockroach-admin/api" }
cockroach-admin-client = { path = "clients/cockroach-admin-client" }
cockroach-admin-types = { path = "cockroach-admin/types" }
Expand Down
37 changes: 34 additions & 3 deletions clickhouse-admin/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperConfig, KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ServerConfigurableSettings,
KeeperConfig, KeeperConfigurableSettings, Lgif, MetricInfoPath, RaftConfig,
ReplicaConfig, ServerConfigurableSettings, SystemTimeSeries,
TimeSeriesSettingsQuery,
};
use dropshot::{
HttpError, HttpResponseCreated, HttpResponseOk,
HttpResponseUpdatedNoContent, RequestContext, TypedBody,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody,
};

/// API interface for our clickhouse-admin-keeper server
Expand Down Expand Up @@ -116,6 +117,21 @@ pub trait ClickhouseAdminServerApi {
async fn distributed_ddl_queue(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Vec<DistributedDdlQueue>>, HttpError>;

/// Retrieve time series from the system database.
///
/// The value of each data point is the average of all stored data points
/// within the interval.
/// These are internal ClickHouse metrics.
#[endpoint {
method = GET,
path = "/timeseries/{table}/{metric}/avg"
}]
async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError>;
}

/// API interface for our clickhouse-admin-single server
Expand All @@ -136,4 +152,19 @@ pub trait ClickhouseAdminSingleApi {
async fn init_db(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;

/// Retrieve time series from the system database.
///
/// The value of each data point is the average of all stored data points
/// within the interval.
/// These are internal ClickHouse metrics.
#[endpoint {
method = GET,
path = "/timeseries/{table}/{metric}/avg"
}]
async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError>;
}
68 changes: 54 additions & 14 deletions clickhouse-admin/src/clickhouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ use anyhow::Result;
use camino::Utf8PathBuf;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperId, Lgif, RaftConfig, OXIMETER_CLUSTER,
KeeperId, Lgif, RaftConfig, SystemTimeSeries, SystemTimeSeriesSettings,
OXIMETER_CLUSTER,
};
use dropshot::HttpError;
use illumos_utils::{output_to_exec_error, ExecutionError};
use slog::Logger;
use slog::{debug, Logger};
use slog_error_chain::{InlineErrorChain, SlogInlineError};
use std::collections::BTreeSet;
use std::ffi::OsStr;
use std::fmt::Display;
use std::io;
use std::net::SocketAddrV6;
use std::time::Duration;
use tokio::process::Command;

const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(Debug, thiserror::Error, SlogInlineError)]
pub enum ClickhouseCliError {
#[error("failed to run `clickhouse {subcommand}`")]
Expand All @@ -38,13 +42,16 @@ pub enum ClickhouseCliError {
#[source]
err: anyhow::Error,
},
#[error("clickhouse server unavailable: {0}")]
ServerUnavailable(String),
}

impl From<ClickhouseCliError> for HttpError {
fn from(err: ClickhouseCliError) -> Self {
match err {
ClickhouseCliError::Run { .. }
| ClickhouseCliError::Parse { .. }
| ClickhouseCliError::ServerUnavailable { .. }
| ClickhouseCliError::ExecutionError(_) => {
let message = InlineErrorChain::new(&err).to_string();
HttpError {
Expand Down Expand Up @@ -161,6 +168,25 @@ impl ClickhouseCli {
.await
}

pub async fn system_timeseries_avg(
&self,
settings: SystemTimeSeriesSettings,
) -> Result<Vec<SystemTimeSeries>, ClickhouseCliError> {
let log = self.log.clone().unwrap();
let query = settings.query_avg();

debug!(&log, "Querying system database"; "query" => &query);

self.client_non_interactive(
ClickhouseClientType::Server,
&query,
"Retrieve time series from the system database",
SystemTimeSeries::parse,
log,
)
.await
}

async fn client_non_interactive<F, T>(
&self,
client: ClickhouseClientType,
Expand All @@ -182,19 +208,33 @@ impl ClickhouseCli {
.arg("--query")
.arg(query);

let output = command.output().await.map_err(|err| {
let err_args: Vec<&OsStr> = command.as_std().get_args().collect();
let err_args_parsed: Vec<String> = err_args
.iter()
.map(|&os_str| os_str.to_string_lossy().into_owned())
.collect();
let err_args_str = err_args_parsed.join(" ");
ClickhouseCliError::Run {
description: subcommand_description,
subcommand: err_args_str,
err,
let now = tokio::time::Instant::now();
let result =
tokio::time::timeout(DEFAULT_COMMAND_TIMEOUT, command.output())
.await;

let elapsed = now.elapsed();
let output = match result {
Ok(result) => result.map_err(|err| {
let err_args: Vec<&OsStr> =
command.as_std().get_args().collect();
let err_args_parsed: Vec<String> = err_args
.iter()
.map(|&os_str| os_str.to_string_lossy().into_owned())
.collect();
let err_args_str = err_args_parsed.join(" ");
ClickhouseCliError::Run {
description: subcommand_description,
subcommand: err_args_str,
err,
}
})?,
Err(e) => {
return Err(ClickhouseCliError::ServerUnavailable(format!(
"command timed out after {elapsed:?}: {e}"
)))
}
})?;
};

if !output.status.success() {
return Err(output_to_exec_error(command.as_std(), &output).into());
Expand Down
37 changes: 34 additions & 3 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use crate::context::{ServerContext, SingleServerContext};
use clickhouse_admin_api::*;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperConfig, KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ServerConfigurableSettings,
KeeperConfig, KeeperConfigurableSettings, Lgif, MetricInfoPath, RaftConfig,
ReplicaConfig, ServerConfigurableSettings, SystemTimeSeries,
SystemTimeSeriesSettings, TimeSeriesSettingsQuery,
};
use dropshot::{
ApiDescription, HttpError, HttpResponseCreated, HttpResponseOk,
HttpResponseUpdatedNoContent, RequestContext, TypedBody,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody,
};
use illumos_utils::svcadm::Svcadm;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
Expand Down Expand Up @@ -64,6 +65,21 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {
let output = ctx.clickhouse_cli().distributed_ddl_queue().await?;
Ok(HttpResponseOk(output))
}

async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError> {
let ctx = rqctx.context();
let retrieval_settings = query_params.into_inner();
let metric_info = path_params.into_inner();
let settings =
SystemTimeSeriesSettings { retrieval_settings, metric_info };
let output =
ctx.clickhouse_cli().system_timeseries_avg(settings).await?;
Ok(HttpResponseOk(output))
}
}

enum ClickhouseAdminKeeperImpl {}
Expand Down Expand Up @@ -155,4 +171,19 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {

Ok(HttpResponseUpdatedNoContent())
}

async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError> {
let ctx = rqctx.context();
let retrieval_settings = query_params.into_inner();
let metric_info = path_params.into_inner();
let settings =
SystemTimeSeriesSettings { retrieval_settings, metric_info };
let output =
ctx.clickhouse_cli().system_timeseries_avg(settings).await?;
Ok(HttpResponseOk(output))
}
}
Loading

0 comments on commit 1f83f07

Please sign in to comment.