diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h new file mode 100644 index 00000000000..25f178c9c00 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h @@ -0,0 +1,275 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" +#include "opentelemetry/version.h" + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +/** Sketch Aggregators implement the DDSketch data type. Note that data is compressed + * by the DDSketch algorithm and users should be informed about its behavior before + * selecting it as the aggregation type. NOTE: The current implementation can only support + * non-negative values. + * + * Detailed information about the algorithm can be found in the following paper + * published by Datadog: http://www.vldb.org/pvldb/vol12/p2195-masson.pdf + */ + +template +class SketchAggregator final : public Aggregator +{ + +public: + /** + * Given the distribution of data this aggregator is designed for and its usage, the raw updates + *are stored in a map rather than a vector. + * + *@param kind, the instrument kind creating this aggregator + *@param error_bound, what is referred to as "alpha" in the DDSketch algorithm + *@param max_buckets, the maximum number of indices in the raw value map + */ + SketchAggregator(metrics_api::InstrumentKind kind, double error_bound, size_t max_buckets = 2048) + { + + this->kind_ = kind; + this->agg_kind_ = AggregatorKind::Sketch; + this->values_ = std::vector(2, 0); // Sum in [0], Count in [1] + this->checkpoint_ = std::vector(2, 0); + max_buckets_ = max_buckets; + error_bound_ = error_bound; + gamma = (1 + error_bound) / (1 - error_bound); + } + + /** + * Update the aggregator with the new value. For a DDSketch aggregator, if the addition of this + * value creates a new bucket which is in excess of the maximum allowed size, the lowest indexes + * buckets are merged. + * + * @param val, the raw value used in aggregation + * @return none + */ + void update(T val) override + { + this->mu_.lock(); + int idx; + if (val == 0) + { + idx = std::numeric_limits::min(); + } + else + { + idx = ceil(log(val) / log(gamma)); + } + if (raw_.find(idx) != raw_.end()) + { + raw_[idx] += 1; + } + else + { + raw_[idx] = 1; + } + this->values_[1] += 1; + this->values_[0] += val; + if (raw_.size() > max_buckets_) + { + int minidx = raw_.begin()->first, minidxval = raw_.begin()->second; + raw_.erase(minidx); + raw_[raw_.begin()->first] += minidxval; + } + this->mu_.unlock(); + } + + /** + * Calculate and return the value of a user specified quantile. + * + * @param q, the quantile to calculate (for example 0.5 is equivelant to the 50th percentile) + */ + virtual T get_quantiles(double q) override + { + if (q < 0 || q > 1) + { +#if __EXCEPTIONS + throw std::invalid_argument("Quantile values must fall between 0 and 1"); +#else + std::terminate(); +#endif + } + auto iter = checkpoint_raw_.begin(); + int idx = iter->first; + int count = iter->second; + + while (count < (q * (this->checkpoint_[1] - 1)) && iter != checkpoint_raw_.end()) + { + iter++; + idx = iter->first; + count += iter->second; + } + return round(2 * pow(gamma, idx) / (gamma + 1)); + } + + /** + * Checkpoints the current value. This function will overwrite the current checkpoint with the + * current value. + * + * @param none + * @return none + */ + void checkpoint() override + { + this->mu_.lock(); + this->checkpoint_ = this->values_; + checkpoint_raw_ = raw_; + this->values_[0] = 0; + this->values_[1] = 0; + raw_.clear(); + this->mu_.unlock(); + } + + /** + * Merges this sketch aggregator with another. The same bucket compression used when + * updating values is employed here to manage bucket size if the merging of aggregators + * results in more buckets than allowed. + * + * @param other, the aggregator with merge with + * @return none + */ + void merge(SketchAggregator other) + { + this->mu_.lock(); + if (gamma != other.gamma) + { +#if __EXCEPTIONS + throw std::invalid_argument("Aggregators must have identical error tolerance"); +#else + std::terminate(); +#endif + } + else if (max_buckets_ != other.max_buckets_) + { +#if __EXCEPTIONS + throw std::invalid_argument("Aggregators must have the same maximum bucket allowance"); +#else + std::terminate(); +#endif + } + + this->values_[0] += other.values_[0]; + this->values_[1] += other.values_[1]; + this->checkpoint_[0] += other.checkpoint_[0]; + this->checkpoint_[1] += other.checkpoint_[1]; + auto other_iter = other.raw_.begin(); + while (other_iter != other.raw_.end()) + { + raw_[other_iter->first] += other_iter->second; + if (raw_.size() > max_buckets_) + { + int minidx = raw_.begin()->first, minidxval = raw_.begin()->second; + raw_.erase(minidx); + raw_[raw_.begin()->first] += minidxval; + } + other_iter++; + } + auto other_ckpt_iter = other.checkpoint_raw_.begin(); + while (other_ckpt_iter != other.checkpoint_raw_.end()) + { + checkpoint_raw_[other_ckpt_iter->first] += other_ckpt_iter->second; + if (checkpoint_raw_.size() > max_buckets_) + { + int minidx = checkpoint_raw_.begin()->first, minidxval = checkpoint_raw_.begin()->second; + checkpoint_raw_.erase(minidx); + checkpoint_raw_[checkpoint_raw_.begin()->first] += minidxval; + } + other_ckpt_iter++; + } + this->mu_.unlock(); + } + + /** + * Returns the checkpointed value + * + * @param none + * @return the value of the checkpoint + */ + std::vector get_checkpoint() override { return this->checkpoint_; } + + /** + * Returns the current values + * + * @param none + * @return the present aggregator values + */ + std::vector get_values() override { return this->values_; } + + /** + * Returns the indices (or values) stored by this sketch aggregator. + * + * @param none + * @return a vector of all values the aggregator is currently tracking + */ + virtual std::vector get_boundaries() override + { + std::vector ret; + for (auto const &x : checkpoint_raw_) + { + ret.push_back(2 * pow(gamma, x.first) / (gamma + 1)); + } + return ret; + } + + /** + * Returns the error bound + * + * @param none + * @return the error bound specified during construction + */ + virtual double get_error_bound() override { return error_bound_; } + + /** + * Returns the maximum allowed buckets + * + * @param none + * @return the maximum allowed buckets + */ + virtual size_t get_max_buckets() override { return max_buckets_; } + + /** + * Returns the count of each value tracked by this sketch aggregator. These are returned + * in the same order as the indices returned by the get_boundaries function. + * + * @param none + * @return a vector of all counts for values tracked by the aggregator + */ + virtual std::vector get_counts() override + { + std::vector ret; + for (auto const &x : checkpoint_raw_) + { + ret.push_back(x.second); + } + return ret; + } + +private: + double gamma; + double error_bound_; + size_t max_buckets_; + std::map raw_; + std::map checkpoint_raw_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index 85081d8e180..9b95807a68a 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -74,3 +74,14 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "sketch_aggregator_test", + srcs = [ + "sketch_aggregator_test.cc", + ], + deps = [ + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/sdk/test/metrics/metric_instrument_test.cc b/sdk/test/metrics/metric_instrument_test.cc index 183b5b97e2b..f1bb23b73fe 100644 --- a/sdk/test/metrics/metric_instrument_test.cc +++ b/sdk/test/metrics/metric_instrument_test.cc @@ -69,42 +69,20 @@ TEST(Counter, getAggsandnewupdate) { Counter alpha("test", "none", "unitless", true); - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - - // labels 2 and 3 are actually the same - std::map labels2 = {{"key2", "value2"}, {"key3", "value3"}}; - std::map labels3 = {{"key3", "value3"}, {"key2", "value2"}}; - - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - auto labelkv2 = trace::KeyValueIterableView{labels2}; - auto labelkv3 = trace::KeyValueIterableView{labels3}; + std::map labels = {{"key3", "value3"}, {"key2", "value2"}}; + auto labelkv = trace::KeyValueIterableView{labels}; auto beta = alpha.bindCounter(labelkv); - auto gamma = alpha.bindCounter(labelkv1); - auto delta = alpha.bindCounter(labelkv1); - auto epsilon = alpha.bindCounter(labelkv1); - auto zeta = alpha.bindCounter(labelkv2); - auto eta = alpha.bindCounter(labelkv3); - - EXPECT_EQ(beta->get_ref(), 1); - EXPECT_EQ(gamma->get_ref(), 3); - EXPECT_EQ(eta->get_ref(), 2); + beta->unbind(); - delta->unbind(); - gamma->unbind(); - epsilon->unbind(); - - EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv1)]->get_ref(), 0); - EXPECT_EQ(alpha.boundInstruments_.size(), 3); + EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv)]->get_ref(), 0); + EXPECT_EQ(alpha.boundInstruments_.size(), 1); auto theta = alpha.GetRecords(); - EXPECT_EQ(theta.size(), 3); + EXPECT_EQ(theta.size(), 1); EXPECT_EQ(theta[0].GetName(), "test"); EXPECT_EQ(theta[0].GetDescription(), "none"); EXPECT_EQ(theta[0].GetLabels(), "{\"key2\":\"value2\",\"key3\":\"value3\"}"); - EXPECT_EQ(theta[1].GetLabels(), "{\"key1\":\"value1\"}"); } void CounterCallback(std::shared_ptr> in, diff --git a/sdk/test/metrics/sketch_aggregator_test.cc b/sdk/test/metrics/sketch_aggregator_test.cc new file mode 100644 index 00000000000..8b4800952f3 --- /dev/null +++ b/sdk/test/metrics/sketch_aggregator_test.cc @@ -0,0 +1,249 @@ +#include "opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h" + +#include +#include +#include +#include + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +// Test updating with a uniform set of updates +TEST(Sketch, UniformValues) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .000005); + + EXPECT_EQ(alpha.get_aggregator_kind(), AggregatorKind::Sketch); + + alpha.checkpoint(); + EXPECT_EQ(alpha.get_checkpoint().size(), 2); + EXPECT_EQ(alpha.get_boundaries().size(), 0); + EXPECT_EQ(alpha.get_counts().size(), 0); + + for (int i = 0; i < 60; i++) + { + alpha.update(i); + } + + alpha.checkpoint(); + + EXPECT_EQ(alpha.get_boundaries().size(), 60); + EXPECT_EQ(alpha.get_counts().size(), 60); + + EXPECT_EQ(alpha.get_checkpoint()[0], 1770); + EXPECT_EQ(alpha.get_checkpoint()[1], 60); +} + +// Test updating with a normal distribution +TEST(Sketch, NormalValues) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .0005); + + std::vector vals{1, 3, 3, 5, 5, 5, 7, 7, 7, 7, 9, 9, 9, 11, 11, 13}; + for (int i : vals) + { + alpha.update(i); + } + alpha.checkpoint(); + + EXPECT_EQ(alpha.get_checkpoint()[0], std::accumulate(vals.begin(), vals.end(), 0)); + EXPECT_EQ(alpha.get_checkpoint()[1], vals.size()); + + std::vector correct = {1, 2, 3, 4, 3, 2, 1}; + EXPECT_EQ(alpha.get_counts(), correct); + + std::vector captured_bounds = alpha.get_boundaries(); + for (int i = 0; i < captured_bounds.size(); i++) + { + captured_bounds[i] = round(captured_bounds[i]); + } + + // It is not guaranteed that bounds are correct once the bucket sizes pass 1000 + std::vector correct_bounds = {1, 3, 5, 7, 9, 11, 13}; + EXPECT_EQ(captured_bounds, correct_bounds); +} + +int randVal() +{ + return rand() % 100000; +} + +/** Note that in this case, "Large" refers to a number of distinct values which exceed the maximum + * number of allowed buckets. + */ +TEST(Sketch, QuantileSmall) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .00005); + + std::vector vals1(2048); + std::generate(vals1.begin(), vals1.end(), randVal); + + std::vector vals2(2048); + std::generate(vals1.begin(), vals1.end(), randVal); + + for (int i : vals1) + { + alpha.update(i); + } + alpha.checkpoint(); + std::sort(vals1.begin(), vals1.end()); + + EXPECT_TRUE(abs(alpha.get_quantiles(.25) - vals1[2048 * .25 - 1]) <= 10); + EXPECT_TRUE(abs(alpha.get_quantiles(.50) - vals1[2048 * .50 - 1]) <= 10); + EXPECT_TRUE(abs(alpha.get_quantiles(.75) - vals1[2048 * .75 - 1]) <= 10); +} + +TEST(Sketch, UpdateQuantileLarge) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .0005, 7); + std::vector vals{1, 3, 3, 5, 5, 5, 7, 7, 7, 7, 9, 9, 9, 11, 11, 13}; + for (int i : vals) + { + alpha.update(i); + } + + // This addition should trigger the "1" and "3" buckets to merge + alpha.update(15); + alpha.checkpoint(); + + std::vector correct = {3, 3, 4, 3, 2, 1, 1}; + EXPECT_EQ(alpha.get_counts(), correct); + + for (int i : vals) + { + alpha.update(i); + } + alpha.update(15); + alpha.update(17); + alpha.checkpoint(); + + correct = {6, 4, 3, 2, 1, 1, 1}; + EXPECT_EQ(alpha.get_counts(), correct); +} + +TEST(Sketch, MergeSmall) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .0005); + SketchAggregator beta(metrics_api::InstrumentKind::ValueRecorder, .0005); + + std::vector vals{1, 3, 3, 5, 5, 5, 7, 7, 7, 7, 9, 9, 9, 11, 11, 13}; + for (int i : vals) + { + alpha.update(i); + } + + std::vector otherVals{1, 1, 1, 1, 11, 11, 13, 13, 13, 15}; + for (int i : otherVals) + { + beta.update(i); + } + + alpha.merge(beta); + alpha.checkpoint(); + + EXPECT_EQ(alpha.get_checkpoint()[0], std::accumulate(vals.begin(), vals.end(), 0) + + std::accumulate(otherVals.begin(), otherVals.end(), 0)); + EXPECT_EQ(alpha.get_checkpoint()[1], vals.size() + otherVals.size()); + + std::vector correct = {5, 2, 3, 4, 3, 4, 4, 1}; + EXPECT_EQ(alpha.get_counts(), correct); +} + +TEST(Sketch, MergeLarge) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .0005, 7); + SketchAggregator beta(metrics_api::InstrumentKind::ValueRecorder, .0005, 7); + + std::vector vals{1, 3, 3, 5, 5, 5, 7, 7, 7, 7, 9, 9, 9, 11, 11, 13}; + for (int i : vals) + { + alpha.update(i); + } + + std::vector otherVals{1, 1, 1, 1, 11, 11, 13, 13, 13, 15}; + for (int i : otherVals) + { + beta.update(i); + } + + alpha.merge(beta); + alpha.checkpoint(); + + EXPECT_EQ(alpha.get_checkpoint()[0], std::accumulate(vals.begin(), vals.end(), 0) + + std::accumulate(otherVals.begin(), otherVals.end(), 0)); + EXPECT_EQ(alpha.get_checkpoint()[1], vals.size() + otherVals.size()); + + std::vector correct = {7, 3, 4, 3, 4, 4, 1}; + EXPECT_EQ(alpha.get_counts(), correct); +} + +// Update callback used to validate multi-threaded performance +void sketchUpdateCallback(Aggregator &agg, std::vector vals) +{ + for (int i : vals) + { + agg.update(i); + } +} + +TEST(Sketch, Concurrency) +{ + SketchAggregator alpha(metrics_api::InstrumentKind::ValueRecorder, .0005, 20); + + std::vector vals1(1000); + std::generate(vals1.begin(), vals1.end(), randVal); + + std::vector vals2(1000); + std::generate(vals2.begin(), vals2.end(), randVal); + + std::thread first(sketchUpdateCallback, std::ref(alpha), vals1); + std::thread second(sketchUpdateCallback, std::ref(alpha), vals2); + + first.join(); + second.join(); + + SketchAggregator beta(metrics_api::InstrumentKind::ValueRecorder, .0005, 20); + + for (int i : vals1) + { + beta.update(i); + } + for (int i : vals2) + { + beta.update(i); + } + + alpha.checkpoint(); + beta.checkpoint(); + + EXPECT_EQ(alpha.get_checkpoint(), beta.get_checkpoint()); + EXPECT_EQ(alpha.get_counts(), beta.get_counts()); + EXPECT_EQ(alpha.get_boundaries(), beta.get_boundaries()); +} + +#if __EXCEPTIONS + +TEST(Sketch, Errors) +{ + + SketchAggregator tol1(metrics_api::InstrumentKind::ValueRecorder, .000005); + SketchAggregator tol2(metrics_api::InstrumentKind::ValueRecorder, .005); + SketchAggregator sz1(metrics_api::InstrumentKind::ValueRecorder, .000005, 2938); + SketchAggregator sz2(metrics_api::InstrumentKind::ValueRecorder, .000005); + + EXPECT_ANY_THROW(tol1.merge(tol2)); + EXPECT_ANY_THROW(sz1.merge(sz2)); + EXPECT_ANY_THROW(tol1.get_quantiles(-.000001)); + EXPECT_ANY_THROW(tol1.get_quantiles(1.000001)); +} + +#endif + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE