Skip to content

Commit

Permalink
Add Counter and Histogram Aggregators (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankit-bhargava authored Jul 30, 2020
1 parent 4e2aeee commit 41af3ac
Show file tree
Hide file tree
Showing 6 changed files with 743 additions and 0 deletions.
141 changes: 141 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/aggregator/aggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#pragma once

#include <mutex>
#include <vector>
#include "opentelemetry/core/timestamp.h"
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

enum class AggregatorKind
{
Counter = 0,
MinMaxSumCount = 1,
Gauge = 2,
Sketch = 3,
Histogram = 4,
Exact = 5,
};

/*
* Performs calculations necessary to combine updates from instruments into an
* insightful value.
* Also stores current instrument values and checkpoints collected at intervals
* governing the entire pipeline.
*/
template <typename T>
class Aggregator
{
public:
Aggregator() = default;

virtual ~Aggregator() = default;

/**
* Receives a captured value from the instrument and applies it to the current aggregator value.
*
* @param val, the raw value used in aggregation
* @return none
*/
virtual void update(T val) = 0;

/**
* Checkpoints the current value. This function will overwrite the current checkpoint with the
* current value.
*
* @param none
* @return none
*/
virtual void checkpoint() = 0;

/**
* Merges the values of two aggregators in a semantically accurate manner.
* Merging will occur differently for different aggregators depending on the
* way values are tracked.
*
* @param other, the aggregator with merge with
* @return none
*/
void merge(Aggregator *other);

/**
* Returns the checkpointed value
*
* @param none
* @return the value of the checkpoint
*/
virtual std::vector<T> get_checkpoint() = 0;

/**
* Returns the current value
*
* @param none
* @return the present aggregator value
*/
virtual std::vector<T> get_values() = 0;

/**
* Returns the instrument kind which this aggregator is associated with
*
* @param none
* @return the InstrumentKind of the aggregator's owner
*/
virtual opentelemetry::metrics::InstrumentKind get_instrument_kind() final { return kind_; }

/**
* Returns the type of this aggregator
*
* @param none
* @return the AggregatorKind of this instrument
*/
virtual AggregatorKind get_aggregator_kind() final { return agg_kind_; }

// virtual function to be overriden for the Histogram Aggregator
virtual std::vector<double> get_boundaries() { return std::vector<double>(); }

// virtual function to be overriden for the Histogram Aggregator
virtual std::vector<int> get_counts() { return std::vector<int>(); }

// virtual function to be overriden for Exact and Sketch Aggregators
virtual bool get_quant_estimation() { return false; }

// virtual function to be overriden for Exact and Sketch Aggregators
virtual T get_quantiles(double q) { return values_[0]; }

// virtual function to be overriden for Sketch Aggregator
virtual double get_error_bound() { return 0; }

// virtual function to be overriden for Sketch Aggregator
virtual size_t get_max_buckets() { return 0; }

// virtual function to be overriden for Gauge Aggregator
virtual core::SystemTimestamp get_checkpoint_timestamp() { return core::SystemTimestamp(); }

// Custom copy constructor to handle the mutex
Aggregator(const Aggregator &cp)
{
values_ = cp.values_;
checkpoint_ = cp.checkpoint_;
kind_ = cp.kind_;
agg_kind_ = cp.agg_kind_;
// use default initialized mutex as they cannot be copied
}

protected:
std::vector<T> values_;
std::vector<T> checkpoint_;
opentelemetry::metrics::InstrumentKind kind_;
std::mutex mu_;
AggregatorKind agg_kind_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
101 changes: 101 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/aggregator/counter_aggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#pragma once

#include <mutex>
#include <vector>
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

template <class T>
class CounterAggregator final : public Aggregator<T>
{

public:
CounterAggregator(metrics_api::InstrumentKind kind)
{
this->kind_ = kind;
this->values_ = std::vector<T>(1, 0);
this->checkpoint_ = std::vector<T>(1, 0);
this->agg_kind_ = AggregatorKind::Counter;
}

/**
* Recieves a captured value from the instrument and applies it to the current aggregator value.
*
* @param val, the raw value used in aggregation
* @return none
*/
void update(T val) override
{
this->mu_.lock();
this->values_[0] += val; // atomic operation
this->mu_.unlock();
}

/**
* Checkpoints the current value. This function will overwrite the current checkpoint with the
* current value.
*
* @param none
* @return none
*/
void checkpoint() override
{
this->checkpoint_ = this->values_;
this->values_[0] = 0;
}

/**
* Merges the values of two aggregators in a semantically accurate manner.
* In this case, merging only requires the the current values of the two aggregators be summed.
*
* @param other, the aggregator with merge with
* @return none
*/
void merge(CounterAggregator other)
{
if (this->agg_kind_ == other.agg_kind_)
{
this->mu_.lock();
this->values_[0] += other.values_[0];
this->checkpoint_[0] += other.checkpoint_[0];
this->mu_.unlock();
}
else
{
#if __EXCEPTIONS
throw std::invalid_argument("Aggregators of different types cannot be merged.");
#else
std::terminate();
#endif
}
}

/**
* Returns the checkpointed value
*
* @param none
* @return the value of the checkpoint
*/
virtual std::vector<T> get_checkpoint() override { return this->checkpoint_; }

/**
* Returns the current values
*
* @param none
* @return the present aggregator values
*/
virtual std::vector<T> get_values() override { return this->values_; }
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Loading

0 comments on commit 41af3ac

Please sign in to comment.