Skip to content

Commit

Permalink
out_kafka: new 'format' parameter, add support for MessagePack
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Nov 17, 2017
1 parent 95d1491 commit 64c9cfe
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
28 changes: 18 additions & 10 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
int i;
int ret;
int size;
char *json_buf;
size_t json_size;
char *out_buf;
size_t out_size;
struct flb_kafka_topic *topic = NULL;
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
Expand Down Expand Up @@ -111,12 +111,18 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
}
}

ret = flb_msgpack_raw_to_json_str(mp_sbuf.data, mp_sbuf.size,
&json_buf, &json_size);
if (ret != 0) {
flb_error("[out_kafka] error encoding to JSON");
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_ERROR;
if (ctx->format == FLB_KAFKA_FMT_JSON) {
ret = flb_msgpack_raw_to_json_str(mp_sbuf.data, mp_sbuf.size,
&out_buf, &out_size);
if (ret != 0) {
flb_error("[out_kafka] error encoding to JSON");
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_ERROR;
}
}
else if (ctx->format == FLB_KAFKA_FMT_MSGP) {
out_buf = mp_sbuf.data;
out_size = mp_sbuf.size;
}

if (!topic) {
Expand All @@ -131,7 +137,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
ret = rd_kafka_produce(topic->tp,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
json_buf, json_size,
out_buf, out_size,
ctx->message_key, ctx->message_key_len,
NULL);
if (ret == -1) {
Expand All @@ -145,7 +151,9 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
rd_kafka_poll(ctx->producer, -1);
}

flb_free(json_buf);
if (ctx->format == FLB_KAFKA_FMT_JSON) {
flb_free(out_buf);
}
msgpack_sbuffer_destroy(&mp_sbuf);
return FLB_OK;
}
Expand Down
18 changes: 14 additions & 4 deletions plugins/out_kafka/kafka_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ struct flb_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
/* Callback: log */
rd_kafka_conf_set_log_cb(ctx->conf, cb_kafka_logger);


/* Config: Topic_Key */
tmp = flb_output_get_property("topic_key", ins);
if (tmp) {
Expand All @@ -96,6 +95,20 @@ struct flb_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
ctx->topic_key = NULL;
}

/* Config: Format */
tmp = flb_output_get_property("format", ins);
if (tmp) {
if (strcasecmp(tmp, "json") == 0) {
ctx->format = FLB_KAFKA_FMT_JSON;
}
else if (strcasecmp(tmp, "msgpack") == 0) {
ctx->format = FLB_KAFKA_FMT_MSGP;
}
}
else {
ctx->format = FLB_KAFKA_FMT_JSON;
}

/* Config: Message_Key */
tmp = flb_output_get_property("message_key", ins);
if (tmp) {
Expand Down Expand Up @@ -172,9 +185,6 @@ int flb_kafka_conf_destroy(struct flb_kafka *ctx)
if (ctx->producer) {
rd_kafka_destroy(ctx->producer);
}
if (ctx->conf) {
//rd_kafka_conf_destroy(ctx->conf);
}

if (ctx->topic_key) {
flb_free(ctx->topic_key);
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#include "rdkafka.h"

#define FLB_KAFKA_FMT_JSON 0
#define FLB_KAFKA_FMT_MSGP 1
#define FLB_KAFKA_BROKERS "127.0.0.1"
#define FLB_KAFKA_TOPIC "fluent-bit"
#define FLB_KAFKA_TS_KEY "@timestamp"
Expand All @@ -38,6 +40,7 @@ struct flb_kafka_topic {

struct flb_kafka {
/* Config Parameters */
int format;
char *brokers;

/* Optional topic key for routing */
Expand Down

0 comments on commit 64c9cfe

Please sign in to comment.