Skip to content

Commit

Permalink
fix(aws_s3 source): Use the decoder to calculate type defs (#18274)
Browse files Browse the repository at this point in the history
* fix output type

* fix output type too

* update tests

* fix tests

* add json test

* cleanup

* ignore temporary compose files

* fix .gitignore style
  • Loading branch information
fuchsnj authored Aug 17, 2023
1 parent 294c1dd commit 40f525c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 6 deletions.
1 change: 1 addition & 0 deletions scripts/integration/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*/compose-temp*.yaml
62 changes: 56 additions & 6 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{convert::TryInto, io::ErrorKind};
use async_compression::tokio::bufread;
use aws_sdk_s3::types::ByteStream;
use codecs::decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions};
use codecs::{BytesDeserializerConfig, NewlineDelimitedDecoderConfig};
use codecs::NewlineDelimitedDecoderConfig;
use futures::{stream, stream::StreamExt, TryStreamExt};
use lookup::owned_value_path;
use snafu::Snafu;
Expand All @@ -14,7 +14,6 @@ use vrl::value::{kind::Collection, Kind};

use super::util::MultilineConfig;
use crate::codecs::DecodingConfig;
use crate::config::DataType;
use crate::{
aws::{auth::AwsAuthentication, create_client, create_client_and_region, RegionOrEndpoint},
common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
Expand Down Expand Up @@ -163,7 +162,8 @@ impl SourceConfig for AwsS3Config {

fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let mut schema_definition = BytesDeserializerConfig
let mut schema_definition = self
.decoding
.schema_definition(log_namespace)
.with_source_metadata(
Self::NAME,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl SourceConfig for AwsS3Config {
Self::NAME,
None,
&owned_value_path!("metadata"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
);

Expand All @@ -208,7 +208,10 @@ impl SourceConfig for AwsS3Config {
schema_definition = schema_definition.unknown_fields(Kind::bytes());
}

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_logs(
self.decoding.output_type(),
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down Expand Up @@ -440,6 +443,7 @@ mod integration_tests {

use aws_sdk_s3::{types::ByteStream, Client as S3Client};
use aws_sdk_sqs::{model::QueueAttributeName, Client as SqsClient};
use codecs::{decoding::DeserializerConfig, JsonDeserializerConfig};
use lookup::path;
use similar_asserts::assert_eq;
use vrl::value::Value;
Expand Down Expand Up @@ -483,6 +487,35 @@ mod integration_tests {
logs,
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}

#[tokio::test]
async fn s3_process_json_message() {
trace_init();

let logs: Vec<String> = random_lines(100).take(10).collect();

let json_logs: Vec<String> = logs
.iter()
.map(|msg| {
// convert to JSON object
format!(r#"{{"message": "{}"}}"#, msg)
})
.collect();

test_event(
None,
None,
None,
None,
json_logs.join("\n").into_bytes(),
logs,
Delivered,
false,
DeserializerConfig::Json(JsonDeserializerConfig::default()),
)
.await;
}
Expand All @@ -502,6 +535,7 @@ mod integration_tests {
logs,
Delivered,
true,
DeserializerConfig::Bytes,
)
.await;
}
Expand All @@ -522,6 +556,7 @@ mod integration_tests {
logs,
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand All @@ -542,6 +577,7 @@ mod integration_tests {
logs,
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand Down Expand Up @@ -570,6 +606,7 @@ mod integration_tests {
logs,
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand Down Expand Up @@ -599,6 +636,7 @@ mod integration_tests {
logs,
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand Down Expand Up @@ -628,6 +666,7 @@ mod integration_tests {
logs,
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand Down Expand Up @@ -655,6 +694,7 @@ mod integration_tests {
vec!["abc\ndef\ngeh".to_owned()],
Delivered,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand All @@ -677,6 +717,7 @@ mod integration_tests {
logs,
Errored,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand All @@ -696,6 +737,7 @@ mod integration_tests {
logs,
Rejected,
false,
DeserializerConfig::Bytes,
)
.await;
}
Expand All @@ -708,6 +750,7 @@ mod integration_tests {
queue_url: &str,
multiline: Option<MultilineConfig>,
log_namespace: bool,
decoding: DeserializerConfig,
) -> AwsS3Config {
AwsS3Config {
region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
Expand All @@ -723,6 +766,7 @@ mod integration_tests {
}),
acknowledgements: true.into(),
log_namespace: Some(log_namespace),
decoding,
..Default::default()
}
}
Expand All @@ -738,6 +782,7 @@ mod integration_tests {
expected_lines: Vec<String>,
status: EventStatus,
log_namespace: bool,
decoding: DeserializerConfig,
) {
assert_source_compliance(&SOURCE_TAGS, async move {
let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
Expand All @@ -750,7 +795,7 @@ mod integration_tests {

tokio::time::sleep(Duration::from_secs(1)).await;

let config = config(&queue, multiline, log_namespace);
let config = config(&queue, multiline, log_namespace, decoding);

s3.put_object()
.bucket(bucket.clone())
Expand Down Expand Up @@ -831,6 +876,11 @@ mod integration_tests {

assert_eq!(expected_lines.len(), events.len());
for (i, event) in events.iter().enumerate() {

if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
schema_definition.is_valid_for_event(event).unwrap();
}

let message = expected_lines[i].as_str();

let log = event.as_log();
Expand Down

0 comments on commit 40f525c

Please sign in to comment.