diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 476e528cb83..95fa5cd3e11 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -100,6 +100,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, char *dynamic_topic; char *message_key = NULL; size_t message_key_len = 0; + flb_sds_t raw_key = NULL; struct flb_kafka_topic *topic = NULL; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; @@ -211,6 +212,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } } + /* Lookup raw_log_key */ + if (ctx->raw_log_key && ctx->format == FLB_KAFKA_FMT_RAW && !raw_key && val.type == MSGPACK_OBJECT_STR) { + if (key.via.str.size == ctx->raw_log_key_len && + strncmp(key.via.str.ptr, ctx->raw_log_key, ctx->raw_log_key_len) == 0) { + raw_key = flb_sds_create_len(val.via.str.ptr, val.via.str.size); + } + } + /* Lookup key/topic */ if (ctx->topic_key && !topic && val.type == MSGPACK_OBJECT_STR) { if (key.via.str.size == ctx->topic_key_len && @@ -346,6 +355,15 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } #endif + else if (ctx->format == FLB_KAFKA_FMT_RAW) { + if (raw_key == NULL) { + flb_plg_error(ctx->ins, "missing raw_log_key"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + out_buf = raw_key; + out_size = flb_sds_len(raw_key); + } if (!message_key) { message_key = ctx->message_key; @@ -363,6 +381,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif + flb_sds_destroy(raw_key); return FLB_ERROR; } @@ -384,6 +403,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif + flb_sds_destroy(raw_key); /* * Unblock the flush requests so that the * engine could try sending data again. @@ -455,6 +475,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, AVRO_FREE(avro_fast_buffer, out_buf) } #endif + flb_sds_destroy(raw_key); msgpack_sbuffer_destroy(&mp_sbuf); return FLB_OK; @@ -643,6 +664,13 @@ static struct flb_config_map config_map[] = { 0, FLB_FALSE, 0, "Set the kafka group_id." }, + { + FLB_CONFIG_MAP_STR, "raw_log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_out_kafka, raw_log_key), + "By default, the whole log record will be sent to Kafka. " + "If you specify a key name with this option, then only the value of " + "that key will be sent to Kafka." + }, /* EOF */ {0} }; diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index 04e91255635..fb2f5ca146c 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -93,6 +93,9 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, ctx->format = FLB_KAFKA_FMT_AVRO; } #endif + else if (strcasecmp(ctx->format_str, "raw") == 0) { + ctx->format = FLB_KAFKA_FMT_RAW; + } } else { ctx->format = FLB_KAFKA_FMT_JSON; @@ -114,6 +117,14 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, ctx->message_key_field_len = 0; } + /* Config: Log_Key */ + if (ctx->raw_log_key) { + ctx->raw_log_key_len = strlen(ctx->raw_log_key); + } + else { + ctx->raw_log_key_len = 0; + } + /* Config: Timestamp_Key */ if (ctx->timestamp_key) { ctx->timestamp_key_len = strlen(ctx->timestamp_key); diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 42af378e161..14e036f8184 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -34,6 +34,7 @@ #ifdef FLB_HAVE_AVRO_ENCODER #define FLB_KAFKA_FMT_AVRO 3 #endif +#define FLB_KAFKA_FMT_RAW 4 #define FLB_KAFKA_TS_KEY "@timestamp" #define FLB_KAFKA_QUEUE_FULL_RETRIES "10" @@ -80,6 +81,9 @@ struct flb_out_kafka { int message_key_field_len; char *message_key_field; + int raw_log_key_len; + char *raw_log_key; + /* Gelf Keys */ struct flb_gelf_fields gelf_fields;