Skip to content

Commit

Permalink
out_kinesis_firehose: add support for log_key (fluent#2619)
Browse files Browse the repository at this point in the history
* out_kinesis_firehose: add support for log_key

Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley authored and xmcqueen committed Oct 6, 2020
1 parent 06ffa92 commit e3c5d8d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
32 changes: 26 additions & 6 deletions plugins/out_kinesis_firehose/firehose.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ static int cb_firehose_init(struct flb_output_instance *ins,
ctx->time_key_format = DEFAULT_TIME_KEY_FORMAT;
}

tmp = flb_output_get_property("log_key", ins);
if (tmp) {
ctx->log_key = tmp;
}

if (ctx->log_key && ctx->time_key) {
flb_plg_error(ctx->ins, "'time_key' and 'log_key' can not be used together");
goto error;
}

tmp = flb_output_get_property("endpoint", ins);
if (tmp) {
ctx->custom_endpoint = FLB_TRUE;
Expand Down Expand Up @@ -371,33 +381,33 @@ static int cb_firehose_exit(void *data, struct flb_config *config)
static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "region", NULL,
0, FLB_FALSE, 0,
0, FLB_TRUE, offsetof(struct flb_firehose, region),
"The AWS region of your delivery stream"
},

{
FLB_CONFIG_MAP_STR, "delivery_stream", NULL,
0, FLB_FALSE, 0,
0, FLB_TRUE, offsetof(struct flb_firehose, delivery_stream),
"Firehose delivery stream name"
},

{
FLB_CONFIG_MAP_STR, "time_key", NULL,
0, FLB_FALSE, 0,
0, FLB_TRUE, offsetof(struct flb_firehose, time_key),
"Add the timestamp to the record under this key. By default the timestamp "
"from Fluent Bit will not be added to records sent to Kinesis."
},

{
FLB_CONFIG_MAP_STR, "time_key_format", NULL,
0, FLB_FALSE, 0,
0, FLB_TRUE, offsetof(struct flb_firehose, time_key_format),
"strftime compliant format string for the timestamp; for example, "
"the default is '%Y-%m-%dT%H:%M:%S'. This option is used with time_key. "
},

{
FLB_CONFIG_MAP_STR, "role_arn", NULL,
0, FLB_FALSE, 0,
0, FLB_TRUE, offsetof(struct flb_firehose, role_arn),
"ARN of an IAM role to assume (ex. for cross account access)."
},

Expand All @@ -409,10 +419,20 @@ static struct flb_config_map config_map[] = {

{
FLB_CONFIG_MAP_STR, "sts_endpoint", NULL,
0, FLB_FALSE, 0,
0, FLB_TRUE, offsetof(struct flb_firehose, sts_endpoint),
"Custom endpoint for the STS API."
},

{
FLB_CONFIG_MAP_STR, "log_key", NULL,
0, FLB_TRUE, offsetof(struct flb_firehose, log_key),
"By default, the whole log record will be sent to Firehose. "
"If you specify a key name with this option, then only the value of "
"that key will be sent to Firehose. For example, if you are using "
"the Fluentd Docker log driver, you can specify `log_key log` and only "
"the log message will be sent to Firehose."
},

/* EOF */
{0}
};
Expand Down
11 changes: 11 additions & 0 deletions plugins/out_kinesis_firehose/firehose_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf,
return 2;
}

if (ctx->log_key) {
/*
* flb_msgpack_to_json will encase the value in quotes
* We don't want that for log_key, so we ignore the first
* and last character
*/
written -= 2;
tmp_buf_ptr++; /* pass over the opening quote */
buf->tmp_buf_offset++;
}

/* is (written + 1) because we still have to append newline */
if ((written + 1) >= MAX_EVENT_SIZE) {
flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than "
Expand Down

0 comments on commit e3c5d8d

Please sign in to comment.