Skip to content

Commit

Permalink
out_opentelemetry: move variables to heap and sanitize batch size
Browse files Browse the repository at this point in the history
Signed-off-by: Aditya Prajapati <[email protected]>
  • Loading branch information
Aditya Prajapati committed Dec 19, 2022
1 parent 898a8e7 commit 46bb784
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ static void clear_array(Opentelemetry__Proto__Logs__V1__LogRecord **logs,

for (index = 0 ; index < log_count ; index++) {
otlp_any_value_destroy(logs[index]->body);
flb_free(logs[index]);
}

flb_free(logs);
}

static Opentelemetry__Proto__Common__V1__ArrayValue *otlp_array_value_initialize(size_t entry_count)
Expand Down Expand Up @@ -693,10 +696,16 @@ static int process_logs(struct flb_event_chunk *event_chunk,
struct opentelemetry_context *ctx;
ctx = out_context;

Opentelemetry__Proto__Logs__V1__LogRecord *log_record_list[ctx->batch_size];
Opentelemetry__Proto__Logs__V1__LogRecord log_records[ctx->batch_size];
Opentelemetry__Proto__Common__V1__AnyValue log_bodies[ctx->batch_size];
// These were initially variable length arrays.
// However, having a high value for batch_size was causing memory
// issues with the event chunk being overwritten. Moving it to the heap
// solves these issues but we still do not know the root cause

Opentelemetry__Proto__Logs__V1__LogRecord **log_record_list;
Opentelemetry__Proto__Logs__V1__LogRecord *log_records;
Opentelemetry__Proto__Common__V1__AnyValue *log_bodies;
Opentelemetry__Proto__Common__V1__AnyValue *log_object;

size_t log_record_count;
size_t index;
msgpack_unpacked result;
Expand All @@ -705,6 +714,9 @@ static int process_logs(struct flb_event_chunk *event_chunk,
struct flb_time tm;
int res = FLB_OK;

log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord *) flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *));
log_records = flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord));
log_bodies = (Opentelemetry__Proto__Common__V1__AnyValue *) flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Common__V1__AnyValue));

for(index = 0 ; index < ctx->batch_size ; index++) {
opentelemetry__proto__logs__v1__log_record__init(&log_records[index]);
Expand All @@ -713,7 +725,6 @@ static int process_logs(struct flb_event_chunk *event_chunk,
log_records[index].body = &log_bodies[index];
log_record_list[index] = &log_records[index];
}

log_record_count = 0;

msgpack_unpacked_init(&result);
Expand Down Expand Up @@ -771,6 +782,7 @@ static int process_logs(struct flb_event_chunk *event_chunk,
log_record_count = 0;
}

flb_free(log_bodies);
msgpack_unpacked_destroy(&result);

return res;
Expand Down Expand Up @@ -975,6 +987,10 @@ static int cb_opentelemetry_init(struct flb_output_instance *ins,
return -1;
}

if (ctx->batch_size <= 0){
ctx->batch_size = atoi(DEFAULT_LOG_RECORD_BATCH_SIZE);
}

flb_output_set_context(ins, ctx);

return 0;
Expand Down

0 comments on commit 46bb784

Please sign in to comment.