in_kafka: switch to log_event_encoder from mpack
Signed-off-by: Aditya Prajapati <[email protected]>
Aditya Prajapati committed May 11, 2023
1 parent c323438 commit 258645b
Showing 1 changed file with 110 additions and 61 deletions.
plugins/in_kafka/in_kafka.c (171 changes: 110 additions & 61 deletions)
@@ -36,7 +36,8 @@
 #include "in_kafka.h"
 #include "rdkafka.h"
 
-static int try_json(mpack_writer_t *writer, rd_kafka_message_t *rkm)
+static int try_json(struct flb_log_event_encoder *log_encoder,
+                    rd_kafka_message_t *rkm)
 {
     int root_type;
     char *buf = NULL;
@@ -50,111 +51,145 @@ static int try_json(mpack_writer_t *writer, rd_kafka_message_t *rkm)
         }
         return ret;
     }
-    mpack_write_object_bytes(writer, buf, bufsize);
+    flb_log_event_encoder_append_body_binary_body(log_encoder, buf, bufsize);
     flb_free(buf);
     return 0;
 }
 
-static void process_message(mpack_writer_t *writer,
-                            rd_kafka_message_t *rkm)
+static int process_message(struct flb_log_event_encoder *log_encoder,
+                           rd_kafka_message_t *rkm)
 {
-    struct flb_time t;
+    int ret;
 
-    mpack_write_tag(writer, mpack_tag_array(2));
+    ret = flb_log_event_encoder_begin_record(log_encoder);
 
-    flb_time_get(&t);
-    flb_time_append_to_mpack(writer, &t, 0);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_set_current_timestamp(log_encoder);
+    }
 
-    mpack_write_tag(writer, mpack_tag_map(6));
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "topic");
+    }
 
-    mpack_write_cstr(writer, "topic");
-    if (rkm->rkt) {
-        mpack_write_cstr(writer, rd_kafka_topic_name(rkm->rkt));
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        if (rkm->rkt) {
+            ret = flb_log_event_encoder_append_body_cstring(log_encoder,
+                                                            rd_kafka_topic_name(rkm->rkt));
+        }
+        else {
+            ret = flb_log_event_encoder_append_body_null(log_encoder);
+        }
     }
-    else {
-        mpack_write_nil(writer);
+
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_values(log_encoder,
+                                                       FLB_LOG_EVENT_CSTRING_VALUE("partition"),
+                                                       FLB_LOG_EVENT_INT32_VALUE(rkm->partition));
     }
 
-    mpack_write_cstr(writer, "partition");
-    mpack_write_i32(writer, rkm->partition);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_values(log_encoder,
+                                                       FLB_LOG_EVENT_CSTRING_VALUE("offset"),
+                                                       FLB_LOG_EVENT_INT64_VALUE(rkm->offset));
+    }
 
-    mpack_write_cstr(writer, "offset");
-    mpack_write_i64(writer, rkm->offset);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "error");
+    }
 
-    mpack_write_cstr(writer, "error");
-    if (rkm->err) {
-        mpack_write_cstr(writer, rd_kafka_message_errstr(rkm));
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        if (rkm->err) {
+            ret = flb_log_event_encoder_append_body_cstring(log_encoder,
+                                                            rd_kafka_message_errstr(rkm));
+        }
+        else {
+            ret = flb_log_event_encoder_append_body_null(log_encoder);
+        }
     }
-    else {
-        mpack_write_nil(writer);
+
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "key");
     }
 
-    mpack_write_cstr(writer, "key");
-    if (rkm->key) {
-        mpack_write_str(writer, rkm->key, rkm->key_len);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        if (rkm->key) {
+            ret = flb_log_event_encoder_append_body_string(log_encoder,
+                                                           rkm->key,
+                                                           rkm->key_len);
+        }
+        else {
+            ret = flb_log_event_encoder_append_body_null(log_encoder);
+        }
     }
-    else {
-        mpack_write_nil(writer);
+
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_append_body_cstring(log_encoder, "payload");
     }
 
-    mpack_write_cstr(writer, "payload");
-    if (rkm->payload) {
-        if (try_json(writer, rkm)) {
-            mpack_write_str(writer, rkm->payload, rkm->len);
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        if (rkm->payload) {
+            if (try_json(log_encoder, rkm)) {
+                ret = flb_log_event_encoder_append_body_string(log_encoder,
+                                                               rkm->payload,
+                                                               rkm->len);
+            }
+        }
+        else {
+            ret = flb_log_event_encoder_append_body_null(log_encoder);
         }
     }
-    else {
-        mpack_write_nil(writer);
+
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        ret = flb_log_event_encoder_commit_record(log_encoder);
     }
 
-    mpack_writer_flush_message(writer);
+    if (ret != FLB_EVENT_ENCODER_SUCCESS) {
+        flb_log_event_encoder_rollback_record(log_encoder);
+    }
+
+    return ret;
 }
 
 static int in_kafka_collect(struct flb_input_instance *ins,
                             struct flb_config *config, void *in_context)
 {
-    mpack_writer_t writer;
-    char *buf;
-    size_t bufsize;
-    size_t written = 0;
+    int ret;
     struct flb_in_kafka_config *ctx = in_context;
+    rd_kafka_message_t *rkm;
 
-    mpack_writer_init_growable(&writer, &buf, &bufsize);
-
-    if (writer.error == mpack_error_memory) {
-        flb_plg_error(ins, "Failed to allocate buffer.");
-        return -1;
-    }
+    ret = FLB_EVENT_ENCODER_SUCCESS;
 
-    while (true) {
-        rd_kafka_message_t *rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
+    while (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1);
 
         if (!rkm) {
             break;
         }
 
         flb_plg_debug(ins, "kafka message received");
-        process_message(&writer, rkm);
+
+        ret = process_message(ctx->log_encoder, rkm);
+
         rd_kafka_message_destroy(rkm);
-        rd_kafka_commit(ctx->kafka.rk, NULL, 0);
 
-        if (writer.error == mpack_error_memory) {
-            flb_plg_error(ins, "Failed to allocate buffer.");
-            return -1;
-        }
+        // TO-DO: commit the record based on `ret`
+        rd_kafka_commit(ctx->kafka.rk, NULL, 0);
     }
 
-    written = writer.position - writer.buffer;
-
-    if (written == 0) {
-        mpack_writer_destroy(&writer);
-        return -1;
+    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
+        flb_input_log_append(ins, NULL, 0,
+                             ctx->log_encoder->output_buffer,
+                             ctx->log_encoder->output_length);
+        ret = 0;
     }
+    else {
+        flb_plg_error(ins, "Error encoding record : %d", ret);
+        ret = -1;
+    }
 
-    flb_input_log_append(ins, NULL, 0, writer.buffer, written);
-    mpack_writer_destroy(&writer);
+    flb_log_event_encoder_reset(ctx->log_encoder);
 
-    return 0;
+    return ret;
 }
 
 /* Initialize plugin */
@@ -234,6 +269,15 @@ static int in_kafka_init(struct flb_input_instance *ins,
         goto init_error;
     }
 
+    ctx->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
+
+    if (ctx->log_encoder == NULL) {
+        flb_plg_error(ins, "could not initialize log encoder");
+        flb_log_event_encoder_destroy(ctx->log_encoder);
+
+        return -1;
+    }
+
     return 0;
 
 init_error:
@@ -264,6 +308,11 @@ static int in_kafka_exit(void *in_context, struct flb_config *config)
     ctx = in_context;
     rd_kafka_destroy(ctx->kafka.rk);
     flb_free(ctx->kafka.brokers);
+
+    if (ctx->log_encoder){
+        flb_log_event_encoder_destroy(ctx->log_encoder);
+    }
+
     flb_free(ctx);
 
     return 0;
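
Note for readers skimming the diff: the new encoder calls above follow a simple begin / append / commit cycle, with a rollback on failure. The sketch below distills that pattern into a standalone function; it is illustrative only and not part of this commit (the helper name encode_example_record and the literal partition value are invented, and the header path is assumed from the Fluent Bit source tree).

#include <fluent-bit/flb_log_event_encoder.h>

/* Illustrative sketch (not from this commit): encode one record with the
 * current timestamp and a single key/value pair, mirroring the pattern
 * used by process_message() in the diff above. */
static int encode_example_record(struct flb_log_event_encoder *log_encoder)
{
    int ret;

    ret = flb_log_event_encoder_begin_record(log_encoder);

    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
        ret = flb_log_event_encoder_set_current_timestamp(log_encoder);
    }

    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
        /* key and value are appended in a single call */
        ret = flb_log_event_encoder_append_body_values(log_encoder,
                  FLB_LOG_EVENT_CSTRING_VALUE("partition"),
                  FLB_LOG_EVENT_INT32_VALUE(0));
    }

    if (ret == FLB_EVENT_ENCODER_SUCCESS) {
        ret = flb_log_event_encoder_commit_record(log_encoder);
    }
    else {
        flb_log_event_encoder_rollback_record(log_encoder);
    }

    return ret;
}

Once records are committed, the caller flushes log_encoder->output_buffer / output_length (as in_kafka_collect does via flb_input_log_append) and then calls flb_log_event_encoder_reset() before the next collection cycle.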