From 40f525cae3eb7c6867afeeed4b2bd82cf85f5a65 Mon Sep 17 00:00:00 2001 From: Nathan Fox Date: Thu, 17 Aug 2023 09:15:05 -0400 Subject: [PATCH] fix(aws_s3 source): Use the decoder to calculate type defs (#18274) * fix output type * fix output type too * update tests * fix tests * add json test * cleanup * ignore temporary compose files * fix .gitignore style --- scripts/integration/.gitignore | 1 + src/sources/aws_s3/mod.rs | 62 ++++++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 6 deletions(-) create mode 100644 scripts/integration/.gitignore diff --git a/scripts/integration/.gitignore b/scripts/integration/.gitignore new file mode 100644 index 0000000000000..a86d6b738ce29 --- /dev/null +++ b/scripts/integration/.gitignore @@ -0,0 +1 @@ +*/compose-temp*.yaml diff --git a/src/sources/aws_s3/mod.rs b/src/sources/aws_s3/mod.rs index 0478d9d192fb1..5338ba144bef4 100644 --- a/src/sources/aws_s3/mod.rs +++ b/src/sources/aws_s3/mod.rs @@ -3,7 +3,7 @@ use std::{convert::TryInto, io::ErrorKind}; use async_compression::tokio::bufread; use aws_sdk_s3::types::ByteStream; use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions}; -use codecs::{BytesDeserializerConfig, NewlineDelimitedDecoderConfig}; +use codecs::NewlineDelimitedDecoderConfig; use futures::{stream, stream::StreamExt, TryStreamExt}; use lookup::owned_value_path; use snafu::Snafu; @@ -14,7 +14,6 @@ use vrl::value::{kind::Collection, Kind}; use super::util::MultilineConfig; use crate::codecs::DecodingConfig; -use crate::config::DataType; use crate::{ aws::{auth::AwsAuthentication, create_client, create_client_and_region, RegionOrEndpoint}, common::{s3::S3ClientBuilder, sqs::SqsClientBuilder}, @@ -163,7 +162,8 @@ impl SourceConfig for AwsS3Config { fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); - let mut schema_definition = BytesDeserializerConfig + let mut schema_definition = self + .decoding .schema_definition(log_namespace) .with_source_metadata( Self::NAME, @@ -199,7 +199,7 @@ impl SourceConfig for AwsS3Config { Self::NAME, None, &owned_value_path!("metadata"), - Kind::object(Collection::empty().with_unknown(Kind::bytes())), + Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(), None, ); @@ -208,7 +208,10 @@ impl SourceConfig for AwsS3Config { schema_definition = schema_definition.unknown_fields(Kind::bytes()); } - vec![SourceOutput::new_logs(DataType::Log, schema_definition)] + vec![SourceOutput::new_logs( + self.decoding.output_type(), + schema_definition, + )] } fn can_acknowledge(&self) -> bool { @@ -440,6 +443,7 @@ mod integration_tests { use aws_sdk_s3::{types::ByteStream, Client as S3Client}; use aws_sdk_sqs::{model::QueueAttributeName, Client as SqsClient}; + use codecs::{decoding::DeserializerConfig, JsonDeserializerConfig}; use lookup::path; use similar_asserts::assert_eq; use vrl::value::Value; @@ -483,6 +487,35 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, + ) + .await; + } + + #[tokio::test] + async fn s3_process_json_message() { + trace_init(); + + let logs: Vec = random_lines(100).take(10).collect(); + + let json_logs: Vec = logs + .iter() + .map(|msg| { + // convert to JSON object + format!(r#"{{"message": "{}"}}"#, msg) + }) + .collect(); + + test_event( + None, + None, + None, + None, + json_logs.join("\n").into_bytes(), + logs, + Delivered, + false, + DeserializerConfig::Json(JsonDeserializerConfig::default()), ) .await; } @@ -502,6 +535,7 @@ mod integration_tests { logs, Delivered, true, + DeserializerConfig::Bytes, ) .await; } @@ -522,6 +556,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -542,6 +577,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -570,6 +606,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -599,6 +636,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -628,6 +666,7 @@ mod integration_tests { logs, Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -655,6 +694,7 @@ mod integration_tests { vec!["abc\ndef\ngeh".to_owned()], Delivered, false, + DeserializerConfig::Bytes, ) .await; } @@ -677,6 +717,7 @@ mod integration_tests { logs, Errored, false, + DeserializerConfig::Bytes, ) .await; } @@ -696,6 +737,7 @@ mod integration_tests { logs, Rejected, false, + DeserializerConfig::Bytes, ) .await; } @@ -708,6 +750,7 @@ mod integration_tests { queue_url: &str, multiline: Option, log_namespace: bool, + decoding: DeserializerConfig, ) -> AwsS3Config { AwsS3Config { region: RegionOrEndpoint::with_both("us-east-1", s3_address()), @@ -723,6 +766,7 @@ mod integration_tests { }), acknowledgements: true.into(), log_namespace: Some(log_namespace), + decoding, ..Default::default() } } @@ -738,6 +782,7 @@ mod integration_tests { expected_lines: Vec, status: EventStatus, log_namespace: bool, + decoding: DeserializerConfig, ) { assert_source_compliance(&SOURCE_TAGS, async move { let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); @@ -750,7 +795,7 @@ mod integration_tests { tokio::time::sleep(Duration::from_secs(1)).await; - let config = config(&queue, multiline, log_namespace); + let config = config(&queue, multiline, log_namespace, decoding); s3.put_object() .bucket(bucket.clone()) @@ -831,6 +876,11 @@ mod integration_tests { assert_eq!(expected_lines.len(), events.len()); for (i, event) in events.iter().enumerate() { + + if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition { + schema_definition.is_valid_for_event(event).unwrap(); + } + let message = expected_lines[i].as_str(); let log = event.as_log();