Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Asynchronous Metric Instruments SDK #191

Merged
merged 10 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 272 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/async_instruments.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
#pragma once

#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>
#include "opentelemetry/metrics/async_instruments.h"
#include "opentelemetry/sdk/metrics/aggregator/counter_aggregator.h"
#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h"
#include "opentelemetry/sdk/metrics/instrument.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

template <class T>
class ValueObserver : public AsynchronousInstrument<T>, virtual public metrics_api::ValueObserver<T>
{

public:
ValueObserver() = default;

ValueObserver(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>))
: AsynchronousInstrument<T>(name,
description,
unit,
enabled,
callback,
metrics_api::InstrumentKind::ValueObserver)
{}

/*
* Updates the instruments aggregator with the new value. The labels should
* contain the keys and values to be associated with this value.
*
* @param value is the numerical representation of the metric being captured
* @param labels the set of labels, as key-value pairs
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override
{
this->mu_.lock();
std::string labelset = KvToString(labels);
if (boundAggregators_.find(labelset) == boundAggregators_.end())
{
auto sp1 = std::shared_ptr<Aggregator<T>>(new MinMaxSumCountAggregator<T>(this->kind_));
boundAggregators_.insert(std::make_pair(labelset, sp1));
sp1->update(value);
}
else
{
boundAggregators_[labelset]->update(value);
}
this->mu_.unlock();
}

/*
* Activate the instrument's callback function to record a measurement. This
* function will be called by the specified controller at a regular interval.
*
* @param none
* @return none
*/
virtual void run() override
{
metrics_api::ObserverResult<T> res(this);
this->callback_(res);
}

virtual std::vector<Record> GetRecords() override
{
this->mu_.lock();
std::vector<Record> ret;
for (auto x : boundAggregators_)
{
x.second->checkpoint();
ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second));
}
boundAggregators_.clear();
this->mu_.unlock();
return ret;
}

// Public mapping from labels (stored as strings) to their respective aggregators
std::unordered_map<std::string, std::shared_ptr<Aggregator<T>>> boundAggregators_;
};

template <class T>
class SumObserver : public AsynchronousInstrument<T>, virtual public metrics_api::SumObserver<T>
{

public:
SumObserver() = default;

SumObserver(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>))
: AsynchronousInstrument<T>(name,
description,
unit,
enabled,
callback,
metrics_api::InstrumentKind::SumObserver)
{}

/*
* Updates the instruments aggregator with the new value. The labels should
* contain the keys and values to be associated with this value.
*
* @param value is the numerical representation of the metric being captured
* @param labels the set of labels, as key-value pairs
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override
{
this->mu_.lock();
std::string labelset = KvToString(labels);
if (boundAggregators_.find(labelset) == boundAggregators_.end())
{
auto sp1 = std::shared_ptr<Aggregator<T>>(new CounterAggregator<T>(this->kind_));
boundAggregators_.insert(std::make_pair(labelset, sp1));
if (value < 0)
{
#if __EXCEPTIONS
throw std::invalid_argument("Counter instrument updates must be non-negative.");
#else
std::terminate();
#endif
}
else
{
sp1->update(value);
}
}
else
{
if (value < 0)
{
#if __EXCEPTIONS
throw std::invalid_argument("Counter instrument updates must be non-negative.");
#else
std::terminate();
#endif
}
else
{
boundAggregators_[labelset]->update(value);
}
}
this->mu_.unlock();
}

/*
* Activate the intsrument's callback function to record a measurement. This
* function will be called by the specified controller at a regular interval.
*
* @param none
* @return none
*/
virtual void run() override
{
metrics_api::ObserverResult<T> res(this);
this->callback_(res);
}

virtual std::vector<Record> GetRecords() override
{
this->mu_.lock();
std::vector<Record> ret;
for (auto x : boundAggregators_)
{
x.second->checkpoint();
ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second));
}
boundAggregators_.clear();
this->mu_.unlock();
return ret;
}

// Public mapping from labels (stored as strings) to their respective aggregators
std::unordered_map<std::string, std::shared_ptr<Aggregator<T>>> boundAggregators_;
};

template <class T>
class UpDownSumObserver : public AsynchronousInstrument<T>,
virtual public metrics_api::UpDownSumObserver<T>
{

public:
UpDownSumObserver() = default;

UpDownSumObserver(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>))
: AsynchronousInstrument<T>(name,
description,
unit,
enabled,
callback,
metrics_api::InstrumentKind::UpDownSumObserver)
{}

/*
* Updates the instruments aggregator with the new value. The labels should
* contain the keys and values to be associated with this value.
*
* @param value is the numerical representation of the metric being captured
* @param labels the set of labels, as key-value pairs
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override
{
this->mu_.lock();
std::string labelset = KvToString(labels);
if (boundAggregators_.find(labelset) == boundAggregators_.end())
{
auto sp1 = std::shared_ptr<Aggregator<T>>(new CounterAggregator<T>(this->kind_));
boundAggregators_.insert(std::make_pair(labelset, sp1));
sp1->update(value);
}
else
{
boundAggregators_[labelset]->update(value);
}
this->mu_.unlock();
}

/*
* Activate the intsrument's callback function to record a measurement. This
* function will be called by the specified controller at a regular interval.
*
* @param none
* @return none
*/
virtual void run() override
{
metrics_api::ObserverResult<T> res(this);
this->callback_(res);
}

virtual std::vector<Record> GetRecords() override
{
this->mu_.lock();
std::vector<Record> ret;
for (auto x : boundAggregators_)
{
x.second->checkpoint();
ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second));
}
boundAggregators_.clear();
this->mu_.unlock();
return ret;
}

// Public mapping from labels (stored as strings) to their respective aggregators
std::unordered_map<std::string, std::shared_ptr<Aggregator<T>>> boundAggregators_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
67 changes: 60 additions & 7 deletions sdk/include/opentelemetry/sdk/metrics/instrument.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#pragma once

#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/sdk/metrics/record.h"
#include "opentelemetry/version.h"

#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/sdk/metrics/aggregator/aggregator.h"
#include "opentelemetry/sdk/metrics/record.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;
namespace trace_api = opentelemetry::trace;
Expand Down Expand Up @@ -85,15 +84,25 @@ class BoundSynchronousInstrument : public Instrument,
* @param none
* @return void
*/
virtual void unbind() override { ref_ -= 1; }
virtual void unbind() override
{
this->mu_.lock();
ref_ -= 1;
this->mu_.unlock();
}

/**
* Increments the reference count. This function is used when binding or instantiating.
*
* @param none
* @return void
*/
virtual void inc_ref() override { ref_ += 1; }
virtual void inc_ref() override
{
this->mu_.lock();
ref_ += 1;
this->mu_.unlock();
}

/**
* Returns the current reference count of the instrument. This value is used to
Expand Down Expand Up @@ -164,6 +173,7 @@ class SynchronousInstrument : public Instrument,
return nostd::shared_ptr<BoundSynchronousInstrument<T>>();
}

// This function is necessary for batch recording and should NOT be called by the user
virtual void update(T value, const trace::KeyValueIterable &labels) override = 0;

/**
Expand All @@ -177,6 +187,49 @@ class SynchronousInstrument : public Instrument,
virtual std::vector<Record> GetRecords() = 0;
};

template <class T>
class AsynchronousInstrument : public Instrument,
virtual public metrics_api::AsynchronousInstrument<T>
{

public:
AsynchronousInstrument() = default;

AsynchronousInstrument(nostd::string_view name,
nostd::string_view description,
nostd::string_view unit,
bool enabled,
void (*callback)(metrics_api::ObserverResult<T>),
metrics_api::InstrumentKind kind)
: Instrument(name, description, unit, enabled, kind)
{
this->callback_ = callback;
}

/**
* Captures data through a manual call rather than the automatic collection process instituted
* in the run function. Asynchronous instruments are generally expected to obtain data from
* their callbacks rather than direct calls. This function is used by the callback to store data.
*
* @param value is the numerical representation of the metric being captured
* @param labels is the numerical representation of the metric being captured
* @return none
*/
virtual void observe(T value, const trace::KeyValueIterable &labels) override = 0;

virtual std::vector<Record> GetRecords() = 0;

/**
* Captures data by activating the callback function associated with the
* instrument and storing its return value. Callbacks for asynchronous
* instruments are defined during construction.
*
* @param none
* @return none
*/
virtual void run() override = 0;
};

// Helper functions for turning a trace::KeyValueIterable into a string
inline void print_value(std::stringstream &ss,
common::AttributeValue &value,
Expand Down
Loading