From 18e0041e5a6ead89f89368b8d9f52759bbc178c1 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 13:44:42 -0600 Subject: [PATCH 1/7] out_datadog: use cat_safe for remapped tags (CID 253605) Signed-off-by: Eduardo Silva --- plugins/out_datadog/datadog.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/plugins/out_datadog/datadog.c b/plugins/out_datadog/datadog.c index 1f623662fe7..86ff6b51a2a 100644 --- a/plugins/out_datadog/datadog.c +++ b/plugins/out_datadog/datadog.c @@ -120,9 +120,9 @@ static int datadog_format(struct flb_config *config, /* in normal flush callback we have the event_chunk set as flush context * so we don't need to calculate the event len. * But in test mode the formatter won't get the event_chunk as flush_ctx - */ + */ if (flush_ctx != NULL) { - event_chunk = flush_ctx; + event_chunk = flush_ctx; array_size = event_chunk->total_events; } else { array_size = flb_mp_count(data, bytes); @@ -292,25 +292,24 @@ static int datadog_format(struct flb_config *config, /* here we concatenate ctx->dd_tags and remapped_tags, depending on their presence */ if (remap_cnt) { if (ctx->dd_tags != NULL) { - tmp = flb_sds_cat(remapped_tags, FLB_DATADOG_TAG_SEPERATOR, - strlen(FLB_DATADOG_TAG_SEPERATOR)); - if (!tmp) { + ret = flb_sds_cat_safe(&remapped_tags, FLB_DATADOG_TAG_SEPERATOR, + strlen(FLB_DATADOG_TAG_SEPERATOR)); + if (ret < 0) { flb_errno(); flb_sds_destroy(remapped_tags); msgpack_sbuffer_destroy(&mp_sbuf); flb_log_event_decoder_destroy(&log_decoder); return -1; } - remapped_tags = tmp; - flb_sds_cat(remapped_tags, ctx->dd_tags, strlen(ctx->dd_tags)); - if (!tmp) { + + ret = flb_sds_cat_safe(&remapped_tags, ctx->dd_tags, strlen(ctx->dd_tags)); + if (ret < 0) { flb_errno(); flb_sds_destroy(remapped_tags); msgpack_sbuffer_destroy(&mp_sbuf); flb_log_event_decoder_destroy(&log_decoder); return -1; } - remapped_tags = tmp; } dd_msgpack_pack_key_value_str(&mp_pck, FLB_DATADOG_DD_TAGS_KEY, From 9f15a4ce06d9ab16546c81f09581885a6c037292 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 13:48:34 -0600 Subject: [PATCH 2/7] azure_blob_http: use sds_cat variant (CID 306764 306765) Signed-off-by: Eduardo Silva --- plugins/out_azure_blob/azure_blob_http.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/out_azure_blob/azure_blob_http.c b/plugins/out_azure_blob/azure_blob_http.c index dd6f128a242..01efdb5c073 100644 --- a/plugins/out_azure_blob/azure_blob_http.c +++ b/plugins/out_azure_blob/azure_blob_http.c @@ -236,6 +236,7 @@ flb_sds_t azb_http_canonical_request(struct flb_azure_blob *ctx, can_headers = canonical_headers(c); if (!can_headers) { flb_sds_destroy(can_req); + flb_sds_destroy(tmp); return NULL; } tmp = flb_sds_cat(can_req, can_headers, flb_sds_len(can_headers)); @@ -345,8 +346,8 @@ int azb_http_client_setup(struct flb_azure_blob *ctx, struct flb_http_client *c, auth = flb_sds_create_size(64 + flb_sds_len(can_req)); - flb_sds_cat(auth, ctx->shared_key_prefix, flb_sds_len(ctx->shared_key_prefix)); - flb_sds_cat(auth, can_req, flb_sds_len(can_req)); + flb_sds_cat_safe(&auth, ctx->shared_key_prefix, flb_sds_len(ctx->shared_key_prefix)); + flb_sds_cat_safe(&auth, can_req, flb_sds_len(can_req)); /* Azure header: authorization */ flb_http_add_header(c, "Authorization", 13, auth, flb_sds_len(auth)); From b025357809c492005c0cff5171db4e490a3fbfee Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 13:52:42 -0600 Subject: [PATCH 3/7] out_stackdriver: release log_name on exception (CID 313112) Signed-off-by: Eduardo Silva --- plugins/out_stackdriver/stackdriver.c | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/out_stackdriver/stackdriver.c b/plugins/out_stackdriver/stackdriver.c index 71c6abe233c..4326683187d 100644 --- a/plugins/out_stackdriver/stackdriver.c +++ b/plugins/out_stackdriver/stackdriver.c @@ -2358,6 +2358,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx, flb_sds_destroy(operation_id); flb_sds_destroy(operation_producer); flb_sds_destroy(trace); + flb_sds_destroy(log_name); flb_log_event_decoder_destroy(&log_decoder); msgpack_sbuffer_destroy(&mp_sbuf); return NULL; From e8667d691fe60d52a6b3444d29be504879db9f10 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 13:57:40 -0600 Subject: [PATCH 4/7] in_http: use cat_safe on tag handliong Signed-off-by: Eduardo Silva --- plugins/in_http/http_prot.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index eae52607b10..4070c159495 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -632,7 +632,7 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn, } /* New tag skipping the URI '/' */ - flb_sds_cat(tag, uri + 1, len - 1); + flb_sds_cat_safe(&tag, uri + 1, len - 1); /* Sanitize, only allow alphanum chars */ for (i = 0; i < flb_sds_len(tag); i++) { From f854989aa1fda15e1bbb602c7c616c04d711bb73 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 14:00:07 -0600 Subject: [PATCH 5/7] lib: cmetrics: upgrade to v0.9.4 Signed-off-by: Eduardo Silva --- lib/cmetrics/CMakeLists.txt | 2 +- .../include/cmetrics/cmt_decode_statsd.h | 60 ++ lib/cmetrics/src/CMakeLists.txt | 1 + lib/cmetrics/src/cmt_cat.c | 6 +- lib/cmetrics/src/cmt_counter.c | 16 +- .../src/cmt_decode_prometheus_remote_write.c | 1 + lib/cmetrics/src/cmt_decode_statsd.c | 613 ++++++++++++++++++ lib/cmetrics/src/cmt_encode_splunk_hec.c | 4 +- lib/cmetrics/src/cmt_gauge.c | 24 +- lib/cmetrics/src/cmt_histogram.c | 8 +- lib/cmetrics/src/cmt_summary.c | 4 +- lib/cmetrics/src/cmt_untyped.c | 8 +- lib/cmetrics/tests/data/statsd_payload.txt | 13 + lib/cmetrics/tests/decoding.c | 32 + 14 files changed, 754 insertions(+), 38 deletions(-) create mode 100644 lib/cmetrics/include/cmetrics/cmt_decode_statsd.h create mode 100644 lib/cmetrics/src/cmt_decode_statsd.c create mode 100644 lib/cmetrics/tests/data/statsd_payload.txt diff --git a/lib/cmetrics/CMakeLists.txt b/lib/cmetrics/CMakeLists.txt index 6509d7a35f4..f5221d7f524 100644 --- a/lib/cmetrics/CMakeLists.txt +++ b/lib/cmetrics/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # CMetrics Version set(CMT_VERSION_MAJOR 0) set(CMT_VERSION_MINOR 9) -set(CMT_VERSION_PATCH 3) +set(CMT_VERSION_PATCH 4) set(CMT_VERSION_STR "${CMT_VERSION_MAJOR}.${CMT_VERSION_MINOR}.${CMT_VERSION_PATCH}") # Include helpers diff --git a/lib/cmetrics/include/cmetrics/cmt_decode_statsd.h b/lib/cmetrics/include/cmetrics/cmt_decode_statsd.h new file mode 100644 index 00000000000..12415fe5da4 --- /dev/null +++ b/lib/cmetrics/include/cmetrics/cmt_decode_statsd.h @@ -0,0 +1,60 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* CMetrics + * ======== + * Copyright 2021-2022 The CMetrics Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef CMT_DECODE_STATSD_H +#define CMT_DECODE_STATSD_H + +#include + +#define CMT_DECODE_STATSD_TYPE_COUNTER 1 +#define CMT_DECODE_STATSD_TYPE_GAUGE 2 +#define CMT_DECODE_STATSD_TYPE_TIMER 3 +#define CMT_DECODE_STATSD_TYPE_SET 4 + +#define CMT_DECODE_STATSD_SUCCESS 0 +#define CMT_DECODE_STATSD_ALLOCATION_ERROR 1 +#define CMT_DECODE_STATSD_UNEXPECTED_ERROR 2 +#define CMT_DECODE_STATSD_INVALID_ARGUMENT_ERROR 3 +#define CMT_DECODE_STATSD_UNEXPECTED_METRIC_TYPE 4 +#define CMT_DECODE_STATSD_DECODE_ERROR 5 +#define CMT_DECODE_STATSD_UNPACK_ERROR 6 +#define CMT_DECODE_STATSD_UNSUPPORTED_METRIC_TYPE 7 +#define CMT_DECODE_STATSD_INVALID_TAG_FORMAT_ERROR 8 + +#define CMT_DECODE_STATSD_GAUGE_OBSERVER 1 << 0 + +/* + * The "cmt_statsd_message" represents a single line in UDP packet. + * It's just a bunch of pointers to ephemeral buffer. + */ +struct cmt_statsd_message { + char *bucket; + int bucket_len; + char *value; + char *labels; + int value_len; + int type; + double sample_rate; +}; + +int cmt_decode_statsd_create(struct cmt **out_cmt, char *in_buf, size_t in_size, int flags); +void cmt_decode_statsd_destroy(struct cmt *cmt); + +#endif diff --git a/lib/cmetrics/src/CMakeLists.txt b/lib/cmetrics/src/CMakeLists.txt index d45de9b8334..f08667d07a1 100644 --- a/lib/cmetrics/src/CMakeLists.txt +++ b/lib/cmetrics/src/CMakeLists.txt @@ -35,6 +35,7 @@ set(src cmt_encode_influx.c cmt_encode_msgpack.c cmt_decode_msgpack.c + cmt_decode_statsd.c cmt_mpack_utils.c external/remote.pb-c.c external/types.pb-c.c diff --git a/lib/cmetrics/src/cmt_cat.c b/lib/cmetrics/src/cmt_cat.c index af95dcbd73d..b2171196086 100644 --- a/lib/cmetrics/src/cmt_cat.c +++ b/lib/cmetrics/src/cmt_cat.c @@ -346,20 +346,16 @@ int cmt_cat_untyped(struct cmt *cmt, struct cmt_untyped *untyped, int cmt_cat_histogram(struct cmt *cmt, struct cmt_histogram *histogram, struct cmt_map *filtered_map) { - int i; - double val; int ret; char **labels = NULL; struct cmt_map *map; struct cmt_opts *opts; struct cmt_histogram *hist; - uint64_t timestamp; struct cmt_histogram_buckets *buckets; int64_t buckets_count; map = histogram->map; opts = map->opts; - timestamp = cmt_metric_get_timestamp(&map->metric); ret = cmt_cat_copy_label_keys(map, (char **) &labels); if (ret == -1) { @@ -433,6 +429,8 @@ int cmt_cat_summary(struct cmt *cmt, struct cmt_summary *summary, quantiles, map->label_count, labels); if (!sum) { + free(labels); + free(quantiles); return -1; } diff --git a/lib/cmetrics/src/cmt_counter.c b/lib/cmetrics/src/cmt_counter.c index 668e9168ef7..c5afad2a5d0 100644 --- a/lib/cmetrics/src/cmt_counter.c +++ b/lib/cmetrics/src/cmt_counter.c @@ -107,8 +107,8 @@ int cmt_counter_inc(struct cmt_counter *counter, counter->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(counter->cmt, "unable to retrieve metric: %s for counter %s_%s_%s", - counter->map, counter->opts.ns, counter->opts.subsystem, + cmt_log_error(counter->cmt, "unable to retrieve metric for counter %s_%s_%s", + counter->opts.ns, counter->opts.subsystem, counter->opts.name); return -1; } @@ -125,8 +125,8 @@ int cmt_counter_add(struct cmt_counter *counter, uint64_t timestamp, double val, counter->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(counter->cmt, "unable to retrieve metric: %s for counter %s_%s_%s", - counter->map, counter->opts.ns, counter->opts.subsystem, + cmt_log_error(counter->cmt, "unable to retrieve metric for counter %s_%s_%s", + counter->opts.ns, counter->opts.subsystem, counter->opts.name); return -1; } @@ -144,8 +144,8 @@ int cmt_counter_set(struct cmt_counter *counter, uint64_t timestamp, double val, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(counter->cmt, "unable to retrieve metric: %s for counter %s_%s_%s", - counter->map, counter->opts.ns, counter->opts.subsystem, + cmt_log_error(counter->cmt, "unable to retrieve metric for counter %s_%s_%s", + counter->opts.ns, counter->opts.subsystem, counter->opts.name); return -1; } @@ -170,8 +170,8 @@ int cmt_counter_get_val(struct cmt_counter *counter, counter->map, labels_count, label_vals, &val); if (ret == -1) { - cmt_log_error(counter->cmt, "unable to retrieve metric: %s for counter %s_%s_%s", - counter->map, counter->opts.ns, counter->opts.subsystem, + cmt_log_error(counter->cmt, "unable to retrieve metric for counter %s_%s_%s", + counter->opts.ns, counter->opts.subsystem, counter->opts.name); return -1; } diff --git a/lib/cmetrics/src/cmt_decode_prometheus_remote_write.c b/lib/cmetrics/src/cmt_decode_prometheus_remote_write.c index 1a01f2d08f1..4aa91e676d4 100644 --- a/lib/cmetrics/src/cmt_decode_prometheus_remote_write.c +++ b/lib/cmetrics/src/cmt_decode_prometheus_remote_write.c @@ -663,6 +663,7 @@ int cmt_decode_prometheus_remote_write_create(struct cmt **out_cmt, char *in_buf (uint8_t *) in_buf); if (write == NULL) { result = CMT_DECODE_PROMETHEUS_REMOTE_WRITE_UNPACK_ERROR; + cmt_destroy(cmt); return result; } diff --git a/lib/cmetrics/src/cmt_decode_statsd.c b/lib/cmetrics/src/cmt_decode_statsd.c new file mode 100644 index 00000000000..168f7ecc338 --- /dev/null +++ b/lib/cmetrics/src/cmt_decode_statsd.c @@ -0,0 +1,613 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* CMetrics + * ======== + * Copyright 2021-2024 The CMetrics Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include /* for DBL_EPSILON */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static struct cmt_map_label *create_map_label(char *caption, size_t length) +{ + struct cmt_map_label *map_label; + + map_label = calloc(1, sizeof(struct cmt_map_label)); + if (!map_label) { + return NULL; + } + + if (map_label != NULL) { + if (caption != NULL) { + if (length == 0) { + length = strlen(caption); + } + + map_label->name = cfl_sds_create_len(caption, length); + + if (map_label->name == NULL) { + cmt_errno(); + + free(map_label); + + map_label = NULL; + } + } + } + + return map_label; +} + +static int append_new_map_label_key(struct cmt_map *map, char *name) +{ + struct cmt_map_label *label; + + label = create_map_label(name, 0); + + if (label == NULL) { + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + cfl_list_add(&label->_head, &map->label_keys); + map->label_count++; + + return CMT_DECODE_STATSD_SUCCESS; +} + +static int append_new_metric_label_value(struct cmt_metric *metric, char *name, size_t length) +{ + struct cmt_map_label *label; + + label = create_map_label(name, length); + + if (label == NULL) { + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + cfl_list_add(&label->_head, &metric->labels); + + return CMT_DECODE_STATSD_SUCCESS; +} + +static int is_incremental(char *str) +{ + return (*str == '+' || *str == '-'); +} + +static int decode_labels(struct cmt *cmt, + struct cmt_map *map, + struct cmt_metric *metric, + char *labels, int incremental) +{ + void **value_index_list; + size_t map_label_index; + size_t map_label_count; + struct cfl_list *label_iterator; + struct cmt_map_label *current_label; + size_t label_index; + int label_found; + char *label_kv, *colon; + cfl_sds_t label_k, label_v, tmp; + int result; + struct cfl_list *head = NULL; + struct cfl_list *kvs = NULL; + struct cfl_split_entry *cur = NULL; + + result = CMT_DECODE_STATSD_SUCCESS; + + value_index_list = calloc(128, sizeof(void *)); + + if (value_index_list == NULL) { + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + label_found = CMT_FALSE; + label_index = 0; + + if (incremental) { + label_k = cfl_sds_create("incremental"); + if (label_k != NULL) { + result = append_new_map_label_key(map, label_k); + cfl_sds_destroy(label_k); + + if (result == CMT_DECODE_STATSD_SUCCESS) { + tmp = (void *) cfl_sds_create("true"); + if (tmp != NULL) { + value_index_list[label_index] = tmp; + } + } + } + } + + if (labels != NULL) { + kvs = cfl_utils_split(labels, ',', -1 ); + if (kvs == NULL) { + goto split_error; + } + + cfl_list_foreach(head, kvs) { + retry: + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + label_kv = cur->value; + + colon = strchr(label_kv, ':'); + if (colon == NULL) { + goto retry; + } + label_k = cfl_sds_create_len(label_kv, colon - label_kv); + if (label_k == NULL) { + free(value_index_list); + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + return CMT_DECODE_STATSD_INVALID_TAG_FORMAT_ERROR; + } + label_v = cfl_sds_create_len(colon + 1, strlen(label_kv) - strlen(label_k) - 1); + if (label_v == NULL) { + cfl_sds_destroy(label_k); + free(value_index_list); + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + return CMT_DECODE_STATSD_INVALID_TAG_FORMAT_ERROR; + } + + cfl_list_foreach(label_iterator, &map->label_keys) { + current_label = cfl_list_entry(label_iterator, struct cmt_map_label, _head); + + if (strcmp(current_label->name, label_k) == 0) { + label_found = CMT_TRUE; + + break; + } + + label_index++; + } + + if (label_index > 127) { + cfl_sds_destroy(label_k); + cfl_sds_destroy(label_v); + free(value_index_list); + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + return CMT_DECODE_STATSD_INVALID_ARGUMENT_ERROR; + } + + if (label_found == CMT_FALSE) { + result = append_new_map_label_key(map, label_k); + } + + if (result == CMT_DECODE_STATSD_SUCCESS) { + value_index_list[label_index] = (void *) label_v; + } + + cfl_sds_destroy(label_k); + } + } + +split_error: /* Nop for adding labels */ + + map_label_count = cfl_list_size(&map->label_keys); + + for (map_label_index = 0 ; + result == CMT_DECODE_STATSD_SUCCESS && + map_label_index < map_label_count ; + map_label_index++) { + + if (value_index_list[map_label_index] != NULL) { + label_v = (char *) value_index_list[map_label_index]; + result = append_new_metric_label_value(metric, label_v, 0); + } + } + + for (map_label_index = 0 ; + result == CMT_DECODE_STATSD_SUCCESS && + map_label_index < map_label_count ; + map_label_index++) { + label_v = (cfl_sds_t) value_index_list[map_label_index]; + cfl_sds_destroy(label_v); + } + + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + free(value_index_list); + + return result; +} + +static int decode_numerical_message(struct cmt *cmt, + struct cmt_map *map, + struct cmt_statsd_message *m) +{ + struct cmt_metric *metric; + int result; + uint64_t ts; + int incremental = 0; + + ts = cfl_time_now(); + + result = CMT_DECODE_STATSD_SUCCESS; + + metric = calloc(1, sizeof(struct cmt_metric)); + + if (metric == NULL) { + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + cfl_list_init(&metric->labels); + + incremental = is_incremental(m->value); + + result = decode_labels(cmt, + map, + metric, + m->labels, + incremental); + + if (result) { + destroy_label_list(&metric->labels); + + free(metric); + } + else { + cfl_list_add(&metric->_head, &map->metrics); + } + + if (result == CMT_DECODE_STATSD_SUCCESS) { + if ((m->sample_rate - 0.0) > DBL_EPSILON && + (1.0 - m->sample_rate) > DBL_EPSILON) { + cmt_metric_set(metric, ts, strtod(m->value, NULL) / m->sample_rate); + } + else { + cmt_metric_set(metric, ts, strtod(m->value, NULL)); + } + } + + return result; +} + +static int decode_counter_entry(struct cmt *cmt, + void *instance, + struct cmt_statsd_message *m) +{ + struct cmt_counter *counter; + int result; + + result = CMT_DECODE_STATSD_SUCCESS; + + counter = (struct cmt_counter *) instance; + + counter->map->metric_static_set = 0; + + result = decode_numerical_message(cmt, + counter->map, + m); + + return result; +} + +static int decode_gauge_entry(struct cmt *cmt, + void *instance, + struct cmt_statsd_message *m) +{ + struct cmt_gauge *gauge; + int result; + + result = CMT_DECODE_STATSD_SUCCESS; + + gauge = (struct cmt_gauge *) instance; + + gauge->map->metric_static_set = 0; + + result = decode_numerical_message(cmt, + gauge->map, + m); + + return result; +} + +static int decode_untyped_entry(struct cmt *cmt, + void *instance, + struct cmt_statsd_message *m) +{ + struct cmt_untyped *untyped; + int result; + + result = CMT_DECODE_STATSD_SUCCESS; + + untyped = (struct cmt_untyped *) instance; + + untyped->map->metric_static_set = 0; + + result = decode_numerical_message(cmt, + untyped->map, + m); + + return result; +} + +static int decode_statsd_message(struct cmt *cmt, + struct cmt_statsd_message *m, + int flags) +{ + char *metric_name = NULL; + char *metric_subsystem = NULL; + char *metric_namespace = NULL; + char *metric_description = NULL; + void *instance; + int result; + + result = CMT_DECODE_STATSD_SUCCESS; + + metric_description = "-"; + metric_name = cfl_sds_create_len(m->bucket, m->bucket_len); + if (metric_name == NULL) { + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + metric_namespace = ""; + metric_subsystem = ""; + + switch (m->type) { + case CMT_DECODE_STATSD_TYPE_COUNTER: + instance = cmt_counter_create(cmt, + metric_namespace, + metric_subsystem, + metric_name, + metric_description, + 0, NULL); + + if (instance == NULL) { + cfl_sds_destroy(metric_name); + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + result = decode_counter_entry(cmt, instance, m); + + if (result) { + cfl_sds_destroy(metric_name); + cmt_counter_destroy(instance); + } + break; + case CMT_DECODE_STATSD_TYPE_GAUGE: + instance = cmt_gauge_create(cmt, + metric_namespace, + metric_subsystem, + metric_name, + metric_description, + 0, NULL); + + if (instance == NULL) { + cfl_sds_destroy(metric_name); + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + result = decode_gauge_entry(cmt, instance, m); + + if (result) { + cfl_sds_destroy(metric_name); + cmt_gauge_destroy(instance); + } + break; + case CMT_DECODE_STATSD_TYPE_SET: + /* Set type will be translated as an untyped */ + instance = cmt_untyped_create(cmt, + metric_namespace, + metric_subsystem, + metric_name, + metric_description, + 0, NULL); + + if (instance == NULL) { + cfl_sds_destroy(metric_name); + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + result = decode_untyped_entry(cmt, instance, m); + + if (result) { + cfl_sds_destroy(metric_name); + cmt_untyped_destroy(instance); + } + break; + case CMT_DECODE_STATSD_TYPE_TIMER: + /* TODO: Add histogram observer */ + if (flags & CMT_DECODE_STATSD_GAUGE_OBSERVER) { + instance = cmt_gauge_create(cmt, + metric_namespace, + metric_subsystem, + metric_name, + metric_description, + 0, NULL); + + if (instance == NULL) { + cfl_sds_destroy(metric_name); + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + result = decode_gauge_entry(cmt, instance, m); + + if (result) { + cfl_sds_destroy(metric_name); + cmt_gauge_destroy(instance); + } + } + break; + default: + result = CMT_DECODE_STATSD_UNSUPPORTED_METRIC_TYPE; + break; + } + + cfl_sds_destroy(metric_name); + + return result; +} + +static int cmt_get_statsd_type(char *str) +{ + switch (*str) { + case 'g': + return CMT_DECODE_STATSD_TYPE_GAUGE; + case 's': + return CMT_DECODE_STATSD_TYPE_SET; + case 'c': + return CMT_DECODE_STATSD_TYPE_COUNTER; + case 'm': + if (*(str + 1) == 's') { + return CMT_DECODE_STATSD_TYPE_TIMER; + } + } + return CMT_DECODE_STATSD_TYPE_COUNTER; +} + +static int statsd_process_line(struct cmt *cmt, char *line, int flags) +{ + char *colon = NULL, *bar = NULL, *atmark = NULL, *labels = NULL; + struct cmt_statsd_message m = {0}; + + /* + * bucket:value|type|@sample_rate|#key1:value1,key2:value2,... + * ------ + */ + colon = strchr(line, ':'); + if (colon == NULL) { + return CMT_DECODE_STATSD_INVALID_ARGUMENT_ERROR; + } + m.bucket = line; + m.bucket_len = (colon - line); + + /* + * bucket:value|type|@sample_rate|#key1:value1,key2:value2,... + * ---- + */ + bar = strchr(colon + 1, '|'); + if (bar == NULL) { + return CMT_DECODE_STATSD_INVALID_ARGUMENT_ERROR; + } + m.type = cmt_get_statsd_type(bar + 1); + + /* + * bucket:value|type|@sample_rate|#key1:value1,key2:value2,... + * ----- + */ + m.value = colon + 1; + m.value_len = (bar - colon - 1); + + /* + * bucket:value|type|@sample_rate|#key1:value1,key2:value2,... + * ------------ + */ + atmark = strstr(bar + 1, "|@"); + if (atmark == NULL || atof(atmark + 2) == 0) { + m.sample_rate = 1.0; + } + else { + m.sample_rate = atof(atmark + 2); + } + + /* + * bucket:value|type|@sample_rate|#key1:value1,key2:value2,... + * ------------ + */ + labels = strstr(bar + 1, "|#"); + if (labels != NULL) { + m.labels = labels + 2; + } + + return decode_statsd_message(cmt, &m, flags); +} + +static int decode_metrics_lines(struct cmt *cmt, + char *in_buf, size_t in_size, + int flags) +{ + int ret = CMT_DECODE_STATSD_SUCCESS; + struct cfl_list *head = NULL; + struct cfl_list *kvs = NULL; + struct cfl_split_entry *cur = NULL; + + kvs = cfl_utils_split(in_buf, '\n', -1 ); + if (kvs == NULL) { + goto split_error; + } + + cfl_list_foreach(head, kvs) { +retry: + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + /* StatsD format always has | at least one. */ + if (strstr(cur->value, "|") == NULL) { + goto retry; + } + + ret = statsd_process_line(cmt, cur->value, flags); + if (ret != CMT_DECODE_STATSD_SUCCESS) { + ret = CMT_DECODE_STATSD_DECODE_ERROR; + + break; + } + } + + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } + + return ret; + +split_error: + return -1; +} + +int cmt_decode_statsd_create(struct cmt **out_cmt, char *in_buf, size_t in_size, int flags) +{ + int result = CMT_DECODE_STATSD_INVALID_ARGUMENT_ERROR; + struct cmt *cmt = NULL; + + cmt = cmt_create(); + + if (cmt == NULL) { + return CMT_DECODE_STATSD_ALLOCATION_ERROR; + } + + result = decode_metrics_lines(cmt, in_buf, in_size, flags); + if (result != CMT_DECODE_STATSD_SUCCESS) { + cmt_destroy(cmt); + result = CMT_DECODE_STATSD_DECODE_ERROR; + + return result; + } + + *out_cmt = cmt; + + return result; +} + +void cmt_decode_statsd_destroy(struct cmt *cmt) +{ + cmt_destroy(cmt); +} diff --git a/lib/cmetrics/src/cmt_encode_splunk_hec.c b/lib/cmetrics/src/cmt_encode_splunk_hec.c index a2db9ae83ce..3cbc9663396 100644 --- a/lib/cmetrics/src/cmt_encode_splunk_hec.c +++ b/lib/cmetrics/src/cmt_encode_splunk_hec.c @@ -601,14 +601,12 @@ static struct cmt_splunk_hec_context context = calloc(1, sizeof(struct cmt_splunk_hec_context)); if (context == NULL) { result = CMT_ENCODE_SPLUNK_HEC_ALLOCATION_ERROR; - goto cleanup; } /* host parameter is mandatory. */ if (host == NULL) { result = CMT_ENCODE_SPLUNK_HEC_INVALID_ARGUMENT_ERROR; - goto cleanup; } @@ -634,7 +632,6 @@ static struct cmt_splunk_hec_context if (result != CMT_ENCODE_SPLUNK_HEC_SUCCESS) { if (context != NULL) { destroy_splunk_hec_context(context); - context = NULL; } } @@ -664,6 +661,7 @@ cfl_sds_t cmt_encode_splunk_hec_create(struct cmt *cmt, const char *host, /* Allocate a 1KB of buffer */ buf = cfl_sds_create_size(1024); if (!buf) { + destroy_splunk_hec_context(context); return NULL; } diff --git a/lib/cmetrics/src/cmt_gauge.c b/lib/cmetrics/src/cmt_gauge.c index 16c8f48b48e..4b89b41784b 100644 --- a/lib/cmetrics/src/cmt_gauge.c +++ b/lib/cmetrics/src/cmt_gauge.c @@ -99,8 +99,8 @@ int cmt_gauge_set(struct cmt_gauge *gauge, uint64_t timestamp, double val, metric = cmt_map_metric_get(&gauge->opts, gauge->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(gauge->cmt, "unable to retrieve metric: %s for gauge %s_%s_%s", - gauge->map, gauge->opts.ns, gauge->opts.subsystem, + cmt_log_error(gauge->cmt, "unable to retrieve metric for gauge %s_%s_%s", + gauge->opts.ns, gauge->opts.subsystem, gauge->opts.name); return -1; } @@ -117,8 +117,8 @@ int cmt_gauge_inc(struct cmt_gauge *gauge, uint64_t timestamp, metric = cmt_map_metric_get(&gauge->opts, gauge->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(gauge->cmt, "unable to retrieve metric: %s for gauge %s_%s_%s", - gauge->map, gauge->opts.ns, gauge->opts.subsystem, + cmt_log_error(gauge->cmt, "unable to retrieve metric for gauge %s_%s_%s", + gauge->opts.ns, gauge->opts.subsystem, gauge->opts.name); return -1; } @@ -134,8 +134,8 @@ int cmt_gauge_dec(struct cmt_gauge *gauge, uint64_t timestamp, metric = cmt_map_metric_get(&gauge->opts, gauge->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(gauge->cmt, "unable to retrieve metric: %s for gauge %s_%s_%s", - gauge->map, gauge->opts.ns, gauge->opts.subsystem, + cmt_log_error(gauge->cmt, "unable to retrieve metric for gauge %s_%s_%s", + gauge->opts.ns, gauge->opts.subsystem, gauge->opts.name); return -1; } @@ -151,8 +151,8 @@ int cmt_gauge_add(struct cmt_gauge *gauge, uint64_t timestamp, double val, metric = cmt_map_metric_get(&gauge->opts, gauge->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(gauge->cmt, "unable to retrieve metric: %s for gauge %s_%s_%s", - gauge->map, gauge->opts.ns, gauge->opts.subsystem, + cmt_log_error(gauge->cmt, "unable to retrieve metric for gauge %s_%s_%s", + gauge->opts.ns, gauge->opts.subsystem, gauge->opts.name); return -1; } @@ -168,8 +168,8 @@ int cmt_gauge_sub(struct cmt_gauge *gauge, uint64_t timestamp, double val, metric = cmt_map_metric_get(&gauge->opts, gauge->map, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(gauge->cmt, "unable to retrieve metric: %s for gauge %s_%s_%s", - gauge->map, gauge->opts.ns, gauge->opts.subsystem, + cmt_log_error(gauge->cmt, "unable to retrieve metric for gauge %s_%s_%s", + gauge->opts.ns, gauge->opts.subsystem, gauge->opts.name); return -1; } @@ -188,8 +188,8 @@ int cmt_gauge_get_val(struct cmt_gauge *gauge, &val); if (ret == -1) { cmt_log_error(gauge->cmt, - "unable to retrieve metric value: %s for gauge %s_%s_%s", - gauge->map, gauge->opts.ns, gauge->opts.subsystem, + "unable to retrieve metric value for gauge %s_%s_%s", + gauge->opts.ns, gauge->opts.subsystem, gauge->opts.name); return -1; } diff --git a/lib/cmetrics/src/cmt_histogram.c b/lib/cmetrics/src/cmt_histogram.c index ae895f3ade7..df4a5c8f4f0 100644 --- a/lib/cmetrics/src/cmt_histogram.c +++ b/lib/cmetrics/src/cmt_histogram.c @@ -341,8 +341,8 @@ int cmt_histogram_observe(struct cmt_histogram *histogram, uint64_t timestamp, metric = histogram_get_metric(histogram, labels_count, label_vals); if (!metric) { cmt_log_error(histogram->cmt, - "unable to retrieve metric: %s for histogram %s_%s_%s", - histogram->map, histogram->opts.ns, histogram->opts.subsystem, + "unable to retrieve metric for histogram %s_%s_%s", + histogram->opts.ns, histogram->opts.subsystem, histogram->opts.name); return -1; } @@ -381,8 +381,8 @@ int cmt_histogram_set_default(struct cmt_histogram *histogram, metric = histogram_get_metric(histogram, labels_count, label_vals); if (!metric) { cmt_log_error(histogram->cmt, - "unable to retrieve metric: %s for histogram %s_%s_%s", - histogram->map, histogram->opts.ns, histogram->opts.subsystem, + "unable to retrieve metric for histogram %s_%s_%s", + histogram->opts.ns, histogram->opts.subsystem, histogram->opts.name); return -1; } diff --git a/lib/cmetrics/src/cmt_summary.c b/lib/cmetrics/src/cmt_summary.c index 314fbfbefc3..f2a9eec0450 100644 --- a/lib/cmetrics/src/cmt_summary.c +++ b/lib/cmetrics/src/cmt_summary.c @@ -272,8 +272,8 @@ int cmt_summary_set_default(struct cmt_summary *summary, labels_count, label_vars, CMT_TRUE); if (!metric) { - cmt_log_error(summary->cmt, "unable to retrieve metric: %s for summary %s_%s_%s", - summary->map, summary->opts.ns, summary->opts.subsystem, + cmt_log_error(summary->cmt, "unable to retrieve metric for summary %s_%s_%s", + summary->opts.ns, summary->opts.subsystem, summary->opts.name); return -1; } diff --git a/lib/cmetrics/src/cmt_untyped.c b/lib/cmetrics/src/cmt_untyped.c index 24af3990a0a..2df6ea1413a 100644 --- a/lib/cmetrics/src/cmt_untyped.c +++ b/lib/cmetrics/src/cmt_untyped.c @@ -106,8 +106,8 @@ int cmt_untyped_set(struct cmt_untyped *untyped, uint64_t timestamp, double val, labels_count, label_vals, CMT_TRUE); if (!metric) { - cmt_log_error(untyped->cmt, "unable to retrieve metric: %s for untyped %s_%s_%s", - untyped->map, untyped->opts.ns, untyped->opts.subsystem, + cmt_log_error(untyped->cmt, "unable to retrieve metric for untyped %s_%s_%s", + untyped->opts.ns, untyped->opts.subsystem, untyped->opts.name); return -1; } @@ -130,8 +130,8 @@ int cmt_untyped_get_val(struct cmt_untyped *untyped, &val); if (ret == -1) { cmt_log_error(untyped->cmt, - "unable to retrieve metric value: %s for untyped %s_%s_%s", - untyped->map, untyped->opts.ns, untyped->opts.subsystem, + "unable to retrieve metric value for untyped %s_%s_%s", + untyped->opts.ns, untyped->opts.subsystem, untyped->opts.name); return -1; } diff --git a/lib/cmetrics/tests/data/statsd_payload.txt b/lib/cmetrics/tests/data/statsd_payload.txt new file mode 100644 index 00000000000..d71221d68d7 --- /dev/null +++ b/lib/cmetrics/tests/data/statsd_payload.txt @@ -0,0 +1,13 @@ +statsdTestMetric011:5000|g|#mykey:myvalue,mykey2:othervalue +statsdTestMetric012:400|s|@0.125|#mykey:myvalue +statsdTestMetric013:+500|g|#mykey:myvalue +statsdTestMetric014:-400|g|#mykey:myvalue +statsdTestMetric015:+2|g|#mykey:myvalue +statsdTestMetric016:-1|g|@0.1|#mykey:myvalue +statsdTestMetric021:365|g|#mykey:myvalue +statsdTestMetric022:+300|c|#mykey:myvalue +statsdTestMetric023:-200|s|#mykey:myvalue +statsdTestMetric024:200|g|#mykey:myvalue +expohisto:1|ms|#mykey:myvalue +expohisto:0|ms|#mykey:myvalue +expohisto:-1|ms|#mykey:myvalue diff --git a/lib/cmetrics/tests/decoding.c b/lib/cmetrics/tests/decoding.c index 358ca531367..6c70811698c 100644 --- a/lib/cmetrics/tests/decoding.c +++ b/lib/cmetrics/tests/decoding.c @@ -28,6 +28,7 @@ #include #include #include +#include #include "cmt_tests.h" @@ -197,9 +198,40 @@ void test_prometheus_remote_write() cfl_sds_destroy(payload); } +void test_statsd() +{ + int ret; + struct cmt *decoded_context; + cfl_sds_t payload = read_file(CMT_TESTS_DATA_PATH "/statsd_payload.txt"); + size_t len = 0; + cfl_sds_t text = NULL; + int flags = 0; + + /* For strtok_r, fill the last byte as \0. */ + len = cfl_sds_len(payload); + cfl_sds_set_len(payload, len + 1); + payload[len] = '\0'; + + cmt_initialize(); + + flags |= CMT_DECODE_STATSD_GAUGE_OBSERVER; + + ret = cmt_decode_statsd_create(&decoded_context, payload, cfl_sds_len(payload), flags); + TEST_CHECK(ret == CMT_DECODE_PROMETHEUS_REMOTE_WRITE_SUCCESS); + text = cmt_encode_prometheus_create(decoded_context, CMT_FALSE); + + printf("%s\n", text); + cmt_encode_prometheus_destroy(text); + + cmt_decode_statsd_destroy(decoded_context); + + cfl_sds_destroy(payload); +} + TEST_LIST = { {"opentelemetry", test_opentelemetry}, {"prometheus_remote_write", test_prometheus_remote_write}, + {"statsd", test_statsd}, { 0 } }; From f7414034f030d9343a22e22da146e570aeba3d37 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 14:04:06 -0600 Subject: [PATCH 6/7] in_splunk: release uri on exception (CID 507867) Signed-off-by: Eduardo Silva --- plugins/in_splunk/splunk_prot.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index dd3258468a6..a6f9be57ed3 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -741,6 +741,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, if (ctx->ins->tag && !ctx->ins->tag_default) { tag = flb_sds_create(ctx->ins->tag); if (tag == NULL) { + mk_mem_free(uri); return -1; } } @@ -901,8 +902,8 @@ int splunk_prot_handle_error(struct flb_splunk *ctx, struct splunk_conn *conn, /* New gen HTTP server */ -static int send_response_ng(struct flb_http_response *response, - int http_status, +static int send_response_ng(struct flb_http_response *response, + int http_status, char *message) { flb_http_response_set_status(response, http_status); @@ -921,8 +922,8 @@ static int send_response_ng(struct flb_http_response *response, } if (message != NULL) { - flb_http_response_set_body(response, - (unsigned char *) message, + flb_http_response_set_body(response, + (unsigned char *) message, strlen(message)); } @@ -931,8 +932,8 @@ static int send_response_ng(struct flb_http_response *response, return 0; } -static int send_json_message_response_ng(struct flb_http_response *response, - int http_status, +static int send_json_message_response_ng(struct flb_http_response *response, + int http_status, char *message) { flb_http_response_set_status(response, http_status); @@ -950,13 +951,13 @@ static int send_json_message_response_ng(struct flb_http_response *response, flb_http_response_set_message(response, "Bad Request"); } - flb_http_response_set_header(response, + flb_http_response_set_header(response, "content-type", 0, "application/json", 0); if (message != NULL) { - flb_http_response_set_body(response, - (unsigned char *) message, + flb_http_response_set_body(response, + (unsigned char *) message, strlen(message)); } @@ -1099,7 +1100,7 @@ int splunk_prot_handle_ng(struct flb_http_request *request, } /* HTTP/1.1 needs Host header */ - if (request->protocol_version == HTTP_PROTOCOL_HTTP1 && + if (request->protocol_version == HTTP_PROTOCOL_HTTP1 && request->host == NULL) { return -1; @@ -1140,7 +1141,7 @@ int splunk_prot_handle_ng(struct flb_http_request *request, if (request->method != HTTP_METHOD_POST) { /* HEAD, PUT, PATCH, and DELETE methods are prohibited to use.*/ send_response_ng(response, 400, "error: invalid HTTP method\n"); - + return -1; } From 6df8fabcf843e94c36eabbb5e2cadb939731e6e7 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 13 Aug 2024 14:17:13 -0600 Subject: [PATCH 7/7] lib: ctraces: upgrade to v0.5.5 Signed-off-by: Eduardo Silva --- lib/ctraces/CMakeLists.txt | 2 +- lib/ctraces/src/ctr_decode_opentelemetry.c | 1 + lib/ctraces/src/ctr_encode_opentelemetry.c | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/ctraces/CMakeLists.txt b/lib/ctraces/CMakeLists.txt index 425384fcce9..3cef9282fe2 100644 --- a/lib/ctraces/CMakeLists.txt +++ b/lib/ctraces/CMakeLists.txt @@ -27,7 +27,7 @@ endif() # CTraces Version set(CTR_VERSION_MAJOR 0) set(CTR_VERSION_MINOR 5) -set(CTR_VERSION_PATCH 3) +set(CTR_VERSION_PATCH 5) set(CTR_VERSION_STR "${CTR_VERSION_MAJOR}.${CTR_VERSION_MINOR}.${CTR_VERSION_PATCH}") # Define __FILENAME__ consistently across Operating Systems diff --git a/lib/ctraces/src/ctr_decode_opentelemetry.c b/lib/ctraces/src/ctr_decode_opentelemetry.c index 067e88d9be9..0e845e15f84 100644 --- a/lib/ctraces/src/ctr_decode_opentelemetry.c +++ b/lib/ctraces/src/ctr_decode_opentelemetry.c @@ -532,6 +532,7 @@ int ctr_decode_opentelemetry_create(struct ctrace **out_ctr, otel_resource_span = service_request->resource_spans[resource_span_index]; if (otel_resource_span == NULL) { opentelemetry__proto__collector__trace__v1__export_trace_service_request__free_unpacked(service_request, NULL); + ctr_destroy(ctr); return -1; } diff --git a/lib/ctraces/src/ctr_encode_opentelemetry.c b/lib/ctraces/src/ctr_encode_opentelemetry.c index e8a95f149aa..8f0c81ed0bb 100644 --- a/lib/ctraces/src/ctr_encode_opentelemetry.c +++ b/lib/ctraces/src/ctr_encode_opentelemetry.c @@ -1042,6 +1042,7 @@ static Opentelemetry__Proto__Trace__V1__ResourceSpans **set_resource_spans(struc otel_resource_span = initialize_resource_span(); if (!otel_resource_span) { + free(rs); return NULL; } otel_resource_span->resource = ctr_set_resource(resource_span->resource); @@ -1305,12 +1306,12 @@ cfl_sds_t ctr_encode_opentelemetry_create(struct ctrace *ctr) len = opentelemetry__proto__collector__trace__v1__export_trace_service_request__get_packed_size(req); buf = cfl_sds_create_size(len); if (!buf) { + destroy_export_service_request(req); return NULL; } cfl_sds_set_len(buf, len); opentelemetry__proto__collector__trace__v1__export_trace_service_request__pack(req, (uint8_t *)buf); - destroy_export_service_request(req); return buf;