diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index d73f82d87ce..00a251254a8 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -396,6 +397,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, int resource_logs_index; int scope_log_index; int log_record_index; + struct flb_mp_map_header mh; Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs; Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs; @@ -403,6 +405,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs; Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log; Opentelemetry__Proto__Logs__V1__LogRecord **log_records; + Opentelemetry__Proto__Resource__V1__Resource *resource; msgpack_sbuffer_init(&buffer); msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write); @@ -423,6 +426,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { resource_log = resource_logs[resource_logs_index]; + resource = resource_log->resource; scope_logs = resource_log->scope_logs; if (resource_log->n_scope_logs > 0 && scope_logs == NULL) { @@ -449,6 +453,40 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, } if (ret == FLB_EVENT_ENCODER_SUCCESS) { + flb_mp_map_header_init(&mh, &packer); + + /* pack resource */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(&packer, 8); + msgpack_pack_str_body(&packer, "resource", 8); + if (resource != NULL) { + msgpack_pack_map(&packer, 1); + + msgpack_pack_str(&packer, 10); + msgpack_pack_str_body(&packer, "attributes", 10); + ret = otel_pack_kvarray( + &packer, + resource->attributes, + resource->n_attributes); + } + else { + msgpack_pack_map(&packer, 0); + } + + if (ret != 0) { + flb_error("[otel] Failed to convert log resource attributes"); + msgpack_sbuffer_destroy(&buffer); + return -1; + } + + /* pack logRecords */ + flb_mp_map_header_append(&mh); + msgpack_pack_str(&packer, 10); + msgpack_pack_str_body(&packer, "logRecords", 10); + + msgpack_pack_map(&packer, 1); + msgpack_pack_str(&packer, 10); + msgpack_pack_str_body(&packer, "attributes", 10); ret = otel_pack_kvarray( &packer, log_records[log_record_index]->attributes, @@ -460,6 +498,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder, ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE; } else { + flb_mp_map_header_end(&mh); ret = flb_log_event_encoder_set_metadata_from_raw_msgpack( encoder, buffer.data,