Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sketch Aggregator #213

Merged
merged 13 commits into from
Aug 4, 2020
275 changes: 275 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
#pragma once

#include <algorithm>
#include <cmath>
#include <limits>
#include <map>
#include <mutex>
#include <stdexcept>
#include <vector>
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

/** Sketch Aggregators implement the DDSketch data type. Note that data is compressed
* by the DDSketch algorithm and users should be informed about its behavior before
* selecting it as the aggregation type. NOTE: The current implementation can only support
* non-negative values.
*
* Detailed information about the algorithm can be found in the following paper
* published by Datadog: http://www.vldb.org/pvldb/vol12/p2195-masson.pdf
*/

template <class T>
class SketchAggregator final : public Aggregator<T>
{

public:
/**
* Given the distribution of data this aggregator is designed for and its usage, the raw updates
*are stored in a map rather than a vector.
*
*@param kind, the instrument kind creating this aggregator
*@param error_bound, what is referred to as "alpha" in the DDSketch algorithm
*@param max_buckets, the maximum number of indices in the raw value map
*/
SketchAggregator(metrics_api::InstrumentKind kind, double error_bound, size_t max_buckets = 2048)
{

this->kind_ = kind;
this->agg_kind_ = AggregatorKind::Sketch;
this->values_ = std::vector<T>(2, 0); // Sum in [0], Count in [1]
this->checkpoint_ = std::vector<T>(2, 0);
max_buckets_ = max_buckets;
error_bound_ = error_bound;
gamma = (1 + error_bound) / (1 - error_bound);
}

/**
* Update the aggregator with the new value. For a DDSketch aggregator, if the addition of this
* value creates a new bucket which is in excess of the maximum allowed size, the lowest indexes
* buckets are merged.
*
* @param val, the raw value used in aggregation
* @return none
*/
void update(T val) override
{
this->mu_.lock();
int idx;
if (val == 0)
{
idx = std::numeric_limits<int>::min();
}
else
{
idx = ceil(log(val) / log(gamma));
}
if (raw_.find(idx) != raw_.end())
{
raw_[idx] += 1;
}
else
{
raw_[idx] = 1;
}
this->values_[1] += 1;
this->values_[0] += val;
if (raw_.size() > max_buckets_)
{
int minidx = raw_.begin()->first, minidxval = raw_.begin()->second;
raw_.erase(minidx);
raw_[raw_.begin()->first] += minidxval;
}
this->mu_.unlock();
}

/**
* Calculate and return the value of a user specified quantile.
*
* @param q, the quantile to calculate (for example 0.5 is equivelant to the 50th percentile)
*/
virtual T get_quantiles(double q) override
{
if (q < 0 || q > 1)
{
#if __EXCEPTIONS
throw std::invalid_argument("Quantile values must fall between 0 and 1");
#else
std::terminate();
#endif
}
auto iter = checkpoint_raw_.begin();
int idx = iter->first;
int count = iter->second;

while (count < (q * (this->checkpoint_[1] - 1)) && iter != checkpoint_raw_.end())
{
iter++;
idx = iter->first;
count += iter->second;
}
return round(2 * pow(gamma, idx) / (gamma + 1));
}

/**
* Checkpoints the current value. This function will overwrite the current checkpoint with the
* current value.
*
* @param none
* @return none
*/
void checkpoint() override
{
this->mu_.lock();
this->checkpoint_ = this->values_;
checkpoint_raw_ = raw_;
this->values_[0] = 0;
this->values_[1] = 0;
raw_.clear();
this->mu_.unlock();
}

/**
* Merges this sketch aggregator with another. The same bucket compression used when
* updating values is employed here to manage bucket size if the merging of aggregators
* results in more buckets than allowed.
*
* @param other, the aggregator with merge with
* @return none
*/
void merge(SketchAggregator other)
{
this->mu_.lock();
if (gamma != other.gamma)
{
#if __EXCEPTIONS
throw std::invalid_argument("Aggregators must have identical error tolerance");
#else
std::terminate();
#endif
}
else if (max_buckets_ != other.max_buckets_)
{
#if __EXCEPTIONS
throw std::invalid_argument("Aggregators must have the same maximum bucket allowance");
#else
std::terminate();
#endif
}

this->values_[0] += other.values_[0];
this->values_[1] += other.values_[1];
this->checkpoint_[0] += other.checkpoint_[0];
this->checkpoint_[1] += other.checkpoint_[1];
auto other_iter = other.raw_.begin();
while (other_iter != other.raw_.end())
{
raw_[other_iter->first] += other_iter->second;
if (raw_.size() > max_buckets_)
{
int minidx = raw_.begin()->first, minidxval = raw_.begin()->second;
raw_.erase(minidx);
raw_[raw_.begin()->first] += minidxval;
}
other_iter++;
}
auto other_ckpt_iter = other.checkpoint_raw_.begin();
while (other_ckpt_iter != other.checkpoint_raw_.end())
{
checkpoint_raw_[other_ckpt_iter->first] += other_ckpt_iter->second;
if (checkpoint_raw_.size() > max_buckets_)
{
int minidx = checkpoint_raw_.begin()->first, minidxval = checkpoint_raw_.begin()->second;
checkpoint_raw_.erase(minidx);
checkpoint_raw_[checkpoint_raw_.begin()->first] += minidxval;
}
other_ckpt_iter++;
}
this->mu_.unlock();
}

/**
* Returns the checkpointed value
*
* @param none
* @return the value of the checkpoint
*/
std::vector<T> get_checkpoint() override { return this->checkpoint_; }

/**
* Returns the current values
*
* @param none
* @return the present aggregator values
*/
std::vector<T> get_values() override { return this->values_; }

/**
* Returns the indices (or values) stored by this sketch aggregator.
*
* @param none
* @return a vector of all values the aggregator is currently tracking
*/
virtual std::vector<double> get_boundaries() override
{
std::vector<double> ret;
for (auto const &x : checkpoint_raw_)
{
ret.push_back(2 * pow(gamma, x.first) / (gamma + 1));
}
return ret;
}

/**
* Returns the error bound
*
* @param none
* @return the error bound specified during construction
*/
virtual double get_error_bound() override { return error_bound_; }

/**
* Returns the maximum allowed buckets
*
* @param none
* @return the maximum allowed buckets
*/
virtual size_t get_max_buckets() override { return max_buckets_; }

/**
* Returns the count of each value tracked by this sketch aggregator. These are returned
* in the same order as the indices returned by the get_boundaries function.
*
* @param none
* @return a vector of all counts for values tracked by the aggregator
*/
virtual std::vector<int> get_counts() override
{
std::vector<int> ret;
for (auto const &x : checkpoint_raw_)
{
ret.push_back(x.second);
}
return ret;
}

private:
double gamma;
double error_bound_;
size_t max_buckets_;
std::map<int, int> raw_;
std::map<int, int> checkpoint_raw_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
11 changes: 11 additions & 0 deletions sdk/test/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,14 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "sketch_aggregator_test",
srcs = [
"sketch_aggregator_test.cc",
],
deps = [
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)
34 changes: 6 additions & 28 deletions sdk/test/metrics/metric_instrument_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,20 @@ TEST(Counter, getAggsandnewupdate)
{
Counter<int> alpha("test", "none", "unitless", true);

std::map<std::string, std::string> labels = {{"key", "value"}};
std::map<std::string, std::string> labels1 = {{"key1", "value1"}};

// labels 2 and 3 are actually the same
std::map<std::string, std::string> labels2 = {{"key2", "value2"}, {"key3", "value3"}};
std::map<std::string, std::string> labels3 = {{"key3", "value3"}, {"key2", "value2"}};

auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};
auto labelkv1 = trace::KeyValueIterableView<decltype(labels1)>{labels1};
auto labelkv2 = trace::KeyValueIterableView<decltype(labels2)>{labels2};
auto labelkv3 = trace::KeyValueIterableView<decltype(labels3)>{labels3};
std::map<std::string, std::string> labels = {{"key3", "value3"}, {"key2", "value2"}};

auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};
auto beta = alpha.bindCounter(labelkv);
auto gamma = alpha.bindCounter(labelkv1);
auto delta = alpha.bindCounter(labelkv1);
auto epsilon = alpha.bindCounter(labelkv1);
auto zeta = alpha.bindCounter(labelkv2);
auto eta = alpha.bindCounter(labelkv3);

EXPECT_EQ(beta->get_ref(), 1);
EXPECT_EQ(gamma->get_ref(), 3);
EXPECT_EQ(eta->get_ref(), 2);
beta->unbind();

delta->unbind();
gamma->unbind();
epsilon->unbind();

EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv1)]->get_ref(), 0);
EXPECT_EQ(alpha.boundInstruments_.size(), 3);
EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv)]->get_ref(), 0);
EXPECT_EQ(alpha.boundInstruments_.size(), 1);

auto theta = alpha.GetRecords();
EXPECT_EQ(theta.size(), 3);
EXPECT_EQ(theta.size(), 1);
EXPECT_EQ(theta[0].GetName(), "test");
EXPECT_EQ(theta[0].GetDescription(), "none");
EXPECT_EQ(theta[0].GetLabels(), "{\"key2\":\"value2\",\"key3\":\"value3\"}");
EXPECT_EQ(theta[1].GetLabels(), "{\"key1\":\"value1\"}");
}

void CounterCallback(std::shared_ptr<Counter<int>> in,
Expand Down
Loading