Skip to content

Commit

Permalink
Add support to set otel fields from the event body based on keys.
Browse files Browse the repository at this point in the history
Signed-off-by: Boslet, Cory (cb645j) <[email protected]>
  • Loading branch information
cb645j committed Mar 29, 2024
1 parent 20c461a commit 240f096
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 0 deletions.
126 changes: 126 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,110 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx,
return 0;
}

static int append_v1_logs_message(struct opentelemetry_context *ctx,
struct flb_log_event *event,
Opentelemetry__Proto__Logs__V1__LogRecord *log_record)
{
struct flb_ra_value *ra_val;

if (ctx == NULL || event == NULL || log_record == NULL) {
return -1;
}

/* SeverityText */
if (ctx->ra_severity_text_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_STR) {
if(is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size) == FLB_TRUE){
log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size+1);
if (log_record->severity_text) {
strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size);
}
flb_ra_key_value_destroy(ra_val);
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Invalid Severity Text.\n", ctx->ra_severity_text_message->pattern);
log_record->severity_text = NULL;
}
}
else {
/* To prevent invalid free */
log_record->severity_text = NULL;
}
}

/* SeverityNumber */
if (ctx->ra_severity_number_message) {
ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body);
if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER &&
is_valid_severity_number(ra_val->o.via.u64) == FLB_TRUE) {
log_record->severity_number = ra_val->o.via.u64;
flb_ra_key_value_destroy(ra_val);
}
}else if(ctx->ra_severity_text_message){
//TODO get sev number based off sev text
}

/* SpanId */
if (ctx->ra_span_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body);
if (ra_val != NULL) {
if(ra_val->o.type == MSGPACK_OBJECT_BIN){
log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->span_id.data) {
memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->span_id.len = ra_val->o.via.bin.size;
}
}else if(ra_val->o.type == MSGPACK_OBJECT_STR){
log_record->span_id.data = flb_calloc(8, sizeof(uint8_t));
if (log_record->span_id.data) {
// Convert to a byte array
uint8_t val[8];
for(size_t count = 0; count < sizeof val/sizeof *val; count++ ){
sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]);
ra_val->o.via.str.ptr+=2;
}
memcpy(log_record->span_id.data, val, sizeof(val));
log_record->span_id.len = sizeof(val);
}
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_span_id_message->pattern);
}
flb_ra_key_value_destroy(ra_val);
}
}

/* TraceId */
if (ctx->ra_trace_id_message) {
ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body);
if (ra_val != NULL) {
if(ra_val->o.type == MSGPACK_OBJECT_BIN){
log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size);
if (log_record->trace_id.data) {
memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size);
log_record->trace_id.len = ra_val->o.via.bin.size;
}
}else if(ra_val->o.type == MSGPACK_OBJECT_STR){
log_record->trace_id.data = flb_calloc(16, sizeof(uint8_t));
if (log_record->trace_id.data) {
// Convert from hexdec string to a 16 byte array
uint8_t val[16];
for(size_t count = 0; count < sizeof val/sizeof *val; count++ ){
sscanf(ra_val->o.via.str.ptr, "%2hhx", &val[count]);
ra_val->o.via.str.ptr+=2;
}
memcpy(log_record->trace_id.data, val, sizeof(val));
log_record->trace_id.len = sizeof(val);
}
}else{
flb_plg_warn(ctx->ins, "Unable to process %s. Unsupported data type.\n", ctx->ra_trace_id_message->pattern);
}
flb_ra_key_value_destroy(ra_val);
}
}

return 0;
}

static int process_logs(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
Expand Down Expand Up @@ -1158,6 +1262,8 @@ static int process_logs(struct flb_event_chunk *event_chunk,

append_v1_logs_metadata(ctx, &event, &log_records[log_record_count]);

append_v1_logs_message(ctx, &event, &log_records[log_record_count]);

ret = FLB_OK;

log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp);
Expand Down Expand Up @@ -1544,6 +1650,26 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_resource_metadata_key),
"Specify a Resource key"
},
{
FLB_CONFIG_MAP_STR, "logs_span_id_message_key", "$SpanId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_span_id_message_key),
"Specify a SpanId key"
},
{
FLB_CONFIG_MAP_STR, "logs_trace_id_message_key", "$TraceId",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_trace_id_message_key),
"Specify a TraceId key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_text_message_key", "$SeverityText",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_text_message_key),
"Specify a Severity Text key"
},
{
FLB_CONFIG_MAP_STR, "logs_severity_number_message_key", "$SeverityNumber",
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_severity_number_message_key),
"Specify a Severity Number key"
},

/* EOF */
{0}
Expand Down
13 changes: 13 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ struct opentelemetry_context {
flb_sds_t logs_instrumentation_scope_metadata_key;
flb_sds_t logs_resource_metadata_key;

/* otel body keys */
flb_sds_t logs_span_id_message_key;
struct flb_record_accessor *ra_span_id_message;

flb_sds_t logs_trace_id_message_key;
struct flb_record_accessor *ra_trace_id_message;

flb_sds_t logs_severity_text_message_key;
struct flb_record_accessor *ra_severity_text_message;

flb_sds_t logs_severity_number_message_key;
struct flb_record_accessor *ra_severity_number_message;

/* Number of logs to flush at a time */
int batch_size;

Expand Down
32 changes: 32 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,26 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
if (ctx->ra_attributes_metadata == NULL) {
flb_plg_error(ins, "failed to create ra for attributes");
}
ctx->ra_span_id_message = flb_ra_create((char*)ctx->logs_span_id_message_key,
FLB_FALSE);
if (ctx->ra_span_id_message == NULL) {
flb_plg_error(ins, "failed to create ra for message span id");
}
ctx->ra_trace_id_message = flb_ra_create((char*)ctx->logs_trace_id_message_key,
FLB_FALSE);
if (ctx->ra_trace_id_message == NULL) {
flb_plg_error(ins, "failed to create ra for message trace id");
}
ctx->ra_severity_text_message = flb_ra_create((char*)ctx->logs_severity_text_message_key,
FLB_FALSE);
if (ctx->ra_severity_text_message == NULL) {
flb_plg_error(ins, "failed to create ra for message severity text");
}
ctx->ra_severity_number_message = flb_ra_create((char*)ctx->logs_severity_number_message_key,
FLB_FALSE);
if (ctx->ra_severity_number_message == NULL) {
flb_plg_error(ins, "failed to create ra for message severity number");
}

return ctx;
}
Expand Down Expand Up @@ -466,6 +486,18 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
if (ctx->ra_attributes_metadata) {
flb_ra_destroy(ctx->ra_attributes_metadata);
}
if (ctx->ra_span_id_message) {
flb_ra_destroy(ctx->ra_span_id_message);
}
if (ctx->ra_trace_id_message) {
flb_ra_destroy(ctx->ra_trace_id_message);
}
if (ctx->ra_severity_text_message) {
flb_ra_destroy(ctx->ra_severity_text_message);
}
if (ctx->ra_severity_number_message) {
flb_ra_destroy(ctx->ra_severity_number_message);
}

flb_free(ctx->proxy_host);
flb_free(ctx);
Expand Down

0 comments on commit 240f096

Please sign in to comment.