Skip to content

Commit

Permalink
in_emitter: added automatic threading detection workaround
Browse files Browse the repository at this point in the history
This patch uses the scheduler instance from the thread local storage
to determine if the filter creating the emitter instance is running
in an input plugins thread to transparently switch over to the ring
buffer based mechanism.

This is not a definitive solution, it's a workaround.

Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich authored May 21, 2023
1 parent e191803 commit c254d0f
Showing 1 changed file with 35 additions and 9 deletions.
44 changes: 35 additions & 9 deletions plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_scheduler.h>
#include <fluent-bit/flb_ring_buffer.h>


#include <sys/types.h>
#include <sys/stat.h>

#define DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY 2000

struct em_chunk {
flb_sds_t tag;
struct msgpack_sbuffer mp_sbuf; /* msgpack sbuffer */
Expand Down Expand Up @@ -108,24 +110,34 @@ int in_emitter_add_record(const char *tag, int tag_len,
const char *buf_data, size_t buf_size,
struct flb_input_instance *in)
{
struct em_chunk temporary_chunk;
struct mk_list *head;
struct em_chunk *ec = NULL;
struct em_chunk *ec;
struct flb_emitter *ctx;
int ret;

ctx = (struct flb_emitter *) in->context;
ec = NULL;

/* Use the ring buffer first if it exists */
if (ctx->msgs) {
ec = flb_calloc(1, sizeof(struct em_chunk));
if (ec == NULL) {
memset(&temporary_chunk, 0, sizeof(struct em_chunk));

temporary_chunk.tag = flb_sds_create_len(tag, tag_len);

if (temporary_chunk.tag == NULL) {
flb_plg_error(ctx->ins,
"cannot allocate memory for tag: %s",
tag);
return -1;
}
ec->tag = flb_sds_create_len(tag, tag_len);
msgpack_sbuffer_write(&ec->mp_sbuf, buf_data, buf_size);
ret = flb_ring_buffer_write(ctx->msgs, (void *)ec, sizeof(struct em_chunk));
flb_free(ec);
return ret;

msgpack_sbuffer_init(&temporary_chunk.mp_sbuf);
msgpack_sbuffer_write(&temporary_chunk.mp_sbuf, buf_data, buf_size);

return flb_ring_buffer_write(ctx->msgs,
(void *) &temporary_chunk,
sizeof(struct em_chunk));
}

/* Check if any target chunk already exists */
Expand Down Expand Up @@ -207,9 +219,11 @@ static int in_emitter_start_ring_buffer(struct flb_input_instance *in, struct fl
static int cb_emitter_init(struct flb_input_instance *in,
struct flb_config *config, void *data)
{
struct flb_sched *scheduler;
struct flb_emitter *ctx;
int ret;

scheduler = flb_sched_ctx_get();

ctx = flb_calloc(1, sizeof(struct flb_emitter));
if (!ctx) {
Expand All @@ -225,6 +239,18 @@ static int cb_emitter_init(struct flb_input_instance *in,
return -1;
}

if (scheduler != config->sched &&
scheduler != NULL &&
ctx->ring_buffer_size == 0) {

ctx->ring_buffer_size = DEFAULT_EMITTER_RING_BUFFER_FLUSH_FREQUENCY;

flb_plg_debug(in,
"threaded emitter instances require ring_buffer_size"
" being set, using default value of %u",
ctx->ring_buffer_size);
}

if (ctx->ring_buffer_size > 0) {
ret = in_emitter_start_ring_buffer(in, ctx);
if (ret == -1) {
Expand Down

0 comments on commit c254d0f

Please sign in to comment.