enhancement(compression): zstd compression support #17371

Merged
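For context: the change rides on the sinks' existing `compression` option, which now accepts `"zstd"`. A minimal sketch of enabling it on the HTTP sink, mirroring how the integration tests below build their configs from TOML (the URI here is a placeholder):

```rust
// Hypothetical snippet: the tests in this PR construct configs from TOML
// the same way; compression = "zstd" is the newly accepted value.
let config: HttpSinkConfig = toml::from_str(
    r#"
        uri = "http://localhost:9000/frames"
        compression = "zstd"
        encoding.codec = "json"
    "#,
)
.unwrap();
```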
52 changes: 52 additions & 0 deletions src/sinks/aws_s3/integration_tests.rs
@@ -252,6 +252,51 @@ async fn s3_gzip() {
    assert_eq!(lines, response_lines);
}

#[tokio::test]
async fn s3_zstd() {
    // Here, we're creating a bunch of events, approximately 3000, while setting our batch size
    // to 1000, and using zstd compression. We test to ensure that all of the keys we end up
    // writing represent the sum total of the lines: we expect 3 batches, each of which should
    // have 1000 lines.
    let cx = SinkContext::new_test();

    let bucket = uuid::Uuid::new_v4().to_string();

    create_bucket(&bucket, false).await;

    let batch_size = 1_000;
    let batch_multiplier = 3;
    let config = S3SinkConfig {
        compression: Compression::zstd_default(),
        filename_time_format: "%s%f".into(),
        ..config(&bucket, batch_size)
    };

    let prefix = config.key_prefix.clone();
    let service = config.create_service(&cx.globals.proxy).await.unwrap();
    let sink = config.build_processor(service).unwrap();

    let (lines, events, receiver) = make_events_batch(100, batch_size * batch_multiplier);
    run_and_assert_sink_compliance(sink, events, &AWS_SINK_TAGS).await;
    assert_eq!(receiver.await, BatchStatus::Delivered);

    let keys = get_keys(&bucket, prefix).await;
    assert_eq!(keys.len(), batch_multiplier);

    let mut response_lines: Vec<String> = Vec::new();
    let mut key_stream = stream::iter(keys);
    while let Some(key) = key_stream.next().await {
        assert!(key.ends_with(".log.zst"));

        let obj = get_object(&bucket, key).await;
        assert_eq!(obj.content_encoding, Some("zstd".to_string()));

        response_lines.append(&mut get_zstd_lines(obj).await);
    }

    assert_eq!(lines, response_lines);
}

// NOTE: this test doesn't actually validate anything because localstack
// doesn't enforce the required Content-MD5 header on the request for
// buckets with object lock enabled
@@ -481,6 +526,13 @@ async fn get_gzipped_lines(obj: GetObjectOutput) -> Vec<String> {
    buf_read.lines().map(|l| l.unwrap()).collect()
}

async fn get_zstd_lines(obj: GetObjectOutput) -> Vec<String> {
    let body = get_object_output_body(obj).await;
    let decoder = zstd::Decoder::new(body).expect("zstd decoder initialization failed");
    let buf_read = BufReader::new(decoder);
    buf_read.lines().map(|l| l.unwrap()).collect()
}

async fn get_object_output_body(obj: GetObjectOutput) -> impl std::io::Read {
    obj.body.collect().await.unwrap().reader()
}
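As a sanity check of the format these helpers decode, here is a self-contained round trip using the same `zstd` crate (a standalone sketch, not part of the PR; the payload is arbitrary):

```rust
use std::io::Read;

fn main() {
    let payload = b"hello\nworld\n";

    // Compress; level 0 asks the zstd crate for its default level (3),
    // matching Compression::zstd_default() in spirit.
    let compressed = zstd::encode_all(&payload[..], 0).expect("compression failed");

    // Decompress the same way get_zstd_lines does, via zstd::Decoder.
    let mut decoder = zstd::Decoder::new(&compressed[..]).expect("decoder init failed");
    let mut out = Vec::new();
    decoder.read_to_end(&mut out).expect("decompression failed");
    assert_eq!(out, payload.to_vec());
}
```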
1 change: 1 addition & 0 deletions src/sinks/azure_blob/request_builder.rs
@@ -106,6 +106,7 @@ impl Compression {
            Self::None => "text/plain",
            Self::Gzip(_) => "application/gzip",
            Self::Zlib(_) => "application/zlib",
            Self::Zstd(_) => "application/zstd",
        }
    }
}
105 changes: 72 additions & 33 deletions src/sinks/http.rs
@@ -2,7 +2,6 @@ use std::io::Write;

use bytes::{BufMut, Bytes, BytesMut};
use codecs::encoding::{CharacterDelimitedEncoder, Framer, Serializer};
use flate2::write::{GzEncoder, ZlibEncoder};
use futures::{future, FutureExt, SinkExt};
use http::{
    header::{HeaderName, HeaderValue, AUTHORIZATION},
@@ -23,7 +22,7 @@ use crate::{
    sinks::util::{
        self,
        http::{BatchedHttpSink, HttpEventEncoder, RequestConfig},
        BatchConfig, Buffer, Compression, RealtimeSizeBasedDefaultBatchSettings,
        BatchConfig, Buffer, Compression, Compressor, RealtimeSizeBasedDefaultBatchSettings,
        TowerRequestConfig, UriSerde,
    },
    tls::{TlsConfig, TlsSettings},
@@ -375,24 +374,21 @@ impl util::http::HttpSink for HttpSink {
builder = builder.header("Content-Type", content_type);
}

match self.compression {
Compression::Gzip(level) => {
builder = builder.header("Content-Encoding", "gzip");

let buffer = BytesMut::new();
let mut w = GzEncoder::new(buffer.writer(), level.as_flate2());
w.write_all(&body).expect("Writing to Vec can't fail");
body = w.finish().expect("Writing to Vec can't fail").into_inner();
}
Compression::Zlib(level) => {
builder = builder.header("Content-Encoding", "deflate");

let buffer = BytesMut::new();
let mut w = ZlibEncoder::new(buffer.writer(), level.as_flate2());
w.write_all(&body).expect("Writing to Vec can't fail");
body = w.finish().expect("Writing to Vec can't fail").into_inner();
}
Compression::None => {}
let compression = self.compression;

if compression.is_compressed() {
builder = builder.header(
"Content-Encoding",
compression
.content_encoding()
.expect("Encoding should be specified."),
);

let mut compressor = Compressor::from(compression);
compressor
.write_all(&body)
.expect("Writing to Vec can't fail.");
body = compressor.finish().expect("Writing to Vec can't fail.");
}

let headers = builder
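Distilled, the refactor above replaces the per-codec match arms with one generic path: derive the Content-Encoding header and compress through `Compressor` for any compressed variant, zstd included. A sketch under the assumptions visible in the hunk (that `content_encoding()` returns the header value for compressed variants and `Compressor::finish()` yields the compressed bytes):

```rust
use std::io::Write;

use bytes::Bytes;

// Hypothetical helper; Compression and Compressor are vector's own types
// from sinks::util, used only via the calls that appear in the hunk above.
fn compress_body(compression: Compression, body: Bytes) -> (Option<&'static str>, Bytes) {
    if !compression.is_compressed() {
        return (None, body); // no header, body unchanged
    }

    let encoding = compression
        .content_encoding()
        .expect("compressed variants report an encoding");

    let mut compressor = Compressor::from(compression);
    compressor
        .write_all(&body)
        .expect("writing to an in-memory buffer can't fail");
    let compressed = compressor
        .finish()
        .expect("finishing an in-memory buffer can't fail");

    (Some(encoding), compressed)
}
```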
@@ -477,12 +473,12 @@ mod tests {
        encoding::FramingConfig, JsonSerializerConfig, NewlineDelimitedEncoderConfig,
        TextSerializerConfig,
    };
    use flate2::read::MultiGzDecoder;
    use flate2::{read::MultiGzDecoder, read::ZlibDecoder};
    use futures::{channel::mpsc, stream, StreamExt};
    use headers::{Authorization, HeaderMapExt};
    use http::request::Parts;
    use hyper::{Method, Response, StatusCode};
    use serde::Deserialize;
    use serde::{de, Deserialize};
    use vector_core::event::{BatchNotifier, BatchStatus, LogEvent};

    use super::*;
@@ -812,15 +808,44 @@ mod tests {
    }

    #[tokio::test]
    async fn json_compression() {
    async fn json_gzip_compression() {
        json_compression("gzip").await;
    }

    #[tokio::test]
    async fn json_zstd_compression() {
        json_compression("zstd").await;
    }

    #[tokio::test]
    async fn json_zlib_compression() {
        json_compression("zlib").await;
    }

    #[tokio::test]
    async fn json_gzip_compression_with_payload_wrapper() {
        json_compression_with_payload_wrapper("gzip").await;
    }

    #[tokio::test]
    async fn json_zlib_compression_with_payload_wrapper() {
        json_compression_with_payload_wrapper("zlib").await;
    }

    #[tokio::test]
    async fn json_zstd_compression_with_payload_wrapper() {
        json_compression_with_payload_wrapper("zstd").await;
    }

    async fn json_compression(compression: &str) {
        components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let num_lines = 1000;

            let in_addr = next_addr();

            let config = r#"
                uri = "http://$IN_ADDR/frames"
                compression = "gzip"
                compression = "$COMPRESSION"
                encoding.codec = "json"
                method = "post"

@@ -829,7 +854,9 @@
                user = "waldo"
                password = "hunter2"
            "#
            .replace("$IN_ADDR", &in_addr.to_string());
            .replace("$IN_ADDR", &in_addr.to_string())
            .replace("$COMPRESSION", compression);

            let config: HttpSinkConfig = toml::from_str(&config).unwrap();

            let cx = SinkContext::new_test();
@@ -856,8 +883,7 @@
                    Some(Authorization::basic("waldo", "hunter2")),
                    parts.headers.typed_get()
                );
                let lines: Vec<serde_json::Value> =
                    serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
                let lines: Vec<serde_json::Value> = parse_compressed_json(compression, body);
                stream::iter(lines)
            })
            .map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
@@ -870,16 +896,15 @@
        .await;
    }

    #[tokio::test]
    async fn json_compression_with_payload_wrapper() {
    async fn json_compression_with_payload_wrapper(compression: &str) {
        components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let num_lines = 1000;

            let in_addr = next_addr();

            let config = r#"
                uri = "http://$IN_ADDR/frames"
                compression = "gzip"
                compression = "$COMPRESSION"
                encoding.codec = "json"
                payload_prefix = '{"data":'
                payload_suffix = "}"
@@ -890,7 +915,9 @@
                user = "waldo"
                password = "hunter2"
            "#
            .replace("$IN_ADDR", &in_addr.to_string());
            .replace("$IN_ADDR", &in_addr.to_string())
            .replace("$COMPRESSION", compression);

            let config: HttpSinkConfig = toml::from_str(&config).unwrap();

            let cx = SinkContext::new_test();
@@ -918,8 +945,8 @@
                    parts.headers.typed_get()
                );

                let message: serde_json::Value =
                    serde_json::from_reader(MultiGzDecoder::new(body.reader())).unwrap();
                let message: serde_json::Value = parse_compressed_json(compression, body);

                let lines: Vec<serde_json::Value> =
                    message["data"].as_array().unwrap().to_vec();
                stream::iter(lines)
@@ -934,6 +961,18 @@
        .await;
    }

    fn parse_compressed_json<T>(compression: &str, buf: Bytes) -> T
    where
        T: de::DeserializeOwned,
    {
        match compression {
            "gzip" => serde_json::from_reader(MultiGzDecoder::new(buf.reader())).unwrap(),
            "zstd" => serde_json::from_reader(zstd::Decoder::new(buf.reader()).unwrap()).unwrap(),
            "zlib" => serde_json::from_reader(ZlibDecoder::new(buf.reader())).unwrap(),
            _ => panic!("undefined compression: {}", compression),
        }
    }

    async fn get_received(
        rx: mpsc::Receiver<(Parts, Bytes)>,
        assert_parts: impl Fn(Parts),