Skip to content

Commit

Permalink
out_kafka: Introduce the raw_log_key option raw format
Browse files Browse the repository at this point in the history
Allow to write the value of a single key instead of the entire message
to kafka. This allows to use a part of the message as the message_key
and another part as the payload. This is similar to other output
plugins.

The value of the key must be a string and the format must be set to raw.

Signed-off-by: Holger Hans Peter Freyther <[email protected]>
  • Loading branch information
zecke authored and edsiper committed Nov 9, 2024
1 parent a162864 commit 56255ac
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
28 changes: 28 additions & 0 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
char *dynamic_topic;
char *message_key = NULL;
size_t message_key_len = 0;
flb_sds_t raw_key = NULL;
struct flb_kafka_topic *topic = NULL;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
Expand Down Expand Up @@ -211,6 +212,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
}
}

/* Lookup raw_log_key */
if (ctx->raw_log_key && ctx->format == FLB_KAFKA_FMT_RAW && !raw_key && val.type == MSGPACK_OBJECT_STR) {
if (key.via.str.size == ctx->raw_log_key_len &&
strncmp(key.via.str.ptr, ctx->raw_log_key, ctx->raw_log_key_len) == 0) {
raw_key = flb_sds_create_len(val.via.str.ptr, val.via.str.size);
}
}

/* Lookup key/topic */
if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) {
if (key.via.str.size == ctx->topic_key_len &&
Expand Down Expand Up @@ -346,6 +355,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map,

}
#endif
else if (ctx->format == FLB_KAFKA_FMT_RAW) {
if (raw_key == NULL) {
flb_plg_error(ctx->ins, "missing raw_log_key");
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_ERROR;
}
out_buf = raw_key;
out_size = flb_sds_len(raw_key);
}

if (!message_key) {
message_key = ctx->message_key;
Expand All @@ -363,6 +381,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
AVRO_FREE(avro_fast_buffer, out_buf)
}
#endif
flb_sds_destroy(raw_key);
return FLB_ERROR;
}

Expand All @@ -384,6 +403,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
AVRO_FREE(avro_fast_buffer, out_buf)
}
#endif
flb_sds_destroy(raw_key);
/*
* Unblock the flush requests so that the
* engine could try sending data again.
Expand Down Expand Up @@ -455,6 +475,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
AVRO_FREE(avro_fast_buffer, out_buf)
}
#endif
flb_sds_destroy(raw_key);

msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_OK;
Expand Down Expand Up @@ -643,6 +664,13 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"Set the kafka group_id."
},
{
FLB_CONFIG_MAP_STR, "raw_log_key", NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, raw_log_key),
"By default, the whole log record will be sent to Kafka. "
"If you specify a key name with this option, then only the value of "
"that key will be sent to Kafka."
},
/* EOF */
{0}
};
Expand Down
11 changes: 11 additions & 0 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
ctx->format = FLB_KAFKA_FMT_AVRO;
}
#endif
else if (strcasecmp(ctx->format_str, "raw") == 0) {
ctx->format = FLB_KAFKA_FMT_RAW;
}
}
else {
ctx->format = FLB_KAFKA_FMT_JSON;
Expand All @@ -114,6 +117,14 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
ctx->message_key_field_len = 0;
}

/* Config: Log_Key */
if (ctx->raw_log_key) {
ctx->raw_log_key_len = strlen(ctx->raw_log_key);
}
else {
ctx->raw_log_key_len = 0;
}

/* Config: Timestamp_Key */
if (ctx->timestamp_key) {
ctx->timestamp_key_len = strlen(ctx->timestamp_key);
Expand Down
4 changes: 4 additions & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#ifdef FLB_HAVE_AVRO_ENCODER
#define FLB_KAFKA_FMT_AVRO 3
#endif
#define FLB_KAFKA_FMT_RAW 4
#define FLB_KAFKA_TS_KEY "@timestamp"
#define FLB_KAFKA_QUEUE_FULL_RETRIES "10"

Expand Down Expand Up @@ -80,6 +81,9 @@ struct flb_out_kafka {
int message_key_field_len;
char *message_key_field;

int raw_log_key_len;
char *raw_log_key;

/* Gelf Keys */
struct flb_gelf_fields gelf_fields;

Expand Down

0 comments on commit 56255ac

Please sign in to comment.