Skip to content

Commit

Permalink
in_opentelemetry: add support for logs (#5928)
Browse files Browse the repository at this point in the history
Signed-off-by: Aditya Prajapati <[email protected]>
  • Loading branch information
Aditya Prajapati authored Oct 24, 2022
1 parent 01134e8 commit c9637ab
Showing 1 changed file with 344 additions and 2 deletions.
346 changes: 344 additions & 2 deletions plugins/in_opentelemetry/opentelemetry_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
#include <monkey/mk_core.h>
#include <cmetrics/cmt_decode_opentelemetry.h>

#include <fluent-otel-proto/fluent-otel.h>
#include "opentelemetry.h"
#include "http_conn.h"

#define HTTP_CONTENT_JSON 0

static int otlp_pack_any_value(msgpack_packer *mp_pck,
Opentelemetry__Proto__Common__V1__AnyValue *body);

static int send_response(struct http_conn *conn, int http_status, char *message)
{
int len;
Expand Down Expand Up @@ -90,7 +94,6 @@ static int send_response(struct http_conn *conn, int http_status, char *message)
return 0;
}


static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
Expand Down Expand Up @@ -213,6 +216,335 @@ static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_con
return result;
}

static int otel_pack_string(msgpack_packer *mp_pck, char *str)
{
return msgpack_pack_str_with_body(mp_pck, str, strlen(str));
}

static int otel_pack_bool(msgpack_packer *mp_pck, bool val)
{
if (val) {
return msgpack_pack_true(mp_pck);
}
else {
return msgpack_pack_false(mp_pck);
}
}

static int otel_pack_int(msgpack_packer *mp_pck, int val)
{
return msgpack_pack_int64(mp_pck, val);
}

static int otel_pack_double(msgpack_packer *mp_pck, double val)
{
return msgpack_pack_double(mp_pck, val);
}

static int otel_pack_kvlist(msgpack_packer *mp_pck,
Opentelemetry__Proto__Common__V1__KeyValueList *kv_list)
{
int kv_index;
int ret;
char *key;
Opentelemetry__Proto__Common__V1__AnyValue *value;

ret = msgpack_pack_map(mp_pck, kv_list->n_values);
if (ret != 0) {
return ret;
}

for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) {
key = kv_list->values[kv_index]->key;
value = kv_list->values[kv_index]->value;

ret = otel_pack_string(mp_pck, key);

if(ret == 0) {
ret = otlp_pack_any_value(mp_pck, value);
}
}

return ret;
}

static int otel_pack_array(msgpack_packer *mp_pck,
Opentelemetry__Proto__Common__V1__ArrayValue *array)
{
int ret;
int array_index;

for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) {
ret = otlp_pack_any_value(mp_pck, array->values[array_index]);
}
return ret;
}

static int otel_pack_bytes(msgpack_packer *mp_pck,
ProtobufCBinaryData bytes)
{
return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len);
}

static int otlp_pack_any_value(msgpack_packer *mp_pck,
Opentelemetry__Proto__Common__V1__AnyValue *body)
{
int result;

result = -2;

switch(body->value_case){
case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE:
result = otel_pack_string(mp_pck, body->string_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE:
result = otel_pack_bool(mp_pck, body->bool_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE:
result = otel_pack_int(mp_pck, body->int_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE:
result = otel_pack_double(mp_pck, body->double_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE:
result = otel_pack_array(mp_pck, body->array_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE:
result = otel_pack_kvlist(mp_pck, body->kvlist_value);
break;

case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE:
result = otel_pack_bytes(mp_pck, body->bytes_value);
break;

default:
break;
}

if (result == -2) {
flb_error("[otel]: invalid value type in pack_any_value");
result = -1;
}

return result;
}

static int binary_payload_to_msgpack(msgpack_packer *mp_pck,
uint8_t *in_buf,
size_t in_size)
{
int ret;
int resource_logs_index;
int scope_log_index;
int log_record_index;

Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log;
Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs;
Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log;
Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
Opentelemetry__Proto__Logs__V1__LogRecord *log_record;

input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
if (input_logs == NULL) {
flb_error("[otel] Failed to unpack input logs");
return -1;
}

resource_logs = input_logs->resource_logs;
if (resource_logs == NULL) {
flb_error("[otel] No resource logs found");
return -1;
}

for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
resource_log = resource_logs[resource_logs_index];
scope_logs = resource_log->scope_logs;

if (resource_log->n_scope_logs > 0 && scope_logs == NULL) {
flb_error("[otel] No scope logs found");
return -1;
}

for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) {
scope_log = scope_logs[scope_log_index];
log_records = scope_log->log_records;

if (log_records == NULL) {
flb_error("[otel] No log records found");
return -1;
}

for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) {
msgpack_pack_array(mp_pck, 2);
flb_pack_time_now(mp_pck);

log_record = log_records[log_record_index];

ret = otlp_pack_any_value(mp_pck, log_record->body);

if (ret != 0) {
flb_error("[otel] Failed to convert log record body");
return -1;
}
}
}
}
return 0;
}

static int get_token_length(jsmntok_t token){
return token.end - token.start;
}

static char *get_value_from_token(jsmntok_t *tokens,
const char *body,
int pos){
char *tmp;
jsmntok_t token;
int token_len;

token = tokens[pos];
token_len = get_token_length(token);

tmp = flb_calloc(1, token_len + 1);
tmp = memcpy(tmp, body+token.start, token_len);

return tmp;
}

static int json_payload_to_msgpack(msgpack_packer *mp_pck,
const char *body,
size_t len)
{
int n_tokens;
int token_index;
int kv_index;
int result;

char *key;
char *otel_value_type;
char *otel_log_record;

jsmn_parser parser;
jsmntok_t tokens[1024];
jsmntok_t token;

result = 0;

jsmn_init(&parser);
n_tokens = jsmn_parse(&parser, body, len, tokens, 1024);

if (n_tokens < 0) {
flb_error("[otel] Failed to parse JSON payload, jsmn error %d", n_tokens);
return -1;
}

// position 0 is the root object, skip it
for (token_index = 1; token_index < n_tokens; token_index++) {
token = tokens[token_index];

switch (token.type) {

case JSMN_OBJECT:
for (kv_index=0; kv_index < token.size; kv_index++) {
key = get_value_from_token(tokens, body, token_index+kv_index+1);

if (strcmp(key, "body") == 0) {
otel_value_type = get_value_from_token(tokens, body, token_index+kv_index+3);
otel_log_record = get_value_from_token(tokens, body, token_index+kv_index+4);

msgpack_pack_array(mp_pck, 2);
flb_pack_time_now(mp_pck);

if (strcasecmp(otel_value_type, "stringvalue") == 0) {
result = otel_pack_string(mp_pck, otel_log_record);
}

else if (strcasecmp(otel_value_type, "intvalue") == 0) {
result = otel_pack_int(mp_pck, atoi(otel_log_record));
}

else if (strcasecmp(otel_value_type, "doublevalue") == 0) {
result = otel_pack_double(mp_pck, atof(otel_log_record));
}

else if (strcasecmp(otel_value_type, "boolvalue") == 0) {
if (strcasecmp(otel_log_record, "true") == 0) {
result = otel_pack_bool(mp_pck, true);
} else {
result = otel_pack_bool(mp_pck, false);
}
}

else if (strcasecmp(otel_value_type, "bytesvalue") == 0){
result = otel_pack_string(mp_pck, otel_log_record);
}

flb_free(otel_value_type);
flb_free(otel_log_record);
}

flb_free(key);
}
break;

default:
break;
}
}
return result;
}

static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
{
int ret;
char *out_buf = NULL;

msgpack_packer mp_pck;
msgpack_sbuffer mp_sbuf;

msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

/* Check if the incoming payload is a valid JSON message and convert it to msgpack */
if (strncasecmp(request->content_type.data,
"application/json",
request->content_type.len) == 0) {
ret = json_payload_to_msgpack(&mp_pck, request->data.data, request->data.len);
}
else if (strncasecmp(request->content_type.data,
"application/x-protobuf",
request->content_type.len) == 0) {
ret = binary_payload_to_msgpack(&mp_pck, (uint8_t *)request->data.data, request->data.len);
}
else {
flb_error("[otel] Unsupported content type %.*s", request->content_type.len, request->content_type.data);
ret = -1;
}

/* release 'out_buf' if it was allocated */
if (out_buf) {
flb_free(out_buf);
}

ctx->ins->event_type = FLB_INPUT_LOGS;

flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size);

msgpack_sbuffer_destroy(&mp_sbuf);
return ret;
}

static inline int mk_http_point_header(mk_ptr_t *h,
struct mk_http_parser *parser, int key)
{
Expand Down Expand Up @@ -265,9 +597,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
uri[request->uri.len] = '\0';
}

if (strcmp(uri, "/v1/metrics") != 0 && strcmp(uri, "/v1/traces") != 0) {
if (strcmp(uri, "/v1/metrics") != 0 &&
strcmp(uri, "/v1/traces") != 0 &&
strcmp(uri, "/v1/logs") != 0) {

send_response(conn, 400, "error: invalid endpoint\n");
mk_mem_free(uri);

return -1;
}

Expand Down Expand Up @@ -330,6 +666,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
request->_content_length.data = NULL;
}

mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE);

if (request->method != MK_METHOD_POST) {
flb_sds_destroy(tag);
mk_mem_free(uri);
Expand All @@ -343,6 +681,10 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c
else if (strcmp(uri, "/v1/traces") == 0) {
ret = process_payload_traces(ctx, conn, tag, session, request);
}
else if (strcmp(uri, "/v1/logs") == 0) {
ret = process_payload_logs(ctx, conn, tag, session, request);
}

mk_mem_free(uri);
flb_sds_destroy(tag);
send_response(conn, ctx->successful_response_code, NULL);
Expand Down

0 comments on commit c9637ab

Please sign in to comment.