Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_opentelemetry: use ctraces to decode binary payload #6180

Merged
merged 1 commit into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugins/in_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_opentelemetry, successful_response_code),
"Set successful response code. 200, 201 and 204 are supported."
},

{
FLB_CONFIG_MAP_BOOL, "raw_traces", "false",
0, FLB_TRUE, offsetof(struct flb_opentelemetry, raw_traces),
"Forward traces without processing"
},

/* EOF */
{0}
Expand All @@ -193,5 +197,5 @@ struct flb_input_plugin in_opentelemetry_plugin = {
.cb_exit = in_opentelemetry_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET | FLB_IO_OPT_TLS,
.event_type = FLB_INPUT_LOGS | FLB_INPUT_METRICS
.event_type = FLB_INPUT_LOGS
};
1 change: 1 addition & 0 deletions plugins/in_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct flb_opentelemetry {
flb_sds_t listen;
flb_sds_t tcp_port;
const char *tag_key;
bool raw_traces;

size_t buffer_max_size; /* Maximum buffer size */
size_t buffer_chunk_size; /* Chunk allocation size */
Expand Down
48 changes: 44 additions & 4 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,33 @@ static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_co
return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
static int process_payload_traces_proto(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
struct ctrace *decoded_context;
size_t offset;
int result;

offset = 0;
result = ctr_decode_opentelemetry_create(&decoded_context,
request->data.data,
request->data.len,
&offset);
if (result == 0) {
ctx->ins->event_type = FLB_INPUT_TRACES;
result = flb_input_trace_append(ctx->ins, NULL, 0, decoded_context);
ctr_decode_opentelemetry_destroy(decoded_context);
}

return result;
}

static int process_payload_raw_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
int ret;
int root_type;
Expand Down Expand Up @@ -173,6 +196,23 @@ static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_con
return 0;
}

static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
int result;

if (ctx->raw_traces) {
result = process_payload_raw_traces(ctx, conn, tag, session, request);
}
else {
result = process_payload_traces_proto(ctx, conn, tag, session, request);
}

return result;
}

static inline int mk_http_point_header(mk_ptr_t *h,
struct mk_http_parser *parser, int key)
{
Expand Down