Skip to content

Commit

Permalink
Improve test
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Sep 25, 2023
1 parent a2351c4 commit 3a91abe
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2196,7 +2196,8 @@ static int rd_kafka_mock_handle_PushTelemetry(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_i8(rkbuf, &compression_type);
rd_kafka_buf_read_kbytes(rkbuf, &metrics);

rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len);
rd_kafka_telemetry_decode_metrics((void *)metrics.data, metrics.len,
rd_false);

// ThrottleTime
rd_kafka_buf_write_i32(resp, 0);
Expand Down
141 changes: 121 additions & 20 deletions src/rdkafka_telemetry_decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,27 @@
#include "nanopb/pb_encode.h"
#include "nanopb/pb_decode.h"
#include "opentelemetry/metrics.pb.h"
#include "rdkafka.h"
#include "rdkafka_int.h"
#include "rdkafka_telemetry_encode.h"
#include "rdunittest.h"

#define _NANOPB_STRING_DECODE_MAX_BUFFER_SIZE 1024

struct metric_unit_test_data {
rd_kafka_telemetry_metric_type_t type;
char metric_name[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE];
char metric_description[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE];
char metric_unit[_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE];
int64_t metric_value;
uint64_t metric_time;
};

struct metric_unit_test_data unit_test_data;

bool (*decode_and_print_metric_ptr)(pb_istream_t *stream,
const pb_field_t *field,
void **arg) = NULL;

static bool decode_and_print_string(pb_istream_t *stream,
const pb_field_t *field,
void **arg) {
Expand All @@ -54,6 +68,10 @@ static bool decode_and_print_string(pb_istream_t *stream,
return false;
}
fprintf(stderr, "String: %s\n", buffer);
if (arg != NULL && *arg != NULL) {
rd_strlcpy(*arg, (char *)buffer,
_NANOPB_STRING_DECODE_MAX_BUFFER_SIZE);
}

return true;
}
Expand Down Expand Up @@ -90,7 +108,13 @@ static bool decode_and_print_number_data_point(pb_istream_t *stream,
return false;
}

fprintf(stderr, "NumberDataPoint value: %lld time: %llu\n",
if (arg != NULL && *arg != NULL) {
struct metric_unit_test_data *test_data = *arg;
test_data->metric_value = data_point.value.as_int;
test_data->metric_time = data_point.time_unix_nano;
}

fprintf(stderr, "NumberDataPoint value: %ld time: %lu\n",
data_point.value.as_int, data_point.time_unix_nano);
return true;
}
Expand All @@ -102,11 +126,21 @@ data_msg_callback(pb_istream_t *stream, const pb_field_t *field, void **arg) {
opentelemetry_proto_metrics_v1_Sum *sum = field->pData;
sum->data_points.funcs.decode =
&decode_and_print_number_data_point;
if (arg != NULL && *arg != NULL) {
sum->data_points.arg = &unit_test_data;
struct metric_unit_test_data *data = *arg;
data->type = RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM;
}
} else if (field->tag ==
opentelemetry_proto_metrics_v1_Metric_gauge_tag) {
opentelemetry_proto_metrics_v1_Gauge *gauge = field->pData;
gauge->data_points.funcs.decode =
&decode_and_print_number_data_point;
if (arg != NULL && *arg != NULL) {
gauge->data_points.arg = &unit_test_data;
struct metric_unit_test_data *data = *arg;
data->type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE;
}
}
return true;
}
Expand All @@ -132,14 +166,39 @@ static bool decode_and_print_metric(pb_istream_t *stream,
return true;
}

static bool decode_and_print_metric_unittest(pb_istream_t *stream,
const pb_field_t *field,
void **arg) {
opentelemetry_proto_metrics_v1_Metric metric =
opentelemetry_proto_metrics_v1_Metric_init_zero;
metric.name.funcs.decode = &decode_and_print_string;
metric.name.arg = &unit_test_data.metric_name;
metric.description.funcs.decode = &decode_and_print_string;
metric.description.arg = &unit_test_data.metric_description;
metric.unit.funcs.decode = &decode_and_print_string;
metric.unit.arg = &unit_test_data.metric_unit;
metric.cb_data.funcs.decode = &data_msg_callback;
metric.cb_data.arg = &unit_test_data;

if (!pb_decode(stream, opentelemetry_proto_metrics_v1_Metric_fields,
&metric)) {
fprintf(stderr, "Failed to decode Metric: %s\n",
PB_GET_ERROR(stream));
return false;
}

return true;
}


static bool decode_and_print_scope_metrics(pb_istream_t *stream,
const pb_field_t *field,
void **arg) {
opentelemetry_proto_metrics_v1_ScopeMetrics scope_metrics =
opentelemetry_proto_metrics_v1_ScopeMetrics_init_zero;
scope_metrics.scope.name.funcs.decode = &decode_and_print_string;
scope_metrics.scope.version.funcs.decode = &decode_and_print_string;
scope_metrics.metrics.funcs.decode = &decode_and_print_metric;
scope_metrics.metrics.funcs.decode = decode_and_print_metric_ptr;
if (!pb_decode(stream,
opentelemetry_proto_metrics_v1_ScopeMetrics_fields,
&scope_metrics)) {
Expand Down Expand Up @@ -174,13 +233,19 @@ static bool decode_and_print_resource_metrics(pb_istream_t *stream,
* opentelemetry_proto_metrics_v1_MetricsData datatype. Used for testing and
* debugging.
*/
int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) {
int rd_kafka_telemetry_decode_metrics(void *buffer,
size_t size,
rd_bool_t is_unit_test) {
opentelemetry_proto_metrics_v1_MetricsData metricsData =
opentelemetry_proto_metrics_v1_MetricsData_init_zero;

pb_istream_t stream = pb_istream_from_buffer(buffer, size);
metricsData.resource_metrics.funcs.decode =
&decode_and_print_resource_metrics;
if (is_unit_test)
decode_and_print_metric_ptr = &decode_and_print_metric_unittest;
else
decode_and_print_metric_ptr = &decode_and_print_metric;

bool status = pb_decode(
&stream, opentelemetry_proto_metrics_v1_MetricsData_fields,
Expand All @@ -192,7 +257,16 @@ int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size) {
return status;
}

bool ut_telemetry_gauge() {
void clear_unit_test_data() {
unit_test_data.type = RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE;
unit_test_data.metric_name[0] = '\0';
unit_test_data.metric_description[0] = '\0';
unit_test_data.metric_unit[0] = '\0';
unit_test_data.metric_value = 0;
unit_test_data.metric_time = 0;
}

bool unit_test_telemetry_gauge() {
rd_kafka_t *rk = rd_calloc(1, sizeof(*rk));
rk->rk_type = RD_KAFKA_PRODUCER;
rk->rk_telemetry.matched_metrics_cnt = 1;
Expand All @@ -208,28 +282,42 @@ bool ut_telemetry_gauge() {
rkb->rkb_c.connects.val = 1;
TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link);

size_t metrics_payload_size = 0, metrics_payload_size_expected = 162;
size_t metrics_payload_size = 0;
clear_unit_test_data();

void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size);
RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size);

RD_UT_ASSERT(
metrics_payload_size == metrics_payload_size_expected,
"Metrics payload size mismatch. Expected: %zu, Actual: %zu",
metrics_payload_size_expected, metrics_payload_size);
RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero");

bool decode_status = rd_kafka_telemetry_decode_metrics(
metrics_payload, metrics_payload_size);
metrics_payload, metrics_payload_size, rd_true);

RD_UT_ASSERT(decode_status == 1, "Decoding failed");
RD_UT_ASSERT(unit_test_data.type ==
RD_KAFKA_TELEMETRY_METRIC_TYPE_GAUGE,
"Metric type mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_name,
"producer.connection.creation.rate") == 0,
"Metric name mismatch");
RD_UT_ASSERT(
strcmp(unit_test_data.metric_description,
"The rate of connections established per second.") == 0,
"Metric description mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0,
"Metric unit mismatch");
RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch");
RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch");

rd_free(rk->rk_telemetry.matched_metrics);
rd_free(metrics_payload);
rd_free(rkb);
rd_free(rk);
RD_UT_PASS();
}

bool ut_test_sum() {
bool unit_test_telemetry_sum() {
rd_kafka_t *rk = rd_calloc(1, sizeof(*rk));
rk->rk_type = RD_KAFKA_PRODUCER;
rk->rk_telemetry.matched_metrics_cnt = 1;
Expand All @@ -245,21 +333,34 @@ bool ut_test_sum() {
rkb->rkb_c.connects.val = 1;
TAILQ_INSERT_HEAD(&rk->rk_brokers, rkb, rkb_link);

size_t metrics_payload_size = 0, metrics_payload_size_expected = 164;
size_t metrics_payload_size = 0;
clear_unit_test_data();

void *metrics_payload =
rd_kafka_telemetry_encode_metrics(rk, &metrics_payload_size);
RD_UT_SAY("metrics_payload_size: %zu", metrics_payload_size);

RD_UT_ASSERT(
metrics_payload_size == metrics_payload_size_expected,
"Metrics payload size mismatch. Expected: %zu, Actual: %zu",
metrics_payload_size_expected, metrics_payload_size);
RD_UT_ASSERT(metrics_payload_size != 0, "Metrics payload zero");

bool decode_status = rd_kafka_telemetry_decode_metrics(
metrics_payload, metrics_payload_size);
metrics_payload, metrics_payload_size, rd_true);

RD_UT_ASSERT(decode_status == 1, "Decoding failed");
RD_UT_ASSERT(unit_test_data.type == RD_KAFKA_TELEMETRY_METRIC_TYPE_SUM,
"Metric type mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_name,
"producer.connection.creation.total") == 0,
"Metric name mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_description,
"The total number of connections established.") ==
0,
"Metric description mismatch");
RD_UT_ASSERT(strcmp(unit_test_data.metric_unit, "1") == 0,
"Metric unit mismatch");
RD_UT_ASSERT(unit_test_data.metric_value == 1, "Metric value mismatch");
RD_UT_ASSERT(unit_test_data.metric_time != 0, "Metric time mismatch");

rd_free(rk->rk_telemetry.matched_metrics);
rd_free(metrics_payload);
rd_free(rkb);
rd_free(rk);
Expand All @@ -268,7 +369,7 @@ bool ut_test_sum() {

int unittest_telemetry_decode(void) {
int fails = 0;
fails += ut_telemetry_gauge();
fails += ut_test_sum();
fails += unit_test_telemetry_gauge();
fails += unit_test_telemetry_sum();
return fails;
}
4 changes: 3 additions & 1 deletion src/rdkafka_telemetry_decode.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#ifndef _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H
#define _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H

int rd_kafka_telemetry_decode_metrics(void *buffer, size_t size);
int rd_kafka_telemetry_decode_metrics(void *buffer,
size_t size,
rd_bool_t is_unit_test);

#endif /* _RDKAFKA_RDKAFKA_TELEMETRY_DECODE_H */

0 comments on commit 3a91abe

Please sign in to comment.