Skip to content

Commit

Permalink
firehose: integrate with shared compression lib
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala committed Dec 6, 2021
1 parent a594257 commit 95dfc07
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
20 changes: 20 additions & 0 deletions plugins/out_kinesis_firehose/firehose.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_utils.h>

#include <fluent-bit/aws/flb_aws_compress.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
#include <string.h>
Expand Down Expand Up @@ -119,6 +121,15 @@ static int cb_firehose_init(struct flb_output_instance *ins,
ctx->sts_endpoint = (char *) tmp;
}

tmp = flb_output_get_property("compression", ins);
if (tmp) {
ret = flb_aws_compression_get_type(tmp);
if (ret == -1) {
flb_plg_error(ctx->ins, "unknown compression: %s", tmp);
goto error;
}
ctx->compression = ret;
}

tmp = flb_output_get_property("log_key", ins);
if (tmp) {
Expand Down Expand Up @@ -427,6 +438,15 @@ static struct flb_config_map config_map[] = {
"Custom endpoint for the STS API."
},

{
FLB_CONFIG_MAP_STR, "compression", NULL,
0, FLB_FALSE, 0,
"Compression type for Firehose records. Each log record is individually compressed "
"and sent to Firehose. 'gzip' and 'arrow' are the supported values. "
"'arrow' is only an available if Apache Arrow was enabled at compile time. "
"Defaults to no compression."
},

{
FLB_CONFIG_MAP_STR, "log_key", NULL,
0, FLB_TRUE, offsetof(struct flb_firehose, log_key),
Expand Down
1 change: 1 addition & 0 deletions plugins/out_kinesis_firehose/firehose.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ struct flb_firehose {
char *sts_endpoint;
int custom_endpoint;
int retry_requests;
int compression;

/* must be freed on shutdown if custom_endpoint is not set */
char *endpoint;
Expand Down
62 changes: 44 additions & 18 deletions plugins/out_kinesis_firehose/firehose_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_utils.h>

#include <fluent-bit/aws/flb_aws_compress.h>

#include <monkey/mk_core.h>
#include <mbedtls/base64.h>
#include <msgpack.h>
Expand Down Expand Up @@ -160,6 +162,7 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
struct tm *tmp;
size_t len;
size_t tmp_size;
void *compressed_tmp_buf;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
ret = flb_msgpack_to_json(tmp_buf_ptr,
Expand Down Expand Up @@ -260,29 +263,52 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
memcpy(tmp_buf_ptr + written, "\n", 1);
written++;

/*
* check if event_buf is initialized and big enough
* Base64 encoding will increase size by ~4/3
*/
size = (written * 1.5) + 4;
if (buf->event_buf == NULL || buf->event_buf_size < size) {
flb_free(buf->event_buf);
buf->event_buf = flb_malloc(size);
buf->event_buf_size = size;
if (buf->event_buf == NULL) {
if (ctx->compression == FLB_AWS_COMPRESS_NONE) {
/*
* check if event_buf is initialized and big enough
* Base64 encoding will increase size by ~4/3
*/
size = (written * 1.5) + 4;
if (buf->event_buf == NULL || buf->event_buf_size < size) {
flb_free(buf->event_buf);
buf->event_buf = flb_malloc(size);
buf->event_buf_size = size;
if (buf->event_buf == NULL) {
flb_errno();
return -1;
}
}

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;

ret = mbedtls_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) tmp_buf_ptr, written);
if (ret != 0) {
flb_errno();
return -1;
}
written = b64_len;
}

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
ret = mbedtls_base64_encode((unsigned char *) buf->event_buf, size, &b64_len,
(unsigned char *) tmp_buf_ptr, written);
if (ret != 0) {
flb_errno();
return -1;
else {
/*
* compress event, truncating input if needed
* replace event buffer with compressed buffer
*/
ret = flb_aws_compression_b64_truncate_compress(ctx->compression,
MAX_B64_EVENT_SIZE,
tmp_buf_ptr,
written, &compressed_tmp_buf,
&size); /* evaluate size */
if (ret == -1) {
flb_plg_error(ctx->ins, "Unable to compress record, discarding, "
"%s", written + 1, ctx->delivery_stream);
return 2;
}
flb_free(buf->event_buf);
buf->event_buf = compressed_tmp_buf;
compressed_tmp_buf = NULL;
written = size;
}
written = b64_len;

tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
if ((buf->tmp_buf_size - buf->tmp_buf_offset) < written) {
Expand Down
1 change: 1 addition & 0 deletions plugins/out_kinesis_firehose/firehose_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#define PUT_RECORD_BATCH_PAYLOAD_SIZE 4194304
#define MAX_EVENTS_PER_PUT 500
#define MAX_EVENT_SIZE 1024000
#define MAX_B64_EVENT_SIZE 1365336 /* ceil(1024000 / 3) * 4 */

/* number of characters needed to 'start' a PutRecordBatch payload */
#define PUT_RECORD_BATCH_HEADER_LEN 42
Expand Down

0 comments on commit 95dfc07

Please sign in to comment.