diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index 55539d3a4c8..2205066af25 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -71,8 +71,8 @@ int produce_message(struct flb_time *tm, msgpack_object *map, int i; int ret; int size; - char *json_buf; - size_t json_size; + char *out_buf; + size_t out_size; struct flb_kafka_topic *topic = NULL; msgpack_sbuffer mp_sbuf; msgpack_packer mp_pck; @@ -111,12 +111,18 @@ int produce_message(struct flb_time *tm, msgpack_object *map, } } - ret = flb_msgpack_raw_to_json_str(mp_sbuf.data, mp_sbuf.size, - &json_buf, &json_size); - if (ret != 0) { - flb_error("[out_kafka] error encoding to JSON"); - msgpack_sbuffer_destroy(&mp_sbuf); - return FLB_ERROR; + if (ctx->format == FLB_KAFKA_FMT_JSON) { + ret = flb_msgpack_raw_to_json_str(mp_sbuf.data, mp_sbuf.size, + &out_buf, &out_size); + if (ret != 0) { + flb_error("[out_kafka] error encoding to JSON"); + msgpack_sbuffer_destroy(&mp_sbuf); + return FLB_ERROR; + } + } + else if (ctx->format == FLB_KAFKA_FMT_MSGP) { + out_buf = mp_sbuf.data; + out_size = mp_sbuf.size; } if (!topic) { @@ -131,7 +137,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, ret = rd_kafka_produce(topic->tp, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, - json_buf, json_size, + out_buf, out_size, ctx->message_key, ctx->message_key_len, NULL); if (ret == -1) { @@ -145,7 +151,9 @@ int produce_message(struct flb_time *tm, msgpack_object *map, rd_kafka_poll(ctx->producer, -1); } - flb_free(json_buf); + if (ctx->format == FLB_KAFKA_FMT_JSON) { + flb_free(out_buf); + } msgpack_sbuffer_destroy(&mp_sbuf); return FLB_OK; } diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index 6f6a5eaee0b..da2795b9acc 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -85,7 +85,6 @@ struct flb_kafka *flb_kafka_conf_create(struct flb_output_instance *ins, /* Callback: log */ rd_kafka_conf_set_log_cb(ctx->conf, cb_kafka_logger); - /* Config: Topic_Key */ tmp = flb_output_get_property("topic_key", ins); if (tmp) { @@ -96,6 +95,20 @@ struct flb_kafka *flb_kafka_conf_create(struct flb_output_instance *ins, ctx->topic_key = NULL; } + /* Config: Format */ + tmp = flb_output_get_property("format", ins); + if (tmp) { + if (strcasecmp(tmp, "json") == 0) { + ctx->format = FLB_KAFKA_FMT_JSON; + } + else if (strcasecmp(tmp, "msgpack") == 0) { + ctx->format = FLB_KAFKA_FMT_MSGP; + } + } + else { + ctx->format = FLB_KAFKA_FMT_JSON; + } + /* Config: Message_Key */ tmp = flb_output_get_property("message_key", ins); if (tmp) { @@ -172,9 +185,6 @@ int flb_kafka_conf_destroy(struct flb_kafka *ctx) if (ctx->producer) { rd_kafka_destroy(ctx->producer); } - if (ctx->conf) { - //rd_kafka_conf_destroy(ctx->conf); - } if (ctx->topic_key) { flb_free(ctx->topic_key); diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index 1416938958b..e60f5dc0664 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -25,6 +25,8 @@ #include "rdkafka.h" +#define FLB_KAFKA_FMT_JSON 0 +#define FLB_KAFKA_FMT_MSGP 1 #define FLB_KAFKA_BROKERS "127.0.0.1" #define FLB_KAFKA_TOPIC "fluent-bit" #define FLB_KAFKA_TS_KEY "@timestamp" @@ -38,6 +40,7 @@ struct flb_kafka_topic { struct flb_kafka { /* Config Parameters */ + int format; char *brokers; /* Optional topic key for routing */