Skip to content

Commit

Permalink
Add Sketch Aggregator (open-telemetry#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankit-bhargava authored Aug 4, 2020
1 parent d9035ef commit 6fd86dc
Show file tree
Hide file tree
Showing 4 changed files with 541 additions and 28 deletions.
275 changes: 275 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
#pragma once

#include <algorithm>
#include <cmath>
#include <limits>
#include <map>
#include <mutex>
#include <stdexcept>
#include <vector>
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

/** Sketch Aggregators implement the DDSketch data type. Note that data is compressed
* by the DDSketch algorithm and users should be informed about its behavior before
* selecting it as the aggregation type. NOTE: The current implementation can only support
* non-negative values.
*
* Detailed information about the algorithm can be found in the following paper
* published by Datadog: http://www.vldb.org/pvldb/vol12/p2195-masson.pdf
*/

template <class T>
class SketchAggregator final : public Aggregator<T>
{

public:
/**
* Given the distribution of data this aggregator is designed for and its usage, the raw updates
*are stored in a map rather than a vector.
*
*@param kind, the instrument kind creating this aggregator
*@param error_bound, what is referred to as "alpha" in the DDSketch algorithm
*@param max_buckets, the maximum number of indices in the raw value map
*/
SketchAggregator(metrics_api::InstrumentKind kind, double error_bound, size_t max_buckets = 2048)
{

this->kind_ = kind;
this->agg_kind_ = AggregatorKind::Sketch;
this->values_ = std::vector<T>(2, 0); // Sum in [0], Count in [1]
this->checkpoint_ = std::vector<T>(2, 0);
max_buckets_ = max_buckets;
error_bound_ = error_bound;
gamma = (1 + error_bound) / (1 - error_bound);
}

/**
* Update the aggregator with the new value. For a DDSketch aggregator, if the addition of this
* value creates a new bucket which is in excess of the maximum allowed size, the lowest indexes
* buckets are merged.
*
* @param val, the raw value used in aggregation
* @return none
*/
void update(T val) override
{
this->mu_.lock();
int idx;
if (val == 0)
{
idx = std::numeric_limits<int>::min();
}
else
{
idx = ceil(log(val) / log(gamma));
}
if (raw_.find(idx) != raw_.end())
{
raw_[idx] += 1;
}
else
{
raw_[idx] = 1;
}
this->values_[1] += 1;
this->values_[0] += val;
if (raw_.size() > max_buckets_)
{
int minidx = raw_.begin()->first, minidxval = raw_.begin()->second;
raw_.erase(minidx);
raw_[raw_.begin()->first] += minidxval;
}
this->mu_.unlock();
}

/**
* Calculate and return the value of a user specified quantile.
*
* @param q, the quantile to calculate (for example 0.5 is equivelant to the 50th percentile)
*/
virtual T get_quantiles(double q) override
{
if (q < 0 || q > 1)
{
#if __EXCEPTIONS
throw std::invalid_argument("Quantile values must fall between 0 and 1");
#else
std::terminate();
#endif
}
auto iter = checkpoint_raw_.begin();
int idx = iter->first;
int count = iter->second;

while (count < (q * (this->checkpoint_[1] - 1)) && iter != checkpoint_raw_.end())
{
iter++;
idx = iter->first;
count += iter->second;
}
return round(2 * pow(gamma, idx) / (gamma + 1));
}

/**
* Checkpoints the current value. This function will overwrite the current checkpoint with the
* current value.
*
* @param none
* @return none
*/
void checkpoint() override
{
this->mu_.lock();
this->checkpoint_ = this->values_;
checkpoint_raw_ = raw_;
this->values_[0] = 0;
this->values_[1] = 0;
raw_.clear();
this->mu_.unlock();
}

/**
* Merges this sketch aggregator with another. The same bucket compression used when
* updating values is employed here to manage bucket size if the merging of aggregators
* results in more buckets than allowed.
*
* @param other, the aggregator with merge with
* @return none
*/
void merge(SketchAggregator other)
{
this->mu_.lock();
if (gamma != other.gamma)
{
#if __EXCEPTIONS
throw std::invalid_argument("Aggregators must have identical error tolerance");
#else
std::terminate();
#endif
}
else if (max_buckets_ != other.max_buckets_)
{
#if __EXCEPTIONS
throw std::invalid_argument("Aggregators must have the same maximum bucket allowance");
#else
std::terminate();
#endif
}

this->values_[0] += other.values_[0];
this->values_[1] += other.values_[1];
this->checkpoint_[0] += other.checkpoint_[0];
this->checkpoint_[1] += other.checkpoint_[1];
auto other_iter = other.raw_.begin();
while (other_iter != other.raw_.end())
{
raw_[other_iter->first] += other_iter->second;
if (raw_.size() > max_buckets_)
{
int minidx = raw_.begin()->first, minidxval = raw_.begin()->second;
raw_.erase(minidx);
raw_[raw_.begin()->first] += minidxval;
}
other_iter++;
}
auto other_ckpt_iter = other.checkpoint_raw_.begin();
while (other_ckpt_iter != other.checkpoint_raw_.end())
{
checkpoint_raw_[other_ckpt_iter->first] += other_ckpt_iter->second;
if (checkpoint_raw_.size() > max_buckets_)
{
int minidx = checkpoint_raw_.begin()->first, minidxval = checkpoint_raw_.begin()->second;
checkpoint_raw_.erase(minidx);
checkpoint_raw_[checkpoint_raw_.begin()->first] += minidxval;
}
other_ckpt_iter++;
}
this->mu_.unlock();
}

/**
* Returns the checkpointed value
*
* @param none
* @return the value of the checkpoint
*/
std::vector<T> get_checkpoint() override { return this->checkpoint_; }

/**
* Returns the current values
*
* @param none
* @return the present aggregator values
*/
std::vector<T> get_values() override { return this->values_; }

/**
* Returns the indices (or values) stored by this sketch aggregator.
*
* @param none
* @return a vector of all values the aggregator is currently tracking
*/
virtual std::vector<double> get_boundaries() override
{
std::vector<double> ret;
for (auto const &x : checkpoint_raw_)
{
ret.push_back(2 * pow(gamma, x.first) / (gamma + 1));
}
return ret;
}

/**
* Returns the error bound
*
* @param none
* @return the error bound specified during construction
*/
virtual double get_error_bound() override { return error_bound_; }

/**
* Returns the maximum allowed buckets
*
* @param none
* @return the maximum allowed buckets
*/
virtual size_t get_max_buckets() override { return max_buckets_; }

/**
* Returns the count of each value tracked by this sketch aggregator. These are returned
* in the same order as the indices returned by the get_boundaries function.
*
* @param none
* @return a vector of all counts for values tracked by the aggregator
*/
virtual std::vector<int> get_counts() override
{
std::vector<int> ret;
for (auto const &x : checkpoint_raw_)
{
ret.push_back(x.second);
}
return ret;
}

private:
double gamma;
double error_bound_;
size_t max_buckets_;
std::map<int, int> raw_;
std::map<int, int> checkpoint_raw_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
11 changes: 11 additions & 0 deletions sdk/test/metrics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,14 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "sketch_aggregator_test",
srcs = [
"sketch_aggregator_test.cc",
],
deps = [
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)
34 changes: 6 additions & 28 deletions sdk/test/metrics/metric_instrument_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,20 @@ TEST(Counter, getAggsandnewupdate)
{
Counter<int> alpha("test", "none", "unitless", true);

std::map<std::string, std::string> labels = {{"key", "value"}};
std::map<std::string, std::string> labels1 = {{"key1", "value1"}};

// labels 2 and 3 are actually the same
std::map<std::string, std::string> labels2 = {{"key2", "value2"}, {"key3", "value3"}};
std::map<std::string, std::string> labels3 = {{"key3", "value3"}, {"key2", "value2"}};

auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};
auto labelkv1 = trace::KeyValueIterableView<decltype(labels1)>{labels1};
auto labelkv2 = trace::KeyValueIterableView<decltype(labels2)>{labels2};
auto labelkv3 = trace::KeyValueIterableView<decltype(labels3)>{labels3};
std::map<std::string, std::string> labels = {{"key3", "value3"}, {"key2", "value2"}};

auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};
auto beta = alpha.bindCounter(labelkv);
auto gamma = alpha.bindCounter(labelkv1);
auto delta = alpha.bindCounter(labelkv1);
auto epsilon = alpha.bindCounter(labelkv1);
auto zeta = alpha.bindCounter(labelkv2);
auto eta = alpha.bindCounter(labelkv3);

EXPECT_EQ(beta->get_ref(), 1);
EXPECT_EQ(gamma->get_ref(), 3);
EXPECT_EQ(eta->get_ref(), 2);
beta->unbind();

delta->unbind();
gamma->unbind();
epsilon->unbind();

EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv1)]->get_ref(), 0);
EXPECT_EQ(alpha.boundInstruments_.size(), 3);
EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv)]->get_ref(), 0);
EXPECT_EQ(alpha.boundInstruments_.size(), 1);

auto theta = alpha.GetRecords();
EXPECT_EQ(theta.size(), 3);
EXPECT_EQ(theta.size(), 1);
EXPECT_EQ(theta[0].GetName(), "test");
EXPECT_EQ(theta[0].GetDescription(), "none");
EXPECT_EQ(theta[0].GetLabels(), "{\"key2\":\"value2\",\"key3\":\"value3\"}");
EXPECT_EQ(theta[1].GetLabels(), "{\"key1\":\"value1\"}");
}

void CounterCallback(std::shared_ptr<Counter<int>> in,
Expand Down
Loading

0 comments on commit 6fd86dc

Please sign in to comment.