diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/exact_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/exact_aggregator.h new file mode 100644 index 0000000000..00ec1ae12d --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/exact_aggregator.h @@ -0,0 +1,164 @@ +#pragma once + +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" +#include "opentelemetry/version.h" + +#include +#include +#include +#include + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ +/** + * This aggregator has two modes. In-order and quantile estimation. + * + * The first mode simply stores all values sent to the Update() + * function in a vector and maintains the order they were sent in. + * + * The second mode also stores all values sent to the Update() + * function in a vector but sorts this vector when Checkpoint() + * is called. This mode also includes a function, Quantile(), + * that estimates the quantiles of the recorded data. + * + * @tparam T the type of values stored in this aggregator. + */ +template +class ExactAggregator : public Aggregator +{ +public: + ExactAggregator(metrics_api::InstrumentKind kind, bool quant_estimation = false) + { + static_assert(std::is_arithmetic::value, "Not an arithmetic type"); + this->kind_ = kind; + this->checkpoint_ = this->values_; + this->agg_kind_ = AggregatorKind::Exact; + quant_estimation_ = quant_estimation; + } + + ~ExactAggregator() = default; + + ExactAggregator(const ExactAggregator &cp) + { + this->values_ = cp.values_; + this->checkpoint_ = cp.checkpoint_; + this->kind_ = cp.kind_; + this->agg_kind_ = cp.agg_kind_; + quant_estimation_ = cp.quant_estimation_; + // use default initialized mutex as they cannot be copied + } + + /** + * Receives a captured value from the instrument and adds it to the values_ vector. + * + * @param val, the raw value used in aggregation + */ + void update(T val) override + { + this->mu_.lock(); + this->values_.push_back(val); + this->mu_.unlock(); + } + + /** + * Checkpoints the current values. This function will overwrite the current checkpoint with the + * current value. Sorts the values_ vector if quant_estimation_ == true + * + */ + void checkpoint() override + { + this->mu_.lock(); + if (quant_estimation_) + { + std::sort(this->values_.begin(), this->values_.end()); + } + this->checkpoint_ = this->values_; + this->values_.clear(); + this->mu_.unlock(); + } + + /** + * Merges two exact aggregators' values_ vectors together. + * + * @param other the aggregator to merge with this aggregator + */ + void merge(const ExactAggregator &other) + { + if (this->kind_ == other.kind_) + { + this->mu_.lock(); + // First merge values + this->values_.insert(this->values_.end(), other.values_.begin(), other.values_.end()); + // Now merge checkpoints + this->checkpoint_.insert(this->checkpoint_.end(), other.checkpoint_.begin(), + other.checkpoint_.end()); + this->mu_.unlock(); + } + else + { + // Log error + return; + } + } + + /** + * Performs quantile estimation on the checkpoint vector in this aggregator. + * This function only works if quant_estimation_ == true. + * @param q the quantile to estimate. 0 <= q <= 1 + * @return the nearest value in the vector to the exact quantile. + */ + T get_quantiles(double q) override + { + if (!quant_estimation_) + { +// Log error +#if __EXCEPTIONS + throw std::domain_error("Exact aggregator is not in quantile estimation mode!"); +#else + std::terminate(); +#endif + } + if (this->checkpoint_.size() == 0 || q < 0 || q > 1) + { +// Log error +#if __EXCEPTIONS + throw std::invalid_argument("Arg 'q' must be between 0 and 1, inclusive"); +#else + std::terminate(); +#endif + } + else if (q == 0 || this->checkpoint_.size() == 1) + { + return this->checkpoint_[0]; + } + else if (q == 1) + { + return this->checkpoint_[this->checkpoint_.size() - 1]; + } + else + { + float position = float(this->checkpoint_.size() - 1) * q; + int ceiling = ceil(position); + return this->checkpoint_[ceiling]; + } + } + + //////////////////////////ACCESSOR FUNCTIONS////////////////////////// + std::vector get_checkpoint() override { return this->checkpoint_; } + + std::vector get_values() override { return this->values_; } + + bool get_quant_estimation() override { return quant_estimation_; } + +private: + bool quant_estimation_; // Used to switch between in-order and quantile estimation modes +}; +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index ec3ec362ab..d24acd3e58 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -42,6 +42,17 @@ cc_test( ], ) +cc_test( + name = "exact_aggregator_test", + srcs = [ + "exact_aggregator_test.cc", + ], + deps = [ + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "histogram_aggregator_test", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index 3d906b6bcb..7ad1d9e81e 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -1,5 +1,5 @@ foreach(testname meter_provider_sdk_test gauge_aggregator_test - min_max_sum_count_aggregator_test) + min_max_sum_count_aggregator_test exact_aggregator_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_metrics) diff --git a/sdk/test/metrics/exact_aggregator_test.cc b/sdk/test/metrics/exact_aggregator_test.cc new file mode 100644 index 0000000000..be32b87704 --- /dev/null +++ b/sdk/test/metrics/exact_aggregator_test.cc @@ -0,0 +1,217 @@ +#include +#include + +#include "opentelemetry/sdk/metrics/aggregator/exact_aggregator.h" + +using namespace opentelemetry::sdk::metrics; + +TEST(ExactAggregatorOrdered, Update) +{ + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + std::vector correct; + + ASSERT_EQ(agg.get_values(), correct); + + agg.update(1); + correct.push_back(1); + + ASSERT_EQ(agg.get_values(), std::vector{1}); + + for (int i = 2; i <= 5; ++i) + { + correct.push_back(i); + agg.update(i); + } + ASSERT_EQ(agg.get_values(), correct); +} + +TEST(ExactAggregatorOrdered, Checkpoint) +{ + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + std::vector correct; + + ASSERT_EQ(agg.get_checkpoint(), correct); + + agg.update(1); + correct.push_back(1); + agg.checkpoint(); + + ASSERT_EQ(agg.get_checkpoint(), correct); +} + +TEST(ExactAggregatorOrdered, Merge) +{ + ExactAggregator agg1(opentelemetry::metrics::InstrumentKind::Counter); + ExactAggregator agg2(opentelemetry::metrics::InstrumentKind::Counter); + + agg1.update(1); + agg2.update(2); + agg1.merge(agg2); + + std::vector correct{1, 2}; + + ASSERT_EQ(agg1.get_values(), correct); +} + +TEST(ExactAggregatorOrdered, BadMerge) +{ + // This verifies that we encounter and error when we try to merge + // two aggregators of different numeric types together. + ExactAggregator agg1(opentelemetry::metrics::InstrumentKind::Counter); + ExactAggregator agg2(opentelemetry::metrics::InstrumentKind::ValueRecorder); + + agg1.update(1); + agg2.update(2); + + agg1.merge(agg2); + + // Verify that the aggregators did NOT merge + std::vector correct{1}; + ASSERT_EQ(agg1.get_values(), correct); +} + +TEST(ExactAggregatorOrdered, Types) +{ + // This test verifies that we do not encounter any errors when + // using various numeric types. + ExactAggregator agg_int(opentelemetry::metrics::InstrumentKind::Counter); + ExactAggregator agg_long(opentelemetry::metrics::InstrumentKind::Counter); + ExactAggregator agg_float(opentelemetry::metrics::InstrumentKind::Counter); + ExactAggregator agg_double(opentelemetry::metrics::InstrumentKind::Counter); + + for (int i = 1; i <= 5; ++i) + { + agg_int.update(i); + agg_long.update(i); + } + + for (float i = 1.0; i <= 5.0; i += 1) + { + agg_float.update(i); + agg_double.update(i); + } + + std::vector correct_int{1, 2, 3, 4, 5}; + std::vector correct_long{1, 2, 3, 4, 5}; + std::vector correct_float{1.0, 2.0, 3.0, 4.0, 5.0}; + std::vector correct_double{1.0, 2.0, 3.0, 4.0, 5.0}; + + ASSERT_EQ(agg_int.get_values(), correct_int); + ASSERT_EQ(agg_long.get_values(), correct_long); + ASSERT_EQ(agg_float.get_values(), correct_float); + ASSERT_EQ(agg_double.get_values(), correct_double); +} + +TEST(ExactAggregatorQuant, Update) +{ + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter, true); + + std::vector correct; + + ASSERT_EQ(agg.get_values(), correct); + + agg.update(1); + correct.push_back(1); + + ASSERT_EQ(agg.get_values(), std::vector{1}); + + for (int i = 2; i <= 5; ++i) + { + correct.push_back(i); + agg.update(i); + } + ASSERT_EQ(agg.get_values(), correct); +} + +TEST(ExactAggregatorQuant, Checkpoint) +{ + // This test verifies that the aggregator updates correctly when + // quantile estimation is turned on. + + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter, true); + + std::vector correct; + + ASSERT_EQ(agg.get_checkpoint(), correct); + + agg.update(1); + agg.update(0); + agg.update(-1); + + // The vector MUST be sorted when checkpointed + correct.push_back(-1); + correct.push_back(0); + correct.push_back(1); + agg.checkpoint(); + + ASSERT_EQ(agg.get_checkpoint(), correct); +} + +TEST(ExactAggregatorQuant, Quantile) +{ + // This test verifies that the quantile estimation function returns + // the correct values. + + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter, true); + + std::vector tmp{3, 9, 42, 57, 163, 210, 272, 300}; + for (int i : tmp) + { + agg.update(i); + } + agg.checkpoint(); + ASSERT_EQ(agg.get_quantiles(.25), 42); + ASSERT_EQ(agg.get_quantiles(0.5), 163); + ASSERT_EQ(agg.get_quantiles(0.75), 272); +} + +TEST(ExactAggregatorInOrder, Quantile) +{ + // This test verifies that if the user has an exact aggregator in "in-order" mode + // an exception will be thrown if they call the quantile() function. + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + std::vector tmp{3, 9, 42, 57, 163, 210, 272, 300}; + for (int i : tmp) + { + agg.update(i); + } + agg.checkpoint(); +#if __EXCEPTIONS + ASSERT_THROW(agg.get_quantiles(0.5), std::domain_error); +#else +#endif +} + +void callback(ExactAggregator &agg) +{ + for (int i = 1; i <= 10000; ++i) + { + agg.update(i); + } +} + +TEST(ExactAggregatorQuant, Concurrency) +{ + // This test checks that the aggregator updates appropriately + // when called in a multi-threaded context. + ExactAggregator agg(opentelemetry::metrics::InstrumentKind::Counter, true); + + std::thread first(&callback, std::ref(agg)); + std::thread second(&callback, std::ref(agg)); + + first.join(); + second.join(); + + std::vector correct; + for (int i = 1; i <= 10000; ++i) + { + correct.push_back(i); + correct.push_back(i); + } + agg.checkpoint(); + + ASSERT_EQ(agg.get_checkpoint(), correct); +} \ No newline at end of file