diff --git a/plugins/out_kinesis_firehose/firehose.c b/plugins/out_kinesis_firehose/firehose.c index b2bc01caa17..5451c00459d 100644 --- a/plugins/out_kinesis_firehose/firehose.c +++ b/plugins/out_kinesis_firehose/firehose.c @@ -35,6 +35,8 @@ #include #include +#include + #include #include #include @@ -119,6 +121,15 @@ static int cb_firehose_init(struct flb_output_instance *ins, ctx->sts_endpoint = (char *) tmp; } + tmp = flb_output_get_property("compression", ins); + if (tmp) { + ret = flb_aws_compression_get_type(tmp); + if (ret == -1) { + flb_plg_error(ctx->ins, "unknown compression: %s", tmp); + goto error; + } + ctx->compression = ret; + } tmp = flb_output_get_property("log_key", ins); if (tmp) { @@ -427,6 +438,15 @@ static struct flb_config_map config_map[] = { "Custom endpoint for the STS API." }, + { + FLB_CONFIG_MAP_STR, "compression", NULL, + 0, FLB_FALSE, 0, + "Compression type for Firehose records. Each log record is individually compressed " + "and sent to Firehose. 'gzip' and 'arrow' are the supported values. " + "'arrow' is only an available if Apache Arrow was enabled at compile time. " + "Defaults to no compression." + }, + { FLB_CONFIG_MAP_STR, "log_key", NULL, 0, FLB_TRUE, offsetof(struct flb_firehose, log_key), diff --git a/plugins/out_kinesis_firehose/firehose.h b/plugins/out_kinesis_firehose/firehose.h index 7f84e0002dc..85dd0f72f6d 100644 --- a/plugins/out_kinesis_firehose/firehose.h +++ b/plugins/out_kinesis_firehose/firehose.h @@ -89,6 +89,7 @@ struct flb_firehose { char *sts_endpoint; int custom_endpoint; int retry_requests; + int compression; /* must be freed on shutdown if custom_endpoint is not set */ char *endpoint; diff --git a/plugins/out_kinesis_firehose/firehose_api.c b/plugins/out_kinesis_firehose/firehose_api.c index 2e5cd751e43..4b1568536e0 100644 --- a/plugins/out_kinesis_firehose/firehose_api.c +++ b/plugins/out_kinesis_firehose/firehose_api.c @@ -36,6 +36,8 @@ #include #include +#include + #include #include #include @@ -160,6 +162,7 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, struct tm *tmp; size_t len; size_t tmp_size; + void *compressed_tmp_buf; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; ret = flb_msgpack_to_json(tmp_buf_ptr, @@ -260,29 +263,52 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, memcpy(tmp_buf_ptr + written, "\n", 1); written++; - /* - * check if event_buf is initialized and big enough - * Base64 encoding will increase size by ~4/3 - */ - size = (written * 1.5) + 4; - if (buf->event_buf == NULL || buf->event_buf_size < size) { - flb_free(buf->event_buf); - buf->event_buf = flb_malloc(size); - buf->event_buf_size = size; - if (buf->event_buf == NULL) { + if (ctx->compression == FLB_AWS_COMPRESS_NONE) { + /* + * check if event_buf is initialized and big enough + * Base64 encoding will increase size by ~4/3 + */ + size = (written * 1.5) + 4; + if (buf->event_buf == NULL || buf->event_buf_size < size) { + flb_free(buf->event_buf); + buf->event_buf = flb_malloc(size); + buf->event_buf_size = size; + if (buf->event_buf == NULL) { + flb_errno(); + return -1; + } + } + + tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; + + ret = mbedtls_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, + (unsigned char *) tmp_buf_ptr, written); + if (ret != 0) { flb_errno(); return -1; } + written = b64_len; } - - tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; - ret = mbedtls_base64_encode((unsigned char *) buf->event_buf, size, &b64_len, - (unsigned char *) tmp_buf_ptr, written); - if (ret != 0) { - flb_errno(); - return -1; + else { + /* + * compress event, truncating input if needed + * replace event buffer with compressed buffer + */ + ret = flb_aws_compression_b64_truncate_compress(ctx->compression, + MAX_B64_EVENT_SIZE, + tmp_buf_ptr, + written, &compressed_tmp_buf, + &size); /* evaluate size */ + if (ret == -1) { + flb_plg_error(ctx->ins, "Unable to compress record, discarding, " + "%s", written + 1, ctx->delivery_stream); + return 2; + } + flb_free(buf->event_buf); + buf->event_buf = compressed_tmp_buf; + compressed_tmp_buf = NULL; + written = size; } - written = b64_len; tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset; if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) { diff --git a/plugins/out_kinesis_firehose/firehose_api.h b/plugins/out_kinesis_firehose/firehose_api.h index 69e92d392c5..386aa80497c 100644 --- a/plugins/out_kinesis_firehose/firehose_api.h +++ b/plugins/out_kinesis_firehose/firehose_api.h @@ -24,6 +24,7 @@ #define PUT_RECORD_BATCH_PAYLOAD_SIZE 4194304 #define MAX_EVENTS_PER_PUT 500 #define MAX_EVENT_SIZE 1024000 +#define MAX_B64_EVENT_SIZE 1365336 /* ceil(1024000 / 3) * 4 */ /* number of characters needed to 'start' a PutRecordBatch payload */ #define PUT_RECORD_BATCH_HEADER_LEN 42