diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 509aa1d7c78..784b1d95a69 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -41,6 +41,7 @@ extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text); #include "opentelemetry.h" #include "opentelemetry_conf.h" + static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o); static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value); @@ -67,17 +68,19 @@ static inline void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyVal static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair) { - if (kvpair != NULL) { - if (kvpair->key != NULL) { - flb_free(kvpair->key); - } + if (kvpair == NULL) { + return; + } - if (kvpair->value != NULL) { - otlp_any_value_destroy(kvpair->value); - } + if (kvpair->key != NULL) { + flb_free(kvpair->key); + } - flb_free(kvpair); + if (kvpair->value != NULL) { + otlp_any_value_destroy(kvpair->value); } + + flb_free(kvpair); } static inline void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist) @@ -181,10 +184,12 @@ static int http_post(struct opentelemetry_context *ctx, if (ret == 0) { compressed = FLB_TRUE; - } else { + } + else { flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression"); } - } else { + } + else { final_body = (void *) body; final_body_len = body_len; } @@ -280,10 +285,8 @@ static int http_post(struct opentelemetry_context *ctx, out_ret = FLB_RETRY; } else { - if (ctx->log_response_payload && - c->resp.payload != NULL && - c->resp.payload_size > 0) { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%.*s", + if (ctx->log_response_payload && c->resp.payload != NULL && c->resp.payload_size > 2) { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i%.*s", ctx->host, ctx->port, c->resp.status, (int) c->resp.payload_size, @@ -654,7 +657,6 @@ static inline Opentelemetry__Proto__Common__V1__KeyValue **msgpack_map_to_otlp_k *entry_count = o->via.map.size; result = flb_calloc(*entry_count, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); - if (result != NULL) { for (index = 0; index < *entry_count; index++) { kv = &o->via.map.ptr[index]; @@ -743,19 +745,159 @@ static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp return result; } +static inline int log_record_set_body(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_records, struct flb_log_event *event, + struct flb_record_accessor **out_ra_match) +{ + int ret; + struct mk_list *head; + struct opentelemetry_body_key *bk; + msgpack_object *s_key = NULL; + msgpack_object *o_key = NULL; + msgpack_object *o_val = NULL; + Opentelemetry__Proto__Common__V1__AnyValue *log_object = NULL; + + *out_ra_match = NULL; + mk_list_foreach(head, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + + ret = flb_ra_get_kv_pair(bk->ra, *event->body, &s_key, &o_key, &o_val); + if (ret == 0) { + log_object = msgpack_object_to_otlp_any_value(o_val); + + /* Link the record accessor pattern that matched */ + *out_ra_match = bk->ra; + break; + } + + log_object = NULL; + } + + /* At this point the record accessor patterns found nothing, so we just package the whole record */ + if (!log_object) { + log_object = msgpack_object_to_otlp_any_value(event->body); + } + + if (!log_object) { + flb_plg_error(ctx->ins, "log event conversion failure"); + return -1; + } + + /* try to find the following keys: message or log, if found */ + log_records->body = log_object; + return 0; +} + +static int log_record_set_attributes(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record, struct flb_log_event *event, + struct flb_record_accessor *ra_match) +{ + int i; + int ret; + int attr_count = 0; + int unpacked = FLB_FALSE; + size_t array_size; + void *out_buf; + size_t offset = 0; + size_t out_size; + msgpack_object_kv *kv; + msgpack_object *metadata; + msgpack_unpacked result; + Opentelemetry__Proto__Common__V1__KeyValue **buf; + + /* Maximum array size is the total number of root keys in metadata and record keys */ + array_size = event->body->via.map.size; + + /* log metadata (metada that comes from original Fluent Bit record ) */ + metadata = event->metadata; + if (metadata) { + array_size += metadata->via.map.size; + } + + /* + * Remove the keys from the record that were added to the log body and create a new output + * buffer. If there are matches, meaning that a new output buffer was created, ret will + * be FLB_TRUE, if no matches exists it returns FLB_FALSE. + */ + if (ctx->mp_accessor && ra_match) { + /* + * if ra_match is not NULL, it means that the log body was populated with a key from the record + * and the variable holds a reference to the record accessor that matched the key. + * + * Since 'likely' the mp_accessor context can have multiple record accessor patterns, + * we need to make sure to remove 'only' the one that was used in the log body, + * the approach we take is to disable all the patterns, enable the single one that + * matched, process and then re-enable all of them. + */ + flb_mp_accessor_set_active(ctx->mp_accessor, FLB_FALSE); + + /* Only active the one that matched */ + flb_mp_accessor_set_active_by_pattern(ctx->mp_accessor, + ra_match->pattern, + FLB_TRUE); + + /* Remove the undesired key */ + ret = flb_mp_accessor_keys_remove(ctx->mp_accessor, event->body, &out_buf, &out_size); + if (ret) { + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, out_buf, out_size, &offset); + + array_size += result.data.via.map.size; + unpacked = FLB_TRUE; + } + /* Enable all the mp_accessors */ + flb_mp_accessor_set_active(ctx->mp_accessor, FLB_TRUE); + } + + /* allocate an array to hold the converted map entries */ + buf = flb_calloc(array_size, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); + if (!buf) { + flb_errno(); + if (unpacked) { + msgpack_unpacked_destroy(&result); + flb_free(out_buf); + } + return -1; + } + + /* pack metadata */ + for (i = 0; i < metadata->via.map.size; i++) { + kv = &metadata->via.map.ptr[i]; + buf[i] = msgpack_kv_to_otlp_any_value(kv); + attr_count++; + } + + /* remaining fields that were not added to log body */ + if (unpacked) { + /* iterate the map and reference each elemento as an OTLP value */ + for (i = 0; i < result.data.via.map.size; i++) { + kv = &result.data.via.map.ptr[i]; + buf[attr_count] = msgpack_kv_to_otlp_any_value(kv); + attr_count++; + } + msgpack_unpacked_destroy(&result); + flb_free(out_buf); + } + + log_record->attributes = buf; + log_record->n_attributes = attr_count; + return 0; +} + static int flush_to_otel(struct opentelemetry_context *ctx, struct flb_event_chunk *event_chunk, Opentelemetry__Proto__Logs__V1__LogRecord **logs, size_t log_count) { + int ret; + void *body; + unsigned len; + Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest export_logs; Opentelemetry__Proto__Logs__V1__ScopeLogs scope_log; Opentelemetry__Proto__Logs__V1__ResourceLogs resource_log; Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_logs[1]; Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_logs[1]; - void *body; - unsigned len; - int res; opentelemetry__proto__collector__logs__v1__export_logs_service_request__init(&export_logs); opentelemetry__proto__logs__v1__resource_logs__init(&resource_log); @@ -781,15 +923,15 @@ static int flush_to_otel(struct opentelemetry_context *ctx, opentelemetry__proto__collector__logs__v1__export_logs_service_request__pack(&export_logs, body); - // send post request to opentelemetry with content type application/x-protobuf - res = http_post(ctx, body, len, + /* send post request to opentelemetry with content type application/x-protobuf */ + ret = http_post(ctx, body, len, event_chunk->tag, flb_sds_len(event_chunk->tag), ctx->logs_uri); flb_free(body); - return res; + return ret; } static int process_logs(struct flb_event_chunk *event_chunk, @@ -797,25 +939,22 @@ static int process_logs(struct flb_event_chunk *event_chunk, struct flb_input_instance *ins, void *out_context, struct flb_config *config) { + int ret; + size_t i; size_t log_record_count; Opentelemetry__Proto__Logs__V1__LogRecord **log_record_list; Opentelemetry__Proto__Logs__V1__LogRecord *log_records; - Opentelemetry__Proto__Common__V1__AnyValue *log_object; struct flb_log_event_decoder *decoder; struct flb_log_event event; - size_t index; struct opentelemetry_context *ctx; - int res; + struct flb_record_accessor *ra_match; ctx = (struct opentelemetry_context *) out_context; - log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord **) \ - flb_calloc(ctx->batch_size, - sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); - - if (log_record_list == NULL) { + log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord **) + flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); + if (!log_record_list) { flb_errno(); - return -1; } @@ -823,72 +962,71 @@ static int process_logs(struct flb_event_chunk *event_chunk, flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord)); - if (log_records == NULL) { + if (!log_records) { flb_errno(); - flb_free(log_record_list); - return -2; } - for(index = 0 ; index < ctx->batch_size ; index++) { - log_record_list[index] = &log_records[index]; + for (i = 0 ; i < ctx->batch_size ; i++) { + log_record_list[i] = &log_records[i]; } - decoder = flb_log_event_decoder_create((char *) event_chunk->data, - event_chunk->size); - + decoder = flb_log_event_decoder_create((char *) event_chunk->data, event_chunk->size); if (decoder == NULL) { flb_plg_error(ctx->ins, "could not initialize record decoder"); - flb_free(log_record_list); flb_free(log_records); - return -1; } log_record_count = 0; - res = FLB_OK; - - while (flb_log_event_decoder_next(decoder, &event) == 0 && - res == FLB_OK) { + ret = FLB_OK; + while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { + ra_match = NULL; opentelemetry__proto__logs__v1__log_record__init(&log_records[log_record_count]); - log_records[log_record_count].attributes = \ - msgpack_map_to_otlp_kvarray(event.metadata, - &log_records[log_record_count].n_attributes); - log_object = msgpack_object_to_otlp_any_value(event.body); + /* + * Set the record body by using the logic defined in the configuration by + * the 'logs_body_key' properties. + * + * Note that the reference set in `out_body_parent_key` is the parent/root key that holds the content + * that was discovered. We get that reference so we can easily filter it out when composing + * the final list of attributes. + */ + ret = log_record_set_body(ctx, &log_records[log_record_count], &event, &ra_match); + if (ret == -1) { + /* the only possible fail path is a problem with a memory allocation, let's suggest a FLB_RETRY */ + ret = FLB_RETRY; + break; + } - if (log_object == NULL) { - flb_plg_error(ctx->ins, "log event conversion failure"); - res = FLB_ERROR; - continue; + /* set attributes from metadata and remaining fields from the main record */ + ret = log_record_set_attributes(ctx, &log_records[log_record_count], &event, ra_match); + if (ret == -1) { + /* as before, it can only fail on a memory allocation */ + ret = FLB_RETRY; + break; } + ret = FLB_OK; - log_records[log_record_count].body = log_object; + /* set timestamp */ log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp); - log_record_count++; if (log_record_count >= ctx->batch_size) { - res = flush_to_otel(ctx, - event_chunk, - log_record_list, - log_record_count); - + ret = flush_to_otel(ctx, event_chunk, log_record_list, log_record_count); clear_array(log_record_list, log_record_count); - log_record_count = 0; } } flb_log_event_decoder_destroy(decoder); - if (log_record_count > 0 && - res == FLB_OK) { - res = flush_to_otel(ctx, + if (log_record_count > 0 && ret == FLB_OK) { + ret = flush_to_otel(ctx, event_chunk, log_record_list, log_record_count); @@ -899,13 +1037,13 @@ static int process_logs(struct flb_event_chunk *event_chunk, flb_free(log_record_list); flb_free(log_records); - return res; + return ret; } static int process_metrics(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *ins, void *out_context, - struct flb_config *config) + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) { int c = 0; int ok; @@ -1165,11 +1303,34 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, metrics_uri), "Specify an optional HTTP URI for the target OTel endpoint." }, + + { + FLB_CONFIG_MAP_INT, "batch_size", DEFAULT_LOG_RECORD_BATCH_SIZE, + 0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size), + "Set the maximum number of log records to be flushed at a time" + }, + { + FLB_CONFIG_MAP_STR, "compress", NULL, + 0, FLB_FALSE, 0, + "Set payload compression mechanism. Option available is 'gzip'" + }, + + /* + * Logs Properties + * --------------- + */ { FLB_CONFIG_MAP_STR, "logs_uri", "/v1/logs", 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_uri), "Specify an optional HTTP URI for the target OTel endpoint." }, + + { + FLB_CONFIG_MAP_STR, "logs_body_key", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context, log_body_key_list_str), + "Specify an optional HTTP URI for the target OTel endpoint." + }, + { FLB_CONFIG_MAP_STR, "traces_uri", "/v1/traces", 0, FLB_TRUE, offsetof(struct opentelemetry_context, traces_uri), @@ -1180,16 +1341,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload), "Specify if the response paylod should be logged or not" }, - { - FLB_CONFIG_MAP_INT, "batch_size", DEFAULT_LOG_RECORD_BATCH_SIZE, - 0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size), - "Set the maximum number of log records to be flushed at a time" - }, - { - FLB_CONFIG_MAP_STR, "compress", NULL, - 0, FLB_FALSE, 0, - "Set payload compression mechanism. Option available is 'gzip'" - }, /* EOF */ {0} }; diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index c49d25622d4..6ef6b2bab87 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -21,6 +21,8 @@ #define FLB_OUT_OPENTELEMETRY_H #include +#include +#include #define FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME "Content-Type" #define FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL "application/x-protobuf" @@ -33,6 +35,12 @@ */ #define DEFAULT_LOG_RECORD_BATCH_SIZE "1000" +struct opentelemetry_body_key { + flb_sds_t key; + struct flb_record_accessor *ra; + struct mk_list _head; +}; + /* Plugin context */ struct opentelemetry_context { /* HTTP Auth */ @@ -60,9 +68,21 @@ struct opentelemetry_context { /* config reader for 'add_label' */ struct mk_list *add_labels; + /* + * list of linked list body keys given at configuration: note this list is just a slist, + * of strings, once is parsed, it populate the final list in 'log_body_key_list' + */ + struct mk_list *log_body_key_list_str; + + /* head of linked list body keys populated once log_body_key_list_str is parsed */ + struct mk_list log_body_key_list; + /* internal labels ready to append */ struct mk_list kv_labels; + /* special accessor with list of patterns used to populate log metadata */ + struct flb_mp_accessor *mp_accessor; + /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index f85876d18dd..83f14bbe153 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -26,6 +26,122 @@ #include "opentelemetry.h" #include "opentelemetry_conf.h" +/* create a single entry of log_body_key */ +static int log_body_key_create(struct opentelemetry_context *ctx, char *ra_pattern) +{ + struct opentelemetry_body_key *bk; + + bk = flb_calloc(1, sizeof(struct opentelemetry_body_key)); + if (!bk) { + flb_errno(); + return -1; + } + + bk->key = flb_sds_create(ra_pattern); + if (!bk->key) { + flb_free(bk); + return -1; + } + + bk->ra = flb_ra_create(ra_pattern, FLB_TRUE); + if (!bk->ra) { + flb_plg_error(ctx->ins, + "could not process event_field with pattern '%s'", + ra_pattern); + flb_sds_destroy(bk->key); + flb_free(bk); + return -1; + } + + mk_list_add(&bk->_head, &ctx->log_body_key_list); + + return 0; +} + +/* process and instance the list of body key patterns */ +static int log_body_key_list_create(struct opentelemetry_context *ctx) +{ + int ret; + struct mk_list *head; + struct flb_config_map_val *mv; + + /* If no log_body_key are defined, set the default ones */ + if (!ctx->log_body_key_list_str || mk_list_size(ctx->log_body_key_list_str) == 0) { + ret = log_body_key_create(ctx, "$log"); + if (ret != 0) { + return -1; + } + + ret = log_body_key_create(ctx, "$message"); + if (ret != 0) { + return -1; + } + + return 0; + } + + /* Iterate the list of log body keys defined in the configuration and initiate them */ + flb_config_map_foreach(head, mv, ctx->log_body_key_list_str) { + ret = log_body_key_create(ctx, mv->val.str); + if (ret != 0) { + return -1; + } + } + + return 0; +} + +static void log_body_key_list_destroy(struct opentelemetry_context *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct opentelemetry_body_key *bk; + + mk_list_foreach_safe(head, tmp, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + flb_sds_destroy(bk->key); + flb_ra_destroy(bk->ra); + mk_list_del(&bk->_head); + flb_free(bk); + } +} + +static int metadata_mp_accessor_create(struct opentelemetry_context *ctx) +{ + int ret; + struct mk_list *head; + struct mk_list slist; + struct opentelemetry_body_key *bk; + struct flb_mp_accessor *mpa; + + ret = flb_slist_create(&slist); + if (ret != 0) { + return -1; + } + + /* Iterate the list of log body keys and create a mp_accessor for each one */ + mk_list_foreach(head, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + + ret = flb_slist_add(&slist, bk->key); + if (ret != 0) { + flb_slist_destroy(&slist); + return -1; + } + } + + mpa = flb_mp_accessor_create(&slist); + if (!mpa) { + flb_slist_destroy(&slist); + return -1; + } + + ctx->mp_accessor = mpa; + flb_slist_destroy(&slist); + + return 0; +} + static int config_add_labels(struct flb_output_instance *ins, struct opentelemetry_context *ctx) { @@ -119,8 +235,7 @@ static char *sanitize_uri(char *uri){ return uri; } -struct opentelemetry_context *flb_opentelemetry_context_create( - struct flb_output_instance *ins, struct flb_config *config) +struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output_instance *ins, struct flb_config *config) { int ret; int io_flags = 0; @@ -142,6 +257,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create( } ctx->ins = ins; mk_list_init(&ctx->kv_labels); + mk_list_init(&ctx->log_body_key_list); ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { @@ -155,6 +271,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create( return NULL; } + check_proxy(ins, ctx, host, port, protocol, metrics_uri); check_proxy(ins, ctx, host, port, protocol, logs_uri); @@ -202,6 +319,8 @@ struct opentelemetry_context *flb_opentelemetry_context_create( ctx->host = ins->host.name; ctx->port = ins->host.port; + + /* Logs Properties */ if (logs_uri == NULL) { flb_plg_trace(ctx->ins, "Could not allocate memory for sanitized " @@ -211,6 +330,26 @@ struct opentelemetry_context *flb_opentelemetry_context_create( ctx->logs_uri = logs_uri; } + /* list of 'logs_body_key' */ + ret = log_body_key_list_create(ctx); + if (ret != 0) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + + /* + * Add the pattern to the mp_accessor list: for every key that populates the log body, we need + * it also in the mp_accessor list so remaining keys are set into the metadata field. + * + * This process is far from being optimal since we are kind of duplicating the logic, however + * we can simply use the API already exists in place, let's optimize later (if needed). + */ + ret = metadata_mp_accessor_create(ctx); + if (ret != 0) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + if (traces_uri == NULL) { flb_plg_trace(ctx->ins, "Could not allocate memory for sanitized " @@ -244,8 +383,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create( return ctx; } -void flb_opentelemetry_context_destroy( - struct opentelemetry_context *ctx) +void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) { if (!ctx) { return; @@ -257,6 +395,13 @@ void flb_opentelemetry_context_destroy( flb_upstream_destroy(ctx->u); } + /* release log_body_key_list */ + log_body_key_list_destroy(ctx); + + if (ctx->mp_accessor) { + flb_mp_accessor_destroy(ctx->mp_accessor); + } + flb_free(ctx->proxy_host); flb_free(ctx); }