test: add new test & store timestamp instead of text
jorgehermo9 committed Dec 15, 2024
1 parent c88bccc commit f58a51b
Showing 3 changed files with 196 additions and 22 deletions.
102 changes: 98 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -360,7 +360,7 @@ semver = { version = "1.0.23", default-features = false, features = ["serde", "s
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
snap = { version = "1.1.1", default-features = false }
socket2 = { version = "0.5.8", default-features = false }
sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "runtime-tokio"], optional=true }
sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "chrono", "runtime-tokio"], optional=true }
stream-cancel = { version = "0.8.2", default-features = false }
strip-ansi-escapes = { version = "0.2.0", default-features = false }
syslog = { version = "6.1.1", default-features = false, optional = true }
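For context: the new `chrono` feature on sqlx is what lets the integration test below decode the Postgres TIMESTAMPTZ column straight into a chrono::DateTime<Utc>. A minimal sketch of that mapping (not part of this commit; the struct name is illustrative):

use chrono::{DateTime, Utc};
use sqlx::FromRow;

// With sqlx's `chrono` feature enabled, TIMESTAMPTZ columns decode into
// chrono::DateTime<Utc>; without the feature, sqlx has no Decode impl for that
// type and the derive below would not compile.
#[derive(Debug, FromRow)]
struct EventRow {
    id: i64,
    // Backed by a TIMESTAMPTZ column in Postgres.
    timestamp: DateTime<Utc>,
}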
114 changes: 97 additions & 17 deletions src/sinks/postgres/integration_tests.rs
@@ -3,12 +3,15 @@ use crate::{
sinks::{postgres::PostgresConfig, util::test::load_sink},
test_util::{components::run_and_assert_sink_compliance, temp_table, trace_init},
};
use chrono::{DateTime, Utc};
use futures::stream;
use serde::{Deserialize, Serialize};
use sqlx::{Connection, FromRow, PgConnection};
use std::future::ready;
use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent};

const POSTGRES_SINK_TAGS: [&str; 2] = ["endpoint", "protocol"];

fn pg_url() -> String {
std::env::var("PG_URL").expect("PG_URL must be set")
}
@@ -19,6 +22,11 @@ fn create_event(id: i64) -> Event {
event.insert("host", "example.com");
let event_payload = event.clone().into_parts().0;
event.insert("payload", event_payload);
let timestamp = Utc::now();
// Postgres does not support nanosecond resolution for timestamps, so we truncate to microsecond resolution.
let timestamp_microsecond_resolution =
DateTime::from_timestamp_micros(timestamp.timestamp_micros());
event.insert("timestamp", timestamp_microsecond_resolution);
event.into()
}

@@ -28,11 +36,17 @@ fn create_event_with_notifier(id: i64) -> (Event, BatchStatusReceiver) {
(event, receiver)
}

fn create_events(count: usize) -> (Vec<Event>, BatchStatusReceiver) {
let mut events = (0..count as i64).map(create_event).collect::<Vec<_>>();
let receiver = BatchNotifier::apply_to(&mut events);
(events, receiver)
}

#[derive(Debug, Serialize, Deserialize, FromRow)]
struct TestEvent {
id: i64,
host: String,
timestamp: String,
timestamp: DateTime<Utc>,
message: String,
payload: serde_json::Value,
}
@@ -58,16 +72,88 @@ async fn prepare_config() -> (PostgresConfig, String, PgConnection) {
(config, table, connection)
}

// TODO: create a table with an `insertion_date` column that defaults to NOW() in Postgres, so we can
// order by it and check the actual event insertion order against the expected order.
#[tokio::test]
async fn insert_single_event() {
let (config, table, mut connection) = prepare_config().await;
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
let create_table_sql =
format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)");
sqlx::query(&create_table_sql)
.execute(&mut connection)
.await
.unwrap();

let (input_event, mut receiver) = create_event_with_notifier(0);
let input_log_event = input_event.clone().into_log();
let expected_value = serde_json::to_value(&input_log_event).unwrap();

run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS)
.await;
// We drop the event to notify the receiver that the batch was delivered.
std::mem::drop(input_log_event);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let select_all_sql = format!("SELECT * FROM {table}");
let actual_event: TestEvent = sqlx::query_as(&select_all_sql)
.fetch_one(&mut connection)
.await
.unwrap();
let actual_value = serde_json::to_value(actual_event).unwrap();
assert_eq!(expected_value, actual_value);
}

#[tokio::test]
async fn insert_multiple_events() {
let (config, table, mut connection) = prepare_config().await;
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
let create_table_sql = format!(
"CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"
);
sqlx::query(&create_table_sql)
.execute(&mut connection)
.await
.unwrap();

let (input_events, mut receiver) = create_events(100);
let input_log_events = input_events
.clone()
.into_iter()
.map(Event::into_log)
.collect::<Vec<_>>();
let expected_values = input_log_events
.iter()
.map(|event| serde_json::to_value(event).unwrap())
.collect::<Vec<_>>();
run_and_assert_sink_compliance(sink, stream::iter(input_events), &POSTGRES_SINK_TAGS).await;
// We drop the events to notify the receiver that the batch was delivered.
std::mem::drop(input_log_events);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let select_all_sql = format!("SELECT * FROM {table} ORDER BY id");
let actual_events: Vec<TestEvent> = sqlx::query_as(&select_all_sql)
.fetch_all(&mut connection)
.await
.unwrap();
let actual_values = actual_events
.iter()
.map(|event| serde_json::to_value(event).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected_values, actual_values);
}

// Using null::{table} with jsonb_populate_recordset does not work with default values
// https://dba.stackexchange.com/questions/308114/use-default-value-instead-of-inserted-null
// https://stackoverflow.com/questions/49992531/postgresql-insert-a-null-convert-to-default
// TODO: this cannot be fixed without a workaround involving creating a trigger, which is beyond
// Vector's responsibility in the DB. We should document this limitation alongside this test
// (a sketch of such a trigger follows the test below).
#[tokio::test]
async fn default_columns_are_not_populated() {
let (config, table, mut connection) = prepare_config().await;
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();
// We store the timestamp as text and not as `timestamp with timezone` postgres type due to
// postgres not supporting nanosecond-resolution (it does support microsecond-resolution).
let create_table_sql =
format!("CREATE TABLE IF NOT EXISTS {table} (id bigint, host text, timestamp text, message text, payload jsonb)");
format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')");
sqlx::query(&create_table_sql)
.execute(&mut connection)
.await
@@ -77,23 +163,17 @@ async fn insert_single_event() {
run_and_assert_sink_compliance(
sink,
stream::once(ready(input_event.clone())),
&["endpoint", "protocol"],
&POSTGRES_SINK_TAGS,
)
.await;
// We drop the event to notify the receiver that the batch was delivered.
std::mem::drop(input_event);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let select_all_sql = format!("SELECT * FROM {table}");
let actual_event: TestEvent = sqlx::query_as(&select_all_sql)
let select_all_sql = format!("SELECT not_existing_column FROM {table}");
let inserted_not_existing_column: (Option<String>,) = sqlx::query_as(&select_all_sql)
.fetch_one(&mut connection)
.await
.unwrap();

// drop input_event after comparing with response
{
let input_log_event = input_event.into_log();
let expected_value = serde_json::to_value(&input_log_event).unwrap();
let actual_value = serde_json::to_value(actual_event).unwrap();
assert_eq!(expected_value, actual_value);
}

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(inserted_not_existing_column.0, None);
}
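A minimal sketch of the trigger-based workaround the TODO above refers to (not something this commit or Vector does; it would be applied on the database side, it assumes the test's `table` variable and its `not_existing_column` column, and each statement would be executed as its own query):

// Hypothetical DB-side workaround, outside Vector's scope: a BEFORE INSERT trigger
// replaces the NULL produced by jsonb_populate_recordset with the desired default.
let create_trigger_function_sql = "
    CREATE OR REPLACE FUNCTION apply_default_value() RETURNS trigger AS $$
    BEGIN
        IF NEW.not_existing_column IS NULL THEN
            NEW.not_existing_column := 'default_value';
        END IF;
        RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;";
let create_trigger_sql = format!(
    "CREATE TRIGGER apply_default_value_trigger
     BEFORE INSERT ON {table}
     FOR EACH ROW EXECUTE FUNCTION apply_default_value();"
);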
