diff --git a/Cargo.lock b/Cargo.lock index 40651c009440f..1c25f354f0e86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,9 @@ name = "bitflags" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" +dependencies = [ + "serde", +] [[package]] name = "bitmask-enum" @@ -1974,12 +1977,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.83" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -5173,9 +5177,9 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "jobserver" -version = "0.1.27" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -5503,6 +5507,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.12" @@ -9056,6 +9071,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook" version = "0.3.17" @@ -9326,7 +9347,9 @@ checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" dependencies = [ "sqlx-core", "sqlx-macros", + "sqlx-mysql", "sqlx-postgres", + "sqlx-sqlite", ] [[package]] @@ -9338,6 +9361,7 @@ dependencies = [ "atoi", "byteorder", "bytes 1.9.0", + "chrono", "crc", "crossbeam-queue", "either", @@ -9398,13 +9422,58 @@ dependencies = [ "serde_json", "sha2", "sqlx-core", + "sqlx-mysql", "sqlx-postgres", + "sqlx-sqlite", "syn 2.0.90", "tempfile", "tokio", "url", ] +[[package]] +name = "sqlx-mysql" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.4.1", + "byteorder", + "bytes 1.9.0", + "chrono", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 1.0.68", + "tracing 0.1.41", + "whoami", +] + [[package]] name = "sqlx-postgres" version = "0.8.2" @@ -9415,6 +9484,7 @@ dependencies = [ "base64 0.22.1", "bitflags 2.4.1", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -9443,6 +9513,30 @@ dependencies = [ "whoami", ] +[[package]] +name = "sqlx-sqlite" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" +dependencies = [ + "atoi", + "chrono", + "flume 0.11.0", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing 0.1.41", + "url", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index a4f86714f4deb..4aa40a2eb415b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -360,7 +360,7 @@ semver = { version = "1.0.23", default-features = false, features = ["serde", "s smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snap = { version = "1.1.1", default-features = false } socket2 = { version = "0.5.8", default-features = false } -sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "runtime-tokio"], optional=true } +sqlx = { version = "0.8.2", default-features= false, features = ["derive", "postgres", "chrono", "runtime-tokio"], optional=true } stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.0", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs index 56ca2adcc14a7..674ec4030cc4e 100644 --- a/src/sinks/postgres/integration_tests.rs +++ b/src/sinks/postgres/integration_tests.rs @@ -3,12 +3,15 @@ use crate::{ sinks::{postgres::PostgresConfig, util::test::load_sink}, test_util::{components::run_and_assert_sink_compliance, temp_table, trace_init}, }; +use chrono::{DateTime, Utc}; use futures::stream; use serde::{Deserialize, Serialize}; use sqlx::{Connection, FromRow, PgConnection}; use std::future::ready; use vector_lib::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent}; +const POSTGRES_SINK_TAGS: [&str; 2] = ["endpoint", "protocol"]; + fn pg_url() -> String { std::env::var("PG_URL").expect("PG_URL must be set") } @@ -19,6 +22,11 @@ fn create_event(id: i64) -> Event { event.insert("host", "example.com"); let event_payload = event.clone().into_parts().0; event.insert("payload", event_payload); + let timestamp = Utc::now(); + // Postgres does not support nanosecond-resolution, so we truncate the timestamp to microsecond-resolution. + let timestamp_microsecond_resolution = + DateTime::from_timestamp_micros(timestamp.timestamp_micros()); + event.insert("timestamp", timestamp_microsecond_resolution); event.into() } @@ -28,11 +36,17 @@ fn create_event_with_notifier(id: i64) -> (Event, BatchStatusReceiver) { (event, receiver) } +fn create_events(count: usize) -> (Vec, BatchStatusReceiver) { + let mut events = (0..count as i64).map(create_event).collect::>(); + let receiver = BatchNotifier::apply_to(&mut events); + return (events, receiver); +} + #[derive(Debug, Serialize, Deserialize, FromRow)] struct TestEvent { id: i64, host: String, - timestamp: String, + timestamp: DateTime, message: String, payload: serde_json::Value, } @@ -58,16 +72,88 @@ async fn prepare_config() -> (PostgresConfig, String, PgConnection) { (config, table, connection) } -// TODO: create table that has an `insertion_date` that defaults to NOW in postgres, so we can order -// by it and get the event insertion order to check with the expected order. #[tokio::test] async fn insert_single_event() { + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = + format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_event, mut receiver) = create_event_with_notifier(0); + let input_log_event = input_event.clone().into_log(); + let expected_value = serde_json::to_value(&input_log_event).unwrap(); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS) + .await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_log_event); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let select_all_sql = format!("SELECT * FROM {table}"); + let actual_event: TestEvent = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) + .await + .unwrap(); + let actual_value = serde_json::to_value(actual_event).unwrap(); + assert_eq!(expected_value, actual_value); +} + +#[tokio::test] +async fn insert_multiple_events() { + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = format!( + "CREATE TABLE IF NOT EXISTS {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)" + ); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_events, mut receiver) = create_events(100); + let input_log_events = input_events + .clone() + .into_iter() + .map(Event::into_log) + .collect::>(); + let expected_values = input_log_events + .iter() + .map(|event| serde_json::to_value(event).unwrap()) + .collect::>(); + run_and_assert_sink_compliance(sink, stream::iter(input_events), &POSTGRES_SINK_TAGS).await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_log_events); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let select_all_sql = format!("SELECT * FROM {table} ORDER BY id"); + let actual_events: Vec = sqlx::query_as(&select_all_sql) + .fetch_all(&mut connection) + .await + .unwrap(); + let actual_values = actual_events + .iter() + .map(|event| serde_json::to_value(event).unwrap()) + .collect::>(); + assert_eq!(expected_values, actual_values); +} + +// Using null::{table} with jsonb_populate_recordset does not work with default values +// https://dba.stackexchange.com/questions/308114/use-default-value-instead-of-inserted-null +// https://stackoverflow.com/questions/49992531/postgresql-insert-a-null-convert-to-default +// TODO: this cannot be fixed without a workaround involving a trigger creation, which is beyond +// Vector's job in the DB. We should document this limitation alongside with this test. +#[tokio::test] +async fn default_columns_are_not_populated() { let (config, table, mut connection) = prepare_config().await; let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); // We store the timestamp as text and not as `timestamp with timezone` postgres type due to // postgres not supporting nanosecond-resolution (it does support microsecond-resolution). let create_table_sql = - format!("CREATE TABLE IF NOT EXISTS {table} (id bigint, host text, timestamp text, message text, payload jsonb)"); + format!("CREATE TABLE IF NOT EXISTS {table} (id BIGINT, not_existing_column TEXT DEFAULT 'default_value')"); sqlx::query(&create_table_sql) .execute(&mut connection) .await @@ -77,23 +163,17 @@ async fn insert_single_event() { run_and_assert_sink_compliance( sink, stream::once(ready(input_event.clone())), - &["endpoint", "protocol"], + &POSTGRES_SINK_TAGS, ) .await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_event); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - let select_all_sql = format!("SELECT * FROM {table}"); - let actual_event: TestEvent = sqlx::query_as(&select_all_sql) + let select_all_sql = format!("SELECT not_existing_column FROM {table}"); + let inserted_not_existing_column: (Option,) = sqlx::query_as(&select_all_sql) .fetch_one(&mut connection) .await .unwrap(); - - // drop input_event after comparing with response - { - let input_log_event = input_event.into_log(); - let expected_value = serde_json::to_value(&input_log_event).unwrap(); - let actual_value = serde_json::to_value(actual_event).unwrap(); - assert_eq!(expected_value, actual_value); - } - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + assert_eq!(inserted_not_existing_column.0, None); }