From 46bb7845817a37fef3fc01b9ebd54797c192419b Mon Sep 17 00:00:00 2001 From: Aditya Prajapati Date: Fri, 16 Dec 2022 22:08:52 +0530 Subject: [PATCH] out_opentelemetry: move variables to heap and sanitize batch size Signed-off-by: Aditya Prajapati --- plugins/out_opentelemetry/opentelemetry.c | 24 +++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 01859acb6d0..4d7d072b2ec 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -295,7 +295,10 @@ static void clear_array(Opentelemetry__Proto__Logs__V1__LogRecord **logs, for (index = 0 ; index < log_count ; index++) { otlp_any_value_destroy(logs[index]->body); + flb_free(logs[index]); } + + flb_free(logs); } static Opentelemetry__Proto__Common__V1__ArrayValue *otlp_array_value_initialize(size_t entry_count) @@ -693,10 +696,16 @@ static int process_logs(struct flb_event_chunk *event_chunk, struct opentelemetry_context *ctx; ctx = out_context; - Opentelemetry__Proto__Logs__V1__LogRecord *log_record_list[ctx->batch_size]; - Opentelemetry__Proto__Logs__V1__LogRecord log_records[ctx->batch_size]; - Opentelemetry__Proto__Common__V1__AnyValue log_bodies[ctx->batch_size]; + // These were initially variable length arrays. + // However, having a high value for batch_size was causing memory + // issues with the event chunk being overwritten. Moving it to the heap + // solves these issues but we still do not know the root cause + + Opentelemetry__Proto__Logs__V1__LogRecord **log_record_list; + Opentelemetry__Proto__Logs__V1__LogRecord *log_records; + Opentelemetry__Proto__Common__V1__AnyValue *log_bodies; Opentelemetry__Proto__Common__V1__AnyValue *log_object; + size_t log_record_count; size_t index; msgpack_unpacked result; @@ -705,6 +714,9 @@ static int process_logs(struct flb_event_chunk *event_chunk, struct flb_time tm; int res = FLB_OK; + log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord *) flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); + log_records = flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord)); + log_bodies = (Opentelemetry__Proto__Common__V1__AnyValue *) flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Common__V1__AnyValue)); for(index = 0 ; index < ctx->batch_size ; index++) { opentelemetry__proto__logs__v1__log_record__init(&log_records[index]); @@ -713,7 +725,6 @@ static int process_logs(struct flb_event_chunk *event_chunk, log_records[index].body = &log_bodies[index]; log_record_list[index] = &log_records[index]; } - log_record_count = 0; msgpack_unpacked_init(&result); @@ -771,6 +782,7 @@ static int process_logs(struct flb_event_chunk *event_chunk, log_record_count = 0; } + flb_free(log_bodies); msgpack_unpacked_destroy(&result); return res; @@ -975,6 +987,10 @@ static int cb_opentelemetry_init(struct flb_output_instance *ins, return -1; } + if (ctx->batch_size <= 0){ + ctx->batch_size = atoi(DEFAULT_LOG_RECORD_BATCH_SIZE); + } + flb_output_set_context(ins, ctx); return 0;