From fd0a0de7da4f2db16cb4e48a23e91949a9a85b6b Mon Sep 17 00:00:00 2001 From: Aditya Prajapati Date: Tue, 11 Oct 2022 21:42:35 +0530 Subject: [PATCH] out_opentelemetry: export traces Signed-off-by: Aditya Prajapati --- plugins/out_opentelemetry/opentelemetry.c | 97 ++++++++++++++++++++++- plugins/out_opentelemetry/opentelemetry.h | 1 + 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 73bf5d1977d..0de5b7c473f 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -30,6 +30,9 @@ #include #include +#include +#include + extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt); extern void cmt_encode_opentelemetry_destroy(struct cmt *cmt); @@ -428,6 +431,86 @@ static int process_metrics(struct flb_event_chunk *event_chunk, return result; } +static int process_traces(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) +{ + int ok; + int ret; + int result; + cfl_sds_t encoded_chunk; + flb_sds_t buf = NULL; + size_t off = 0; + struct ctrace *ctr; + struct opentelemetry_context *ctx = out_context; + + /* Initialize vars */ + ctx = out_context; + ok = 0; + result = FLB_OK; + + buf = flb_sds_create_size(event_chunk->size); + if (!buf) { + flb_plg_error(ctx->ins, "could not allocate outgoing buffer"); + return FLB_RETRY; + } + + flb_plg_debug(ctx->ins, "ctraces msgpack size: %lu", + event_chunk->size); + + ret = ctr_decode_msgpack_create(&ctr, + (char *) event_chunk->data, + event_chunk->size, &off); + if (ret != ok) { + flb_plg_error(ctx->ins, "Error decoding msgpack encoded context"); + } + + /* Create a OpenTelemetry payload */ + encoded_chunk = ctr_encode_opentelemetry_create(ctr); + if (encoded_chunk == NULL) { + flb_plg_error(ctx->ins, + "Error encoding context as opentelemetry"); + result = FLB_ERROR; + goto exit; + } + + /* concat buffer */ + flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk)); + + /* release */ + ctr_encode_opentelemetry_destroy(encoded_chunk); + ctr_destroy(ctr); + + flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf)); + if (buf && flb_sds_len(buf) > 0) { + /* Send HTTP request */ + result = http_post(ctx, buf, flb_sds_len(buf), + event_chunk->tag, + flb_sds_len(event_chunk->tag), + ctx->traces_uri); + + /* Debug http_post() result statuses */ + if (result == FLB_OK) { + flb_plg_debug(ctx->ins, "http_post result FLB_OK"); + } + else if (result == FLB_ERROR) { + flb_plg_debug(ctx->ins, "http_post result FLB_ERROR"); + } + else if (result == FLB_RETRY) { + flb_plg_debug(ctx->ins, "http_post result FLB_RETRY"); + } + } + flb_sds_destroy(buf); + buf = NULL; + +exit: + if (buf) { + flb_sds_destroy(buf); + } + return result; +} + static int cb_opentelemetry_exit(void *data, struct flb_config *config) { struct opentelemetry_context *ctx; @@ -462,12 +545,15 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk, { int result = FLB_RETRY; - if (ins->event_type == FLB_OUTPUT_METRICS || ins->event_type == FLB_INPUT_METRICS){ + if (ins->event_type == FLB_INPUT_METRICS){ result = process_metrics(event_chunk, out_flush, ins, out_context, config); } - else if (ins->event_type == FLB_INPUT_LOGS || ins->event_type == FLB_OUTPUT_LOGS){ + else if (ins->event_type == FLB_INPUT_LOGS){ result = process_logs(event_chunk, out_flush, ins, out_context, config); } + else if (ins->event_type == FLB_INPUT_TRACES){ + result = process_traces(event_chunk, out_flush, ins, out_context, config); + } FLB_OUTPUT_RETURN(result); } @@ -510,6 +596,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_uri), "Specify an optional HTTP URI for the target OTel endpoint." }, + { + FLB_CONFIG_MAP_STR, "traces_uri", "/v1/traces", + 0, FLB_TRUE, offsetof(struct opentelemetry_context, traces_uri), + "Specify an optional HTTP URI for the target OTel endpoint." + }, { FLB_CONFIG_MAP_BOOL, "log_response_payload", "true", 0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload), @@ -527,6 +618,6 @@ struct flb_output_plugin out_opentelemetry_plugin = { .cb_flush = cb_opentelemetry_flush, .cb_exit = cb_opentelemetry_exit, .config_map = config_map, - .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, + .event_type = FLB_OUTPUT_LOGS, .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS, }; diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index 659f0d64bb5..81693bb5e65 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -38,6 +38,7 @@ struct opentelemetry_context { int proxy_port; /* HTTP URI */ + char *traces_uri; char *metrics_uri; char *logs_uri; char *host;