From 4caf0bbb0daf9620182b1900de66f381a91d9db8 Mon Sep 17 00:00:00 2001 From: Brandon Kimberly Date: Thu, 30 Jul 2020 10:39:49 -0400 Subject: [PATCH] Add MinMaxSumCount and Gauge Aggregators (#181) --- sdk/include/opentelemetry/sdk/metrics/TBD | 0 .../sdk/metrics/aggregator/gauge_aggregator.h | 138 ++++++++++++ .../aggregator/min_max_sum_count_aggregator.h | 154 +++++++++++++ sdk/test/metrics/BUILD | 22 ++ sdk/test/metrics/CMakeLists.txt | 8 +- sdk/test/metrics/gauge_aggregator_test.cc | 127 +++++++++++ .../min_max_sum_count_aggregator_test.cc | 203 ++++++++++++++++++ 7 files changed, 648 insertions(+), 4 deletions(-) delete mode 100644 sdk/include/opentelemetry/sdk/metrics/TBD create mode 100644 sdk/include/opentelemetry/sdk/metrics/aggregator/gauge_aggregator.h create mode 100644 sdk/include/opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h create mode 100644 sdk/test/metrics/gauge_aggregator_test.cc create mode 100644 sdk/test/metrics/min_max_sum_count_aggregator_test.cc diff --git a/sdk/include/opentelemetry/sdk/metrics/TBD b/sdk/include/opentelemetry/sdk/metrics/TBD deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/gauge_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/gauge_aggregator.h new file mode 100644 index 00000000000..7925f5e2053 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/gauge_aggregator.h @@ -0,0 +1,138 @@ +#pragma once + +#include "opentelemetry/core/timestamp.h" +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" +#include "opentelemetry/version.h" + +#include +#include +#include + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ +/** + * This aggregator stores and maintains a vector of + * type T where the contents of the vector simply + * include the last value recorded to the aggregator. + * The aggregator also maintains a timestamp of when + * the last value was recorded. + * + * @tparam T the type of values stored in this aggregator. + */ +template +class GaugeAggregator : public Aggregator +{ +public: + explicit GaugeAggregator(metrics_api::InstrumentKind kind) + { + static_assert(std::is_arithmetic::value, "Not an arithmetic type"); + this->kind_ = kind; + this->values_ = std::vector(1, 0); + this->checkpoint_ = this->values_; + this->agg_kind_ = AggregatorKind::Gauge; + current_timestamp_ = core::SystemTimestamp(std::chrono::system_clock::now()); + } + + ~GaugeAggregator() = default; + + GaugeAggregator(const GaugeAggregator &cp) + { + this->values_ = cp.values_; + this->checkpoint_ = cp.checkpoint_; + this->kind_ = cp.kind_; + this->agg_kind_ = cp.agg_kind_; + current_timestamp_ = cp.current_timestamp_; + // use default initialized mutex as they cannot be copied + } + + /** + * Receives a captured value from the instrument and applies it to the current aggregator value. + * + * @param val, the raw value used in aggregation + */ + void update(T val) override + { + this->mu_.lock(); + this->values_[0] = val; + current_timestamp_ = core::SystemTimestamp(std::chrono::system_clock::now()); + this->mu_.unlock(); + } + + /** + * Checkpoints the current value. This function will overwrite the current checkpoint with the + * current value. + * + * @return none + */ + + void checkpoint() override + { + this->mu_.lock(); + + this->checkpoint_ = this->values_; + + // Reset the values to default + this->values_[0] = 0; + checkpoint_timestamp_ = current_timestamp_; + current_timestamp_ = core::SystemTimestamp(std::chrono::system_clock::now()); + + this->mu_.unlock(); + } + + /** + * Merges two Gauge aggregators together + * + * @param other the aggregator to merge with this aggregator + */ + void merge(GaugeAggregator other) + { + if (this->kind_ == other.kind_) + { + this->mu_.lock(); + // First merge values + this->values_[0] = other.values_[0]; + // Now merge checkpoints + this->checkpoint_[0] = other.checkpoint_[0]; + current_timestamp_ = core::SystemTimestamp(std::chrono::system_clock::now()); + this->mu_.unlock(); + } + else + { + // Log error + return; + } + } + + /** + * @return the value of the latest checkpoint + */ + std::vector get_checkpoint() override { return this->checkpoint_; } + + /** + * @return the latest checkpointed timestamp + */ + core::SystemTimestamp get_checkpoint_timestamp() override { return checkpoint_timestamp_; } + + /** + * @return the values_ vector stored in this aggregator + */ + std::vector get_values() override { return this->values_; } + + /** + * @return the timestamp of when the last value recorded + */ + core::SystemTimestamp get_timestamp() { return current_timestamp_; } + +private: + core::SystemTimestamp current_timestamp_; + core::SystemTimestamp checkpoint_timestamp_; +}; +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h new file mode 100644 index 00000000000..b47cf0df064 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h @@ -0,0 +1,154 @@ +#pragma once + +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" +#include "opentelemetry/version.h" + +#include +#include +#include + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ +const int MinValueIndex = 0; +const int MaxValueIndex = 1; +const int SumValueIndex = 2; +const int CountValueIndex = 3; +/** + * This aggregator stores and maintains a vector of + * type T where the contents in the vector are made + * up of the minimum value recorded to this instrument, + * the maximum value, the sum of all values, and the + * count of all values. + * + * @tparam T the type of values stored in this aggregator. + */ +template +class MinMaxSumCountAggregator : public Aggregator +{ +public: + explicit MinMaxSumCountAggregator(metrics_api::InstrumentKind kind) + { + static_assert(std::is_arithmetic::value, "Not an arithmetic type"); + this->kind_ = kind; + this->values_ = std::vector(4, 0); // {min, max, sum, count} + this->checkpoint_ = this->values_; + this->agg_kind_ = AggregatorKind::MinMaxSumCount; + } + + ~MinMaxSumCountAggregator() = default; + + MinMaxSumCountAggregator(const MinMaxSumCountAggregator &cp) + { + this->values_ = cp.values_; + this->checkpoint_ = cp.checkpoint_; + this->kind_ = cp.kind_; + this->agg_kind_ = cp.agg_kind_; + // use default initialized mutex as they cannot be copied + } + + /** + * Receives a captured value from the instrument and applies it to the current aggregator value. + * + * @param val, the raw value used in aggregation + */ + void update(T val) override + { + this->mu_.lock(); + + if (this->values_[CountValueIndex] == 0 || val < this->values_[MinValueIndex]) // set min + this->values_[MinValueIndex] = val; + if (this->values_[CountValueIndex] == 0 || val > this->values_[MaxValueIndex]) // set max + this->values_[MaxValueIndex] = val; + + this->values_[SumValueIndex] += val; // compute sum + this->values_[CountValueIndex]++; // increment count + + this->mu_.unlock(); + } + + /** + * Checkpoints the current value. This function will overwrite the current checkpoint with the + * current value. + * + */ + void checkpoint() override + { + this->mu_.lock(); + this->checkpoint_ = this->values_; + // Reset the values + this->values_[MinValueIndex] = 0; + this->values_[MaxValueIndex] = 0; + this->values_[SumValueIndex] = 0; + this->values_[CountValueIndex] = 0; + this->mu_.unlock(); + } + + /** + * Merges two MinMaxSumCount aggregators together + * + * @param other the aggregator to merge with this aggregator + */ + void merge(const MinMaxSumCountAggregator &other) + { + if (this->kind_ == other.kind_) + { + this->mu_.lock(); + // First merge values + // set min + if (this->values_[CountValueIndex] == 0 || + other.values_[MinValueIndex] < this->values_[MinValueIndex]) + this->values_[MinValueIndex] = other.values_[MinValueIndex]; + // set max + if (this->values_[CountValueIndex] == 0 || + other.values_[MaxValueIndex] > this->values_[MaxValueIndex]) + this->values_[MaxValueIndex] = other.values_[MaxValueIndex]; + // set sum + this->values_[SumValueIndex] += other.values_[SumValueIndex]; + // set count + this->values_[CountValueIndex] += other.values_[CountValueIndex]; + + // Now merge checkpoints + if (this->checkpoint_[CountValueIndex] == 0 || + other.checkpoint_[MinValueIndex] < this->checkpoint_[MinValueIndex]) + this->checkpoint_[MinValueIndex] = other.checkpoint_[MinValueIndex]; + // set max + if (this->checkpoint_[CountValueIndex] == 0 || + other.checkpoint_[MaxValueIndex] > this->checkpoint_[MaxValueIndex]) + this->checkpoint_[MaxValueIndex] = other.checkpoint_[MaxValueIndex]; + // set sum + this->checkpoint_[SumValueIndex] += other.checkpoint_[SumValueIndex]; + // set count + this->checkpoint_[CountValueIndex] += other.checkpoint_[CountValueIndex]; + + this->mu_.unlock(); + } + else + { + // Log error + return; + } + } + + /** + * Returns the checkpointed value + * + * @return the value of the checkpoint + */ + std::vector get_checkpoint() override { return this->checkpoint_; } + + /** + * Returns the values currently held by the aggregator + * + * @return the values held by the aggregator + */ + std::vector get_values() override { return this->values_; } +}; +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index 4ce50b36e2b..ec3ec362ab0 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -1,3 +1,25 @@ +cc_test( + name = "gauge_aggregator_test", + srcs = [ + "gauge_aggregator_test.cc", + ], + deps = [ + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "min_max_sum_count_aggregator_test", + srcs = [ + "min_max_sum_count_aggregator_test.cc", + ], + deps = [ + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "meter_provider_sdk_test", srcs = [ diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index d939de19f0a..3d906b6bcb2 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -1,7 +1,7 @@ -foreach(testname meter_provider_sdk_test) +foreach(testname meter_provider_sdk_test gauge_aggregator_test + min_max_sum_count_aggregator_test) add_executable(${testname} "${testname}.cc") - target_link_libraries( - ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} - opentelemetry_common opentelemetry_metrics) + target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} opentelemetry_metrics) gtest_add_tests(TARGET ${testname} TEST_PREFIX metrics. TEST_LIST ${testname}) endforeach() diff --git a/sdk/test/metrics/gauge_aggregator_test.cc b/sdk/test/metrics/gauge_aggregator_test.cc new file mode 100644 index 00000000000..7370d78f70a --- /dev/null +++ b/sdk/test/metrics/gauge_aggregator_test.cc @@ -0,0 +1,127 @@ +#include +#include + +#include "opentelemetry/sdk/metrics/aggregator/gauge_aggregator.h" + +using namespace opentelemetry::sdk::metrics; + +TEST(GaugeAggregator, Update) +{ + // This tests that the aggregator updates the maintained value correctly + // after a call to the update() function. + auto agg = new GaugeAggregator(opentelemetry::metrics::InstrumentKind::Counter); + + // Verify default value + ASSERT_EQ(agg->get_values()[0], 0); + + // Verify that the value updates correctly + agg->update(1); + ASSERT_EQ(agg->get_values()[0], 1); + + // Verify that the value continually updates correctly + for (int i = 0; i < 10; ++i) + { + agg->update(i); + } + ASSERT_EQ(agg->get_values()[0], 9); + delete agg; +} + +TEST(GaugeAggregator, Checkpoint) +{ + // This tests that the aggregator correctly updates the + // checkpoint_ value after a call to update() followed + // by a call to checkpoint(). + GaugeAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + // Verify default checkpoint, before updates + ASSERT_EQ(agg.get_checkpoint()[0], 0); + + agg.update(10); + agg.checkpoint(); + + // Verify that the new checkpoint contains the update value + ASSERT_EQ(agg.get_checkpoint()[0], 10); +} + +TEST(GaugeAggregator, Merge) +{ + // This tests that the values_ vector is updated correctly after + // two aggregators are merged together. + GaugeAggregator agg1(opentelemetry::metrics::InstrumentKind::Counter); + GaugeAggregator agg2(opentelemetry::metrics::InstrumentKind::Counter); + + agg1.update(1); + agg2.update(2); + + agg1.merge(agg2); + + // Verify that the aggregators merged and the value was updated correctly + ASSERT_EQ(agg1.get_values()[0], 2); +} + +TEST(GaugeAggregator, BadMerge) +{ + // This verifies that we encounter and error when we try to merge + // two aggregators of different numeric types together. + GaugeAggregator agg1(opentelemetry::metrics::InstrumentKind::Counter); + GaugeAggregator agg2(opentelemetry::metrics::InstrumentKind::ValueRecorder); + + agg1.update(1); + agg2.update(2); + agg1.merge(agg2); + + // Verify that the aggregators did NOT merge + std::vector correct{1}; + ASSERT_EQ(agg1.get_values(), correct); +} + +TEST(GaugeAggregator, Types) +{ + // This test verifies that we do not encounter any errors when + // using various numeric types. + GaugeAggregator agg_int(opentelemetry::metrics::InstrumentKind::Counter); + GaugeAggregator agg_long(opentelemetry::metrics::InstrumentKind::Counter); + GaugeAggregator agg_float(opentelemetry::metrics::InstrumentKind::Counter); + GaugeAggregator agg_double(opentelemetry::metrics::InstrumentKind::Counter); + + for (int i = 1; i <= 10; ++i) + { + agg_int.update(i); + agg_long.update(i); + } + + for (float i = 1.0; i <= 10.0; i += 1) + { + agg_float.update(i); + agg_double.update(i); + } + + ASSERT_EQ(agg_int.get_values()[0], 10); + ASSERT_EQ(agg_long.get_values()[0], 10); + ASSERT_EQ(agg_float.get_values()[0], 10.0); + ASSERT_EQ(agg_double.get_values()[0], 10.0); +} + +static void callback(GaugeAggregator &agg) +{ + for (int i = 1; i <= 10000; ++i) + { + agg.update(i); + } +} + +TEST(GaugeAggregator, Concurrency) +{ + // This test checks that the aggregator updates appropriately + // when called in a multi-threaded context. + GaugeAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + std::thread first(&callback, std::ref(agg)); + std::thread second(&callback, std::ref(agg)); + + first.join(); + second.join(); + + ASSERT_EQ(agg.get_values()[0], 10000); +} \ No newline at end of file diff --git a/sdk/test/metrics/min_max_sum_count_aggregator_test.cc b/sdk/test/metrics/min_max_sum_count_aggregator_test.cc new file mode 100644 index 00000000000..21d06ddcea9 --- /dev/null +++ b/sdk/test/metrics/min_max_sum_count_aggregator_test.cc @@ -0,0 +1,203 @@ +#include +#include + +#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h" + +using namespace opentelemetry::sdk::metrics; + +TEST(MinMaxSumCountAggregator, Update) +{ + // This tests that the aggregator updates the maintained value correctly + // after a call to the update() function. + MinMaxSumCountAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + auto value_set = agg.get_values(); + ASSERT_EQ(value_set[0], 0); + ASSERT_EQ(value_set[1], 0); + ASSERT_EQ(value_set[2], 0); + ASSERT_EQ(value_set[3], 0); + + // 1 + 2 + 3 + ... + 10 = 55 + for (int i = 1; i <= 10; ++i) + { + agg.update(i); + } + + value_set = agg.get_values(); + ASSERT_EQ(value_set[0], 1); // min + ASSERT_EQ(value_set[1], 10); // max + ASSERT_EQ(value_set[2], 55); // sum + ASSERT_EQ(value_set[3], 10); // count +} + +TEST(MinMaxSumCountAggregator, FirstUpdate) +{ + // This tests that the aggregator appropriately maintains the min and + // max values after a single update call. + MinMaxSumCountAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + agg.update(1); + auto value_set = agg.get_values(); + ASSERT_EQ(value_set[0], 1); // min + ASSERT_EQ(value_set[1], 1); // max + ASSERT_EQ(value_set[2], 1); // sum + ASSERT_EQ(value_set[3], 1); // count +} + +TEST(MinMaxSumCountAggregator, Checkpoint) +{ + // This test verifies that the default checkpoint is set correctly + // and that the checkpoint values update correctly after a call + // to the checkpoint() function. + MinMaxSumCountAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + // Verify that the default checkpoint is set correctly. + auto checkpoint_set = agg.get_checkpoint(); + ASSERT_EQ(checkpoint_set[0], 0); // min + ASSERT_EQ(checkpoint_set[1], 0); // max + ASSERT_EQ(checkpoint_set[2], 0); // sum + ASSERT_EQ(checkpoint_set[3], 0); // count + + // 1 + 2 + 3 + ... + 10 = 55 + for (int i = 1; i <= 10; ++i) + { + agg.update(i); + } + + agg.checkpoint(); + + // Verify that the checkpoint values were updated. + checkpoint_set = agg.get_checkpoint(); + ASSERT_EQ(checkpoint_set[0], 1); // min + ASSERT_EQ(checkpoint_set[1], 10); // max + ASSERT_EQ(checkpoint_set[2], 55); // sum + ASSERT_EQ(checkpoint_set[3], 10); // count + + // Verify that the current values were reset to the default state. + auto value_set = agg.get_values(); + ASSERT_EQ(value_set[0], 0); // min + ASSERT_EQ(value_set[1], 0); // max + ASSERT_EQ(value_set[2], 0); // sum + ASSERT_EQ(value_set[3], 0); // count +} + +TEST(MinMaxSumCountAggregator, Merge) +{ + // This tests that the values_ vector is updated correctly after + // two aggregators are merged together. + MinMaxSumCountAggregator agg1(opentelemetry::metrics::InstrumentKind::Counter); + MinMaxSumCountAggregator agg2(opentelemetry::metrics::InstrumentKind::Counter); + + // 1 + 2 + 3 + ... + 10 = 55 + for (int i = 1; i <= 10; ++i) + { + agg1.update(i); + } + + // 1 + 2 + 3 + ... + 20 = 210 + for (int i = 1; i <= 20; ++i) + { + agg2.update(i); + } + + agg1.merge(agg2); + + // Verify that the current values were changed by the merge. + auto value_set = agg1.get_values(); + ASSERT_EQ(value_set[0], 1); // min + ASSERT_EQ(value_set[1], 20); // max + ASSERT_EQ(value_set[2], 265); // sum + ASSERT_EQ(value_set[3], 30); // count +} + +TEST(MinMaxSumCountAggregator, BadMerge) +{ + // This verifies that we encounter and error when we try to merge + // two aggregators of different numeric types together. + MinMaxSumCountAggregator agg1(opentelemetry::metrics::InstrumentKind::Counter); + MinMaxSumCountAggregator agg2(opentelemetry::metrics::InstrumentKind::ValueRecorder); + + agg1.update(1); + agg2.update(2); + + agg1.merge(agg2); + + // Verify that the values did NOT merge + auto value_set = agg1.get_values(); + ASSERT_EQ(value_set[0], 1); // min + ASSERT_EQ(value_set[0], 1); // max + ASSERT_EQ(value_set[0], 1); // sum + ASSERT_EQ(value_set[0], 1); // count +} + +TEST(MinMaxSumCountAggregator, Types) +{ + // This test verifies that we do not encounter any errors when + // using various numeric types. + MinMaxSumCountAggregator agg_int(opentelemetry::metrics::InstrumentKind::Counter); + MinMaxSumCountAggregator agg_long(opentelemetry::metrics::InstrumentKind::Counter); + MinMaxSumCountAggregator agg_float(opentelemetry::metrics::InstrumentKind::Counter); + MinMaxSumCountAggregator agg_double(opentelemetry::metrics::InstrumentKind::Counter); + + for (int i = 1; i <= 10; ++i) + { + agg_int.update(i); + agg_long.update(i); + } + + for (float i = 1.0; i <= 10.0; i += 1) + { + agg_float.update(i); + agg_double.update(i); + } + + auto value_set = agg_int.get_values(); + ASSERT_EQ(value_set[0], 1); // min + ASSERT_EQ(value_set[1], 10); // max + ASSERT_EQ(value_set[2], 55); // sum + ASSERT_EQ(value_set[3], 10); // count + + auto value_set2 = agg_long.get_values(); + ASSERT_EQ(value_set[0], 1); // min + ASSERT_EQ(value_set[1], 10); // max + ASSERT_EQ(value_set[2], 55); // sum + ASSERT_EQ(value_set[3], 10); // count + + auto value_set3 = agg_float.get_values(); + ASSERT_EQ(value_set[0], 1.0); // min + ASSERT_EQ(value_set[1], 10.0); // max + ASSERT_EQ(value_set[2], 55.0); // sum + ASSERT_EQ(value_set[3], 10); // count + + auto value_set4 = agg_double.get_values(); + ASSERT_EQ(value_set[0], 1.0); // min + ASSERT_EQ(value_set[1], 10.0); // max + ASSERT_EQ(value_set[2], 55.0); // sum + ASSERT_EQ(value_set[3], 10); // count +} + +static void callback(MinMaxSumCountAggregator &agg) +{ + // 1 + 2 + ... + 10000 = 50005000 + for (int i = 1; i <= 10000; ++i) + { + agg.update(i); + } +} + +TEST(MinMaxSumCountAggregator, Concurrency) +{ + // This test checks that the aggregator updates appropriately + // when called in a multi-threaded context. + MinMaxSumCountAggregator agg(opentelemetry::metrics::InstrumentKind::Counter); + + std::thread first(&callback, std::ref(agg)); + std::thread second(&callback, std::ref(agg)); + + first.join(); + second.join(); + + auto value_set = agg.get_values(); + ASSERT_EQ(value_set[0], 1); + ASSERT_EQ(value_set[1], 10000); + ASSERT_EQ(value_set[2], 2 * 50005000); + ASSERT_EQ(value_set[3], 2 * 10000); +} \ No newline at end of file