From bc6d7fa59d1030a95e56ce3518e7343e6f9cbc08 Mon Sep 17 00:00:00 2001 From: Takahiro Yamashita Date: Mon, 27 Sep 2021 16:26:13 +0900 Subject: [PATCH] in_emitter: write msgpack buffer directly(#4049) In previous implementation, in_emitter has 2 buffers. - 1. rewrite_tag -> in_emitter_add_record -> msgpack_sbuffer_write - 2. (timer thread. every 0.5 sec) cb_queue_chunks -> flb_input_chunk_append_raw 'mem_buf_limit' is for flb_input_chunk API, so the thread 1 doesn't have limits. The patch is to modify writing sequence. rewrite_tag -> in_emitter_add_record -> flb_input_chunk_append_raw Signed-off-by: Takahiro Yamashita --- plugins/in_emitter/emitter.c | 79 +++++++----------------------------- 1 file changed, 15 insertions(+), 64 deletions(-) diff --git a/plugins/in_emitter/emitter.c b/plugins/in_emitter/emitter.c index 5e4cb1926e4..c67a70fff92 100644 --- a/plugins/in_emitter/emitter.c +++ b/plugins/in_emitter/emitter.c @@ -36,7 +36,6 @@ struct em_chunk { }; struct flb_emitter { - int coll_fd; /* collector id */ struct mk_list chunks; /* list of all pending chunks */ struct flb_input_instance *ins; /* input instance */ }; @@ -87,6 +86,7 @@ int in_emitter_add_record(const char *tag, int tag_len, struct mk_list *head; struct em_chunk *ec = NULL; struct flb_emitter *ctx; + int ret; ctx = (struct flb_emitter *) in->context; @@ -112,47 +112,21 @@ int in_emitter_add_record(const char *tag, int tag_len, /* Append raw msgpack data */ msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size); - return 0; -} - -int in_emitter_get_collector_id(struct flb_input_instance *in) -{ - struct flb_emitter *ctx = (struct flb_emitter *) in->context; - - return ctx->coll_fd; -} - -static int cb_queue_chunks(struct flb_input_instance *in, - struct flb_config *config, void *data) -{ - int ret; - struct mk_list *tmp; - struct mk_list *head; - struct em_chunk *echunk; - struct flb_emitter *ctx; - - /* Get context */ - ctx = (struct flb_emitter *) data; - - /* Try to enqueue chunks under our limits */ - mk_list_foreach_safe(head, tmp, &ctx->chunks) { - echunk = mk_list_entry(head, struct em_chunk, _head); - - /* Associate this backlog chunk to this instance into the engine */ - ret = flb_input_chunk_append_raw(in, - echunk->tag, flb_sds_len(echunk->tag), - echunk->mp_sbuf.data, - echunk->mp_sbuf.size); - if (ret == -1) { - flb_plg_error(ctx->ins, "error registering chunk with tag: %s", - echunk->tag); - continue; - } + /* Associate this backlog chunk to this instance into the engine */ + ret = flb_input_chunk_append_raw(in, + ec->tag, flb_sds_len(ec->tag), + ec->mp_sbuf.data, + ec->mp_sbuf.size); + if (ret == -1) { + flb_plg_error(ctx->ins, "error registering chunk with tag: %s", + ec->tag); /* Release the echunk */ - em_chunk_destroy(echunk); + em_chunk_destroy(ec); + return -1; } - + /* Release the echunk */ + em_chunk_destroy(ec); return 0; } @@ -160,7 +134,6 @@ static int cb_queue_chunks(struct flb_input_instance *in, static int cb_emitter_init(struct flb_input_instance *in, struct flb_config *config, void *data) { - int ret; struct flb_emitter *ctx; ctx = flb_malloc(sizeof(struct flb_emitter)); @@ -174,29 +147,9 @@ static int cb_emitter_init(struct flb_input_instance *in, /* export plugin context */ flb_input_set_context(in, ctx); - /* Set a collector to trigger the callback to queue data every 0.5 second */ - ret = flb_input_set_collector_time(in, cb_queue_chunks, 0, 50000000, config); - if (ret < 0) { - flb_plg_error(ctx->ins, "could not create collector"); - flb_free(ctx); - return -1; - } - ctx->coll_fd = ret; return 0; } -static void cb_emitter_pause(void *data, struct flb_config *config) -{ - struct flb_emitter *ctx = data; - flb_input_collector_pause(ctx->coll_fd, ctx->ins); -} - -static void cb_emitter_resume(void *data, struct flb_config *config) -{ - struct flb_emitter *ctx = data; - flb_input_collector_resume(ctx->coll_fd, ctx->ins); -} - static int cb_emitter_exit(void *data, struct flb_config *config) { struct mk_list *tmp; @@ -204,8 +157,6 @@ static int cb_emitter_exit(void *data, struct flb_config *config) struct flb_emitter *ctx = data; struct em_chunk *echunk; - flb_input_collector_pause(ctx->coll_fd, ctx->ins); - mk_list_foreach_safe(head, tmp, &ctx->chunks) { echunk = mk_list_entry(head, struct em_chunk, _head); mk_list_del(&echunk->_head); @@ -225,8 +176,8 @@ struct flb_input_plugin in_emitter_plugin = { .cb_collect = NULL, .cb_ingest = NULL, .cb_flush_buf = NULL, - .cb_pause = cb_emitter_pause, - .cb_resume = cb_emitter_resume, + .cb_pause = NULL, + .cb_resume = NULL, .cb_exit = cb_emitter_exit, /* This plugin can only be configured and invoked by the Engine only */