Skip to content

Commit

Permalink
in_opentelemetry: support to add resource of log
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Dec 23, 2023
1 parent 34a2cd0 commit 2f9e2c4
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_snappy.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_log_event_encoder.h>

#include <monkey/monkey.h>
Expand Down Expand Up @@ -396,13 +397,15 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
int resource_logs_index;
int scope_log_index;
int log_record_index;
struct flb_mp_map_header mh;

Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log;
Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs;
Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log;
Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
Opentelemetry__Proto__Resource__V1__Resource *resource;

msgpack_sbuffer_init(&buffer);
msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
Expand All @@ -423,6 +426,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,

for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
resource_log = resource_logs[resource_logs_index];
resource = resource_log->resource;
scope_logs = resource_log->scope_logs;

if (resource_log->n_scope_logs > 0 && scope_logs == NULL) {
Expand All @@ -449,6 +453,40 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
flb_mp_map_header_init(&mh, &packer);

/* pack resource */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&packer, 8);
msgpack_pack_str_body(&packer, "resource", 8);
if (resource != NULL) {
msgpack_pack_map(&packer, 1);

msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "attributes", 10);
ret = otel_pack_kvarray(
&packer,
resource->attributes,
resource->n_attributes);
}
else {
msgpack_pack_map(&packer, 0);
}

if (ret != 0) {
flb_error("[otel] Failed to convert log resource attributes");
msgpack_sbuffer_destroy(&buffer);
return -1;
}

/* pack logRecords */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "logRecords", 10);

msgpack_pack_map(&packer, 1);
msgpack_pack_str(&packer, 10);
msgpack_pack_str_body(&packer, "attributes", 10);
ret = otel_pack_kvarray(
&packer,
log_records[log_record_index]->attributes,
Expand All @@ -460,6 +498,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
flb_mp_map_header_end(&mh);
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
encoder,
buffer.data,
Expand Down

0 comments on commit 2f9e2c4

Please sign in to comment.