diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 88c935689e4..5d08e505fff 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -67,7 +67,7 @@ DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON) DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON) DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON) - +DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON) # Filters # ======= diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 9006ef6d823..ce8cae64d97 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -286,6 +286,7 @@ REGISTER_PROCESSOR_PLUGIN("processor_content_modifier") REGISTER_PROCESSOR_PLUGIN("processor_labels") REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector") REGISTER_PROCESSOR_PLUGIN("processor_sql") +REGISTER_PROCESSOR_PLUGIN("processor_opentelemetry_envelope") # OUTPUTS # ======= diff --git a/plugins/processor_opentelemetry_envelope/CMakeLists.txt b/plugins/processor_opentelemetry_envelope/CMakeLists.txt new file mode 100644 index 00000000000..e244060cffc --- /dev/null +++ b/plugins/processor_opentelemetry_envelope/CMakeLists.txt @@ -0,0 +1,4 @@ +set(src + otel_envelope.c) + +FLB_PLUGIN(processor_opentelemetry_envelope "${src}" "") diff --git a/plugins/processor_opentelemetry_envelope/otel_envelope.c b/plugins/processor_opentelemetry_envelope/otel_envelope.c new file mode 100644 index 00000000000..c83fbb8f328 --- /dev/null +++ b/plugins/processor_opentelemetry_envelope/otel_envelope.c @@ -0,0 +1,224 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include + +/* Processor initialization */ +static int cb_init(struct flb_processor_instance *ins, + void *source_plugin_instance, + int source_plugin_type, + struct flb_config *config) +{ + return FLB_PROCESSOR_SUCCESS; +} + +/* Processor exit */ +static int cb_exit(struct flb_processor_instance *ins, void *data) +{ + return FLB_PROCESSOR_SUCCESS; +} + +/* Create an group start with OTLP style-signature */ +static struct flb_mp_chunk_record *envelop_init(struct cfl_list *list, struct flb_mp_chunk_record *active_record) +{ + int ret; + struct cfl_kvlist *kvlist_meta = NULL; + struct cfl_kvlist *kvlist_record = NULL; + struct cfl_kvlist *kvlist_resource = NULL; + struct cfl_kvlist *kvlist_scope = NULL; + struct cfl_object *cobj_meta = NULL; + struct cfl_object *cobj_record = NULL; + struct flb_mp_chunk_record *record = NULL; + struct flb_time tm; + + /* metadata */ + kvlist_meta = cfl_kvlist_create(); + if (!kvlist_meta) { + return NULL; + } + + cfl_kvlist_insert_string(kvlist_meta, "schema", "otlp"); + cfl_kvlist_insert_int64(kvlist_meta, "resource_id", 0); + cfl_kvlist_insert_int64(kvlist_meta, "scope_id", 0); + + /* empty content */ + kvlist_record = cfl_kvlist_create(); + if (!kvlist_record) { + goto failure; + } + + kvlist_resource = cfl_kvlist_create(); + if (!kvlist_resource) { + goto failure; + } + + kvlist_scope = cfl_kvlist_create(); + if (!kvlist_scope) { + goto failure; + } + + cfl_kvlist_insert_kvlist(kvlist_record, "resource", kvlist_resource); + cfl_kvlist_insert_kvlist(kvlist_record, "scope", kvlist_scope); + + record = flb_mp_chunk_record_create(NULL); + if (!record) { + goto failure; + } + + cobj_meta = cfl_object_create(); + if (!cobj_meta) { + goto failure; + } + ret = cfl_object_set(cobj_meta, CFL_OBJECT_KVLIST, kvlist_meta); + if (ret != 0) { + goto failure; + } + + cobj_record = cfl_object_create(); + if (!cobj_record) { + goto failure; + } + ret = cfl_object_set(cobj_record, CFL_OBJECT_KVLIST, kvlist_record); + if (ret != 0) { + goto failure; + } + + /* set the group flag in the timestamp field */ + flb_time_set(&tm, FLB_LOG_EVENT_GROUP_START, 0); + flb_time_copy(&record->event.timestamp, &tm); + + record->modified = FLB_TRUE; + record->cobj_metadata = cobj_meta; + record->cobj_record = cobj_record; + + /* add the envelop before the active record */ + cfl_list_add_before(&record->_head, &active_record->_head, list); + + return record; + +failure: + if (kvlist_meta) { + cfl_kvlist_destroy(kvlist_meta); + } + if (kvlist_record) { + cfl_kvlist_destroy(kvlist_record); + } + if (kvlist_resource) { + cfl_kvlist_destroy(kvlist_resource); + } + if (kvlist_scope) { + cfl_kvlist_destroy(kvlist_scope); + } + if (cobj_meta) { + cfl_object_destroy(cobj_meta); + } + if (cobj_record) { + cfl_object_destroy(cobj_record); + } + if (record) { + flb_mp_chunk_cobj_record_destroy(NULL, record); + } + + return NULL; +} + +/* Create an group end */ +static void envelop_end(struct cfl_list *list, struct flb_mp_chunk_record *active_record) +{ + struct flb_time tm; + struct flb_mp_chunk_record *record; + + /* set the group flag in the timestamp field */ + record = flb_mp_chunk_record_create(NULL); + if (!record) { + return; + } + + flb_time_set(&tm, FLB_LOG_EVENT_GROUP_END, 0); + flb_time_copy(&record->event.timestamp, &tm); + + record->modified = FLB_TRUE; + record->cobj_metadata = NULL; + record->cobj_record = NULL; + + /* add the envelop before the active record */ + cfl_list_add_after(&record->_head, &active_record->_head, list); +} + + +#include + +/* Logs callback */ +static int cb_process_logs(struct flb_processor_instance *ins, + void *chunk_data, const char *tag, int tag_len) +{ + int ret; + int record_type; + int grouped = FLB_FALSE; + struct flb_mp_chunk_record *prev_record; + struct flb_mp_chunk_record *record; + struct flb_mp_chunk_cobj *chunk_cobj = (struct flb_mp_chunk_cobj *) chunk_data; + + + /* Iterate records */ + while (flb_mp_chunk_cobj_record_next(chunk_cobj, &record) == FLB_MP_CHUNK_RECORD_OK) { + prev_record = record; + + /* get record type */ + ret = flb_log_event_decoder_get_record_type(&record->event, &record_type); + if (ret != 0) { + flb_plg_error(ins, "record has invalid event type"); + continue; + } + + if (record_type == FLB_LOG_EVENT_NORMAL && grouped == FLB_FALSE) { + envelop_init(&chunk_cobj->records, record); + grouped = FLB_TRUE; + } + else if (record_type == FLB_LOG_EVENT_GROUP_START && grouped == FLB_TRUE) { + envelop_end(&chunk_cobj->records, record); + grouped = FLB_FALSE; + } + } + + if (grouped == FLB_TRUE) { + envelop_end(&chunk_cobj->records, prev_record); + } + + return FLB_PROCESSOR_SUCCESS; +} + +static struct flb_config_map config_map[] = { + /* EOF */ + {0} +}; + +struct flb_processor_plugin processor_opentelemetry_envelope_plugin = { + .name = "opentelemetry_envelope", + .description = "Package log records inside an OpenTelemetry Logs schema", + .cb_init = cb_init, + .cb_process_logs = cb_process_logs, + .cb_process_metrics = NULL, + .cb_process_traces = NULL, + .cb_exit = cb_exit, + .config_map = config_map, + .flags = 0, +}; \ No newline at end of file