From c9637abe792796d54273b00a84f9acf3284e97a5 Mon Sep 17 00:00:00 2001 From: Aditya Prajapati Date: Mon, 24 Oct 2022 09:38:08 +0530 Subject: [PATCH] in_opentelemetry: add support for logs (#5928) Signed-off-by: Aditya Prajapati --- plugins/in_opentelemetry/opentelemetry_prot.c | 346 +++++++++++++++++- 1 file changed, 344 insertions(+), 2 deletions(-) diff --git a/plugins/in_opentelemetry/opentelemetry_prot.c b/plugins/in_opentelemetry/opentelemetry_prot.c index 72d97272fd6..fdb87ac6610 100644 --- a/plugins/in_opentelemetry/opentelemetry_prot.c +++ b/plugins/in_opentelemetry/opentelemetry_prot.c @@ -26,11 +26,15 @@ #include #include +#include #include "opentelemetry.h" #include "http_conn.h" #define HTTP_CONTENT_JSON 0 +static int otlp_pack_any_value(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__AnyValue *body); + static int send_response(struct http_conn *conn, int http_status, char *message) { int len; @@ -90,7 +94,6 @@ static int send_response(struct http_conn *conn, int http_status, char *message) return 0; } - static int process_payload_metrics(struct flb_opentelemetry *ctx, struct http_conn *conn, flb_sds_t tag, struct mk_http_session *session, @@ -213,6 +216,335 @@ static int process_payload_traces(struct flb_opentelemetry *ctx, struct http_con return result; } +static int otel_pack_string(msgpack_packer *mp_pck, char *str) +{ + return msgpack_pack_str_with_body(mp_pck, str, strlen(str)); +} + +static int otel_pack_bool(msgpack_packer *mp_pck, bool val) +{ + if (val) { + return msgpack_pack_true(mp_pck); + } + else { + return msgpack_pack_false(mp_pck); + } +} + +static int otel_pack_int(msgpack_packer *mp_pck, int val) +{ + return msgpack_pack_int64(mp_pck, val); +} + +static int otel_pack_double(msgpack_packer *mp_pck, double val) +{ + return msgpack_pack_double(mp_pck, val); +} + +static int otel_pack_kvlist(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__KeyValueList *kv_list) +{ + int kv_index; + int ret; + char *key; + Opentelemetry__Proto__Common__V1__AnyValue *value; + + ret = msgpack_pack_map(mp_pck, kv_list->n_values); + if (ret != 0) { + return ret; + } + + for (kv_index = 0; kv_index < kv_list->n_values && ret == 0; kv_index++) { + key = kv_list->values[kv_index]->key; + value = kv_list->values[kv_index]->value; + + ret = otel_pack_string(mp_pck, key); + + if(ret == 0) { + ret = otlp_pack_any_value(mp_pck, value); + } + } + + return ret; +} + +static int otel_pack_array(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__ArrayValue *array) +{ + int ret; + int array_index; + + for (array_index = 0; array_index < array->n_values && ret == 0; array_index++) { + ret = otlp_pack_any_value(mp_pck, array->values[array_index]); + } + return ret; +} + +static int otel_pack_bytes(msgpack_packer *mp_pck, + ProtobufCBinaryData bytes) +{ + return msgpack_pack_bin_with_body(mp_pck, bytes.data, bytes.len); +} + +static int otlp_pack_any_value(msgpack_packer *mp_pck, + Opentelemetry__Proto__Common__V1__AnyValue *body) +{ + int result; + + result = -2; + + switch(body->value_case){ + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE: + result = otel_pack_string(mp_pck, body->string_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BOOL_VALUE: + result = otel_pack_bool(mp_pck, body->bool_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_INT_VALUE: + result = otel_pack_int(mp_pck, body->int_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_DOUBLE_VALUE: + result = otel_pack_double(mp_pck, body->double_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE: + result = otel_pack_array(mp_pck, body->array_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE: + result = otel_pack_kvlist(mp_pck, body->kvlist_value); + break; + + case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_BYTES_VALUE: + result = otel_pack_bytes(mp_pck, body->bytes_value); + break; + + default: + break; + } + + if (result == -2) { + flb_error("[otel]: invalid value type in pack_any_value"); + result = -1; + } + + return result; +} + +static int binary_payload_to_msgpack(msgpack_packer *mp_pck, + uint8_t *in_buf, + size_t in_size) +{ + int ret; + int resource_logs_index; + int scope_log_index; + int log_record_index; + + Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs; + Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs; + Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_log; + Opentelemetry__Proto__Logs__V1__ResourceLogs **resource_logs; + Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_log; + Opentelemetry__Proto__Logs__V1__LogRecord **log_records; + Opentelemetry__Proto__Logs__V1__LogRecord *log_record; + + input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf); + if (input_logs == NULL) { + flb_error("[otel] Failed to unpack input logs"); + return -1; + } + + resource_logs = input_logs->resource_logs; + if (resource_logs == NULL) { + flb_error("[otel] No resource logs found"); + return -1; + } + + for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { + resource_log = resource_logs[resource_logs_index]; + scope_logs = resource_log->scope_logs; + + if (resource_log->n_scope_logs > 0 && scope_logs == NULL) { + flb_error("[otel] No scope logs found"); + return -1; + } + + for (scope_log_index = 0; scope_log_index < resource_log->n_scope_logs; scope_log_index++) { + scope_log = scope_logs[scope_log_index]; + log_records = scope_log->log_records; + + if (log_records == NULL) { + flb_error("[otel] No log records found"); + return -1; + } + + for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) { + msgpack_pack_array(mp_pck, 2); + flb_pack_time_now(mp_pck); + + log_record = log_records[log_record_index]; + + ret = otlp_pack_any_value(mp_pck, log_record->body); + + if (ret != 0) { + flb_error("[otel] Failed to convert log record body"); + return -1; + } + } + } + } + return 0; +} + +static int get_token_length(jsmntok_t token){ + return token.end - token.start; +} + +static char *get_value_from_token(jsmntok_t *tokens, + const char *body, + int pos){ + char *tmp; + jsmntok_t token; + int token_len; + + token = tokens[pos]; + token_len = get_token_length(token); + + tmp = flb_calloc(1, token_len + 1); + tmp = memcpy(tmp, body+token.start, token_len); + + return tmp; +} + +static int json_payload_to_msgpack(msgpack_packer *mp_pck, + const char *body, + size_t len) +{ + int n_tokens; + int token_index; + int kv_index; + int result; + + char *key; + char *otel_value_type; + char *otel_log_record; + + jsmn_parser parser; + jsmntok_t tokens[1024]; + jsmntok_t token; + + result = 0; + + jsmn_init(&parser); + n_tokens = jsmn_parse(&parser, body, len, tokens, 1024); + + if (n_tokens < 0) { + flb_error("[otel] Failed to parse JSON payload, jsmn error %d", n_tokens); + return -1; + } + + // position 0 is the root object, skip it + for (token_index = 1; token_index < n_tokens; token_index++) { + token = tokens[token_index]; + + switch (token.type) { + + case JSMN_OBJECT: + for (kv_index=0; kv_index < token.size; kv_index++) { + key = get_value_from_token(tokens, body, token_index+kv_index+1); + + if (strcmp(key, "body") == 0) { + otel_value_type = get_value_from_token(tokens, body, token_index+kv_index+3); + otel_log_record = get_value_from_token(tokens, body, token_index+kv_index+4); + + msgpack_pack_array(mp_pck, 2); + flb_pack_time_now(mp_pck); + + if (strcasecmp(otel_value_type, "stringvalue") == 0) { + result = otel_pack_string(mp_pck, otel_log_record); + } + + else if (strcasecmp(otel_value_type, "intvalue") == 0) { + result = otel_pack_int(mp_pck, atoi(otel_log_record)); + } + + else if (strcasecmp(otel_value_type, "doublevalue") == 0) { + result = otel_pack_double(mp_pck, atof(otel_log_record)); + } + + else if (strcasecmp(otel_value_type, "boolvalue") == 0) { + if (strcasecmp(otel_log_record, "true") == 0) { + result = otel_pack_bool(mp_pck, true); + } else { + result = otel_pack_bool(mp_pck, false); + } + } + + else if (strcasecmp(otel_value_type, "bytesvalue") == 0){ + result = otel_pack_string(mp_pck, otel_log_record); + } + + flb_free(otel_value_type); + flb_free(otel_log_record); + } + + flb_free(key); + } + break; + + default: + break; + } + } + return result; +} + +static int process_payload_logs(struct flb_opentelemetry *ctx, struct http_conn *conn, + flb_sds_t tag, + struct mk_http_session *session, + struct mk_http_request *request) +{ + int ret; + char *out_buf = NULL; + + msgpack_packer mp_pck; + msgpack_sbuffer mp_sbuf; + + msgpack_sbuffer_init(&mp_sbuf); + msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write); + + /* Check if the incoming payload is a valid JSON message and convert it to msgpack */ + if (strncasecmp(request->content_type.data, + "application/json", + request->content_type.len) == 0) { + ret = json_payload_to_msgpack(&mp_pck, request->data.data, request->data.len); + } + else if (strncasecmp(request->content_type.data, + "application/x-protobuf", + request->content_type.len) == 0) { + ret = binary_payload_to_msgpack(&mp_pck, (uint8_t *)request->data.data, request->data.len); + } + else { + flb_error("[otel] Unsupported content type %.*s", request->content_type.len, request->content_type.data); + ret = -1; + } + + /* release 'out_buf' if it was allocated */ + if (out_buf) { + flb_free(out_buf); + } + + ctx->ins->event_type = FLB_INPUT_LOGS; + + flb_input_log_append(ctx->ins, tag, flb_sds_len(tag), mp_sbuf.data, mp_sbuf.size); + + msgpack_sbuffer_destroy(&mp_sbuf); + return ret; +} + static inline int mk_http_point_header(mk_ptr_t *h, struct mk_http_parser *parser, int key) { @@ -265,9 +597,13 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c uri[request->uri.len] = '\0'; } - if (strcmp(uri, "/v1/metrics") != 0 && strcmp(uri, "/v1/traces") != 0) { + if (strcmp(uri, "/v1/metrics") != 0 && + strcmp(uri, "/v1/traces") != 0 && + strcmp(uri, "/v1/logs") != 0) { + send_response(conn, 400, "error: invalid endpoint\n"); mk_mem_free(uri); + return -1; } @@ -330,6 +666,8 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c request->_content_length.data = NULL; } + mk_http_point_header(&request->content_type, &session->parser, MK_HEADER_CONTENT_TYPE); + if (request->method != MK_METHOD_POST) { flb_sds_destroy(tag); mk_mem_free(uri); @@ -343,6 +681,10 @@ int opentelemetry_prot_handle(struct flb_opentelemetry *ctx, struct http_conn *c else if (strcmp(uri, "/v1/traces") == 0) { ret = process_payload_traces(ctx, conn, tag, session, request); } + else if (strcmp(uri, "/v1/logs") == 0) { + ret = process_payload_logs(ctx, conn, tag, session, request); + } + mk_mem_free(uri); flb_sds_destroy(tag); send_response(conn, ctx->successful_response_code, NULL);