Skip to content

Commit

Permalink
feat(clickhouse sink): make database and table templateable (vect…
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmith3197 authored Jul 18, 2023
1 parent fbc0308 commit 536a7f1
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 49 deletions.
18 changes: 11 additions & 7 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ pub struct ClickhouseConfig {

/// The table that data is inserted into.
#[configurable(metadata(docs::examples = "mytable"))]
pub table: String,
pub table: Template,

/// The database that contains the table that data is inserted into.
#[configurable(metadata(docs::examples = "mydatabase"))]
pub database: Option<String>,
pub database: Option<Template>,

/// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
#[serde(default)]
Expand Down Expand Up @@ -90,25 +90,30 @@ impl SinkConfig for ClickhouseConfig {
let service = ClickhouseService::new(
client.clone(),
auth.clone(),
&endpoint,
self.database.as_deref(),
self.table.as_str(),
endpoint.clone(),
self.skip_unknown_fields,
self.date_time_best_effort,
)?;
);

let request_limits = self.request.unwrap_with(&Default::default());
let service = ServiceBuilder::new()
.settings(request_limits, ClickhouseRetryLogic::default())
.service(service);

let batch_settings = self.batch.into_batcher_settings()?;
let database = self.database.clone().unwrap_or_else(|| {
"default"
.try_into()
.expect("'default' should be a valid template")
});
let sink = ClickhouseSink::new(
batch_settings,
self.compression,
self.encoding.clone(),
service,
protocol,
database,
self.table.clone(),
);

let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
Expand All @@ -126,7 +131,6 @@ impl SinkConfig for ClickhouseConfig {
}

async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
// TODO: check if table exists?
let uri = format!("{}/?query=SELECT%201", endpoint);
let mut request = Request::get(uri).body(Body::empty()).unwrap();

Expand Down
74 changes: 69 additions & 5 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn insert_events() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
compression: Compression::None,
batch,
request: TowerRequestConfig {
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn skip_unknown_fields() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
skip_unknown_fields: true,
compression: Compression::None,
batch,
Expand Down Expand Up @@ -140,7 +140,7 @@ async fn insert_events_unix_timestamps() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
compression: Compression::None,
encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(),
batch,
Expand Down Expand Up @@ -269,7 +269,7 @@ async fn no_retry_on_incorrect_data() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
compression: Compression::None,
batch,
..Default::default()
Expand Down Expand Up @@ -320,7 +320,7 @@ async fn no_retry_on_incorrect_data_warp() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: gen_table(),
table: gen_table().try_into().unwrap(),
batch,
..Default::default()
};
Expand All @@ -338,6 +338,70 @@ async fn no_retry_on_incorrect_data_warp() {
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
}

#[tokio::test]
async fn templated_table() {
trace_init();

let n_tables = 2;
let table_events: Vec<(String, Event, BatchStatusReceiver)> = (0..n_tables)
.map(|_| {
let table = gen_table();
let (mut event, receiver) = make_event();
event.as_mut_log().insert("table", table.as_str());
(table, event, receiver)
})
.collect();

let host = clickhouse_address();

let mut batch = BatchConfig::default();
batch.max_events = Some(1);

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: "{{ .table }}".try_into().unwrap(),
batch,
..Default::default()
};

let client = ClickhouseClient::new(host);
for (table, _, _) in &table_events {
client
.create_table(
table,
"host String, timestamp String, message String, table String",
)
.await;
}

let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let events: Vec<Event> = table_events
.iter()
.map(|(_, event, _)| event.clone())
.collect();
run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;

for (table, event, mut receiver) in table_events {
let output = client.select_all(&table).await;
assert_eq!(1, output.rows, "table {} should have 1 row", table);

let expected = serde_json::to_value(event.into_log()).unwrap();
assert_eq!(
expected, output.data[0],
"table \"{}\"'s one row should have the correct data",
table
);

assert_eq!(
receiver.try_recv(),
Ok(BatchStatus::Delivered),
"table \"{}\"'s event should have been delivered",
table
);
}
}

fn make_event() -> (Event, BatchStatusReceiver) {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
Expand Down
61 changes: 35 additions & 26 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::{

#[derive(Debug, Clone)]
pub struct ClickhouseRequest {
pub database: String,
pub table: String,
pub body: Bytes,
pub compression: Compression,
pub finalizers: EventFinalizers,
Expand Down Expand Up @@ -107,31 +109,28 @@ impl RetryLogic for ClickhouseRetryLogic {
#[derive(Debug, Clone)]
pub struct ClickhouseService {
client: HttpClient,
uri: Uri,
auth: Option<Auth>,
endpoint: Uri,
skip_unknown_fields: bool,
date_time_best_effort: bool,
}

impl ClickhouseService {
/// Creates a new `ClickhouseService`.
pub fn new(
pub const fn new(
client: HttpClient,
auth: Option<Auth>,
endpoint: &Uri,
database: Option<&str>,
table: &str,
endpoint: Uri,
skip_unknown_fields: bool,
date_time_best_effort: bool,
) -> crate::Result<Self> {
// Set the URI query once during initialization, as it won't change throughout the lifecycle
// of the service.
let uri = set_uri_query(
) -> Self {
Self {
client,
auth,
endpoint,
database.unwrap_or("default"),
table,
skip_unknown_fields,
date_time_best_effort,
)?;
Ok(Self { client, auth, uri })
}
}
}

Expand All @@ -148,22 +147,32 @@ impl Service<ClickhouseRequest> for ClickhouseService {
// Emission of Error internal event is handled upstream by the caller.
fn call(&mut self, request: ClickhouseRequest) -> Self::Future {
let mut client = self.client.clone();
let auth = self.auth.clone();

let mut builder = Request::post(&self.uri)
.header(CONTENT_TYPE, "application/x-ndjson")
.header(CONTENT_LENGTH, request.body.len());
if let Some(ce) = request.compression.content_encoding() {
builder = builder.header(CONTENT_ENCODING, ce);
}
if let Some(auth) = &self.auth {
builder = auth.apply_builder(builder);
}

let http_request = builder
.body(Body::from(request.body))
.expect("building HTTP request failed unexpectedly");
// Build the URI outside of the boxed future to avoid unnecessary clones.
let uri = set_uri_query(
&self.endpoint,
&request.database,
&request.table,
self.skip_unknown_fields,
self.date_time_best_effort,
);

Box::pin(async move {
let mut builder = Request::post(&uri?)
.header(CONTENT_TYPE, "application/x-ndjson")
.header(CONTENT_LENGTH, request.body.len());
if let Some(ce) = request.compression.content_encoding() {
builder = builder.header(CONTENT_ENCODING, ce);
}
if let Some(auth) = auth {
builder = auth.apply_builder(builder);
}

let http_request = builder
.body(Body::from(request.body))
.expect("building HTTP request failed unexpectedly");

let response = client.call(http_request).in_current_span().await?;
let (parts, body) = response.into_parts();
let body = body::to_bytes(body).await?;
Expand Down
Loading

0 comments on commit 536a7f1

Please sign in to comment.