Skip to content

Commit

Permalink
out_opentelemetry: export traces
Browse files Browse the repository at this point in the history
Signed-off-by: Aditya Prajapati <[email protected]>
  • Loading branch information
Aditya Prajapati authored and edsiper committed Oct 15, 2022
1 parent 2d022fd commit fd0a0de
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 3 deletions.
97 changes: 94 additions & 3 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
#include <cmetrics/cmetrics.h>
#include <cmetrics/cmt_encode_opentelemetry.h>

#include <ctraces/ctraces.h>
#include <ctraces/ctr_decode_msgpack.h>

extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt);
extern void cmt_encode_opentelemetry_destroy(struct cmt *cmt);

Expand Down Expand Up @@ -428,6 +431,86 @@ static int process_metrics(struct flb_event_chunk *event_chunk,
return result;
}

static int process_traces(struct flb_event_chunk *event_chunk,
struct flb_output_flush *out_flush,
struct flb_input_instance *ins, void *out_context,
struct flb_config *config)
{
int ok;
int ret;
int result;
cfl_sds_t encoded_chunk;
flb_sds_t buf = NULL;
size_t off = 0;
struct ctrace *ctr;
struct opentelemetry_context *ctx = out_context;

/* Initialize vars */
ctx = out_context;
ok = 0;
result = FLB_OK;

buf = flb_sds_create_size(event_chunk->size);
if (!buf) {
flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
return FLB_RETRY;
}

flb_plg_debug(ctx->ins, "ctraces msgpack size: %lu",
event_chunk->size);

ret = ctr_decode_msgpack_create(&ctr,
(char *) event_chunk->data,
event_chunk->size, &off);
if (ret != ok) {
flb_plg_error(ctx->ins, "Error decoding msgpack encoded context");
}

/* Create a OpenTelemetry payload */
encoded_chunk = ctr_encode_opentelemetry_create(ctr);
if (encoded_chunk == NULL) {
flb_plg_error(ctx->ins,
"Error encoding context as opentelemetry");
result = FLB_ERROR;
goto exit;
}

/* concat buffer */
flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk));

/* release */
ctr_encode_opentelemetry_destroy(encoded_chunk);
ctr_destroy(ctr);

flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf));
if (buf && flb_sds_len(buf) > 0) {
/* Send HTTP request */
result = http_post(ctx, buf, flb_sds_len(buf),
event_chunk->tag,
flb_sds_len(event_chunk->tag),
ctx->traces_uri);

/* Debug http_post() result statuses */
if (result == FLB_OK) {
flb_plg_debug(ctx->ins, "http_post result FLB_OK");
}
else if (result == FLB_ERROR) {
flb_plg_debug(ctx->ins, "http_post result FLB_ERROR");
}
else if (result == FLB_RETRY) {
flb_plg_debug(ctx->ins, "http_post result FLB_RETRY");
}
}
flb_sds_destroy(buf);
buf = NULL;

exit:
if (buf) {
flb_sds_destroy(buf);
}
return result;
}

static int cb_opentelemetry_exit(void *data, struct flb_config *config)
{
struct opentelemetry_context *ctx;
Expand Down Expand Up @@ -462,12 +545,15 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk,
{
int result = FLB_RETRY;

if (ins->event_type == FLB_OUTPUT_METRICS || ins->event_type == FLB_INPUT_METRICS){
if (ins->event_type == FLB_INPUT_METRICS){
result = process_metrics(event_chunk, out_flush, ins, out_context, config);
}
else if (ins->event_type == FLB_INPUT_LOGS || ins->event_type == FLB_OUTPUT_LOGS){
else if (ins->event_type == FLB_INPUT_LOGS){
result = process_logs(event_chunk, out_flush, ins, out_context, config);
}
else if (ins->event_type == FLB_INPUT_TRACES){
result = process_traces(event_chunk, out_flush, ins, out_context, config);
}
FLB_OUTPUT_RETURN(result);
}

Expand Down Expand Up @@ -510,6 +596,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_uri),
"Specify an optional HTTP URI for the target OTel endpoint."
},
{
FLB_CONFIG_MAP_STR, "traces_uri", "/v1/traces",
0, FLB_TRUE, offsetof(struct opentelemetry_context, traces_uri),
"Specify an optional HTTP URI for the target OTel endpoint."
},
{
FLB_CONFIG_MAP_BOOL, "log_response_payload", "true",
0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
Expand All @@ -527,6 +618,6 @@ struct flb_output_plugin out_opentelemetry_plugin = {
.cb_flush = cb_opentelemetry_flush,
.cb_exit = cb_opentelemetry_exit,
.config_map = config_map,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,
.event_type = FLB_OUTPUT_LOGS,
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
};
1 change: 1 addition & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct opentelemetry_context {
int proxy_port;

/* HTTP URI */
char *traces_uri;
char *metrics_uri;
char *logs_uri;
char *host;
Expand Down

0 comments on commit fd0a0de

Please sign in to comment.