diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h new file mode 100644 index 0000000000..9eccb86466 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -0,0 +1,272 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/aggregator/counter_aggregator.h" +#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h" +#include "opentelemetry/sdk/metrics/instrument.h" +#include "opentelemetry/version.h" + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +template +class ValueObserver : public AsynchronousInstrument, virtual public metrics_api::ValueObserver +{ + +public: + ValueObserver() = default; + + ValueObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)) + : AsynchronousInstrument(name, + description, + unit, + enabled, + callback, + metrics_api::InstrumentKind::ValueObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override + { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + sp1->update(value); + } + else + { + boundAggregators_[labelset]->update(value); + } + this->mu_.unlock(); + } + + /* + * Activate the instrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override + { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override + { + this->mu_.lock(); + std::vector ret; + for (auto x : boundAggregators_) + { + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + boundAggregators_.clear(); + this->mu_.unlock(); + return ret; + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; +}; + +template +class SumObserver : public AsynchronousInstrument, virtual public metrics_api::SumObserver +{ + +public: + SumObserver() = default; + + SumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)) + : AsynchronousInstrument(name, + description, + unit, + enabled, + callback, + metrics_api::InstrumentKind::SumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override + { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + if (value < 0) + { +#if __EXCEPTIONS + throw std::invalid_argument("Counter instrument updates must be non-negative."); +#else + std::terminate(); +#endif + } + else + { + sp1->update(value); + } + } + else + { + if (value < 0) + { +#if __EXCEPTIONS + throw std::invalid_argument("Counter instrument updates must be non-negative."); +#else + std::terminate(); +#endif + } + else + { + boundAggregators_[labelset]->update(value); + } + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override + { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override + { + this->mu_.lock(); + std::vector ret; + for (auto x : boundAggregators_) + { + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + boundAggregators_.clear(); + this->mu_.unlock(); + return ret; + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; +}; + +template +class UpDownSumObserver : public AsynchronousInstrument, + virtual public metrics_api::UpDownSumObserver +{ + +public: + UpDownSumObserver() = default; + + UpDownSumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)) + : AsynchronousInstrument(name, + description, + unit, + enabled, + callback, + metrics_api::InstrumentKind::UpDownSumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override + { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + sp1->update(value); + } + else + { + boundAggregators_[labelset]->update(value); + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override + { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override + { + this->mu_.lock(); + std::vector ret; + for (auto x : boundAggregators_) + { + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + boundAggregators_.clear(); + this->mu_.unlock(); + return ret; + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/instrument.h b/sdk/include/opentelemetry/sdk/metrics/instrument.h index 3a3e525714..1d06cbec4f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instrument.h +++ b/sdk/include/opentelemetry/sdk/metrics/instrument.h @@ -1,10 +1,5 @@ #pragma once -#include "opentelemetry/metrics/instrument.h" -#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" -#include "opentelemetry/sdk/metrics/record.h" -#include "opentelemetry/version.h" - #include #include #include @@ -12,6 +7,10 @@ #include #include #include +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" +#include "opentelemetry/sdk/metrics/record.h" +#include "opentelemetry/version.h" namespace metrics_api = opentelemetry::metrics; namespace trace_api = opentelemetry::trace; @@ -85,7 +84,12 @@ class BoundSynchronousInstrument : public Instrument, * @param none * @return void */ - virtual void unbind() override { ref_ -= 1; } + virtual void unbind() override + { + this->mu_.lock(); + ref_ -= 1; + this->mu_.unlock(); + } /** * Increments the reference count. This function is used when binding or instantiating. @@ -93,7 +97,12 @@ class BoundSynchronousInstrument : public Instrument, * @param none * @return void */ - virtual void inc_ref() override { ref_ += 1; } + virtual void inc_ref() override + { + this->mu_.lock(); + ref_ += 1; + this->mu_.unlock(); + } /** * Returns the current reference count of the instrument. This value is used to @@ -164,6 +173,7 @@ class SynchronousInstrument : public Instrument, return nostd::shared_ptr>(); } + // This function is necessary for batch recording and should NOT be called by the user virtual void update(T value, const trace::KeyValueIterable &labels) override = 0; /** @@ -177,6 +187,49 @@ class SynchronousInstrument : public Instrument, virtual std::vector GetRecords() = 0; }; +template +class AsynchronousInstrument : public Instrument, + virtual public metrics_api::AsynchronousInstrument +{ + +public: + AsynchronousInstrument() = default; + + AsynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult), + metrics_api::InstrumentKind kind) + : Instrument(name, description, unit, enabled, kind) + { + this->callback_ = callback; + } + + /** + * Captures data through a manual call rather than the automatic collection process instituted + * in the run function. Asynchronous instruments are generally expected to obtain data from + * their callbacks rather than direct calls. This function is used by the callback to store data. + * + * @param value is the numerical representation of the metric being captured + * @param labels is the numerical representation of the metric being captured + * @return none + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override = 0; + + virtual std::vector GetRecords() = 0; + + /** + * Captures data by activating the callback function associated with the + * instrument and storing its return value. Callbacks for asynchronous + * instruments are defined during construction. + * + * @param none + * @return none + */ + virtual void run() override = 0; +}; + // Helper functions for turning a trace::KeyValueIterable into a string inline void print_value(std::stringstream &ss, common::AttributeValue &value, diff --git a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h index d8b1f83d24..2c0931eeb2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h @@ -48,7 +48,6 @@ class BoundCounter final : public BoundSynchronousInstrument, public metrics_ */ virtual void add(T value) override { - this->mu_.lock(); if (value < 0) { #if __EXCEPTIONS @@ -61,7 +60,6 @@ class BoundCounter final : public BoundSynchronousInstrument, public metrics_ { this->update(value); } - this->mu_.unlock(); } }; @@ -94,18 +92,22 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count virtual nostd::shared_ptr> bindCounter( const trace::KeyValueIterable &labels) override { + this->mu_.lock(); std::string labelset = KvToString(labels); if (boundInstruments_.find(labelset) == boundInstruments_.end()) { auto sp1 = nostd::shared_ptr>( new BoundCounter(this->name_, this->description_, this->unit_, this->enabled_)); boundInstruments_[labelset] = sp1; + this->mu_.unlock(); return sp1; } else { boundInstruments_[labelset]->inc_ref(); - return boundInstruments_[labelset]; + auto ret = boundInstruments_[labelset]; + this->mu_.unlock(); + return ret; } } @@ -119,7 +121,6 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count */ virtual void add(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); if (value < 0) { #if __EXCEPTIONS @@ -134,11 +135,11 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count sp->update(value); sp->unbind(); } - this->mu_.unlock(); } virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; std::vector toDelete; for (const auto &x : boundInstruments_) @@ -155,6 +156,7 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count { boundInstruments_.erase(x); } + this->mu_.unlock(); return ret; } @@ -194,12 +196,7 @@ class BoundUpDownCounter final : public BoundSynchronousInstrument, * @param value the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - virtual void add(T value) override - { - this->mu_.lock(); - this->update(value); - this->mu_.unlock(); - } + virtual void add(T value) override { this->update(value); } }; template @@ -230,18 +227,22 @@ class UpDownCounter final : public SynchronousInstrument, public metrics_api: nostd::shared_ptr> bindUpDownCounter( const trace::KeyValueIterable &labels) override { + this->mu_.lock(); std::string labelset = KvToString(labels); if (boundInstruments_.find(labelset) == boundInstruments_.end()) { auto sp1 = nostd::shared_ptr>( new BoundUpDownCounter(this->name_, this->description_, this->unit_, this->enabled_)); boundInstruments_[labelset] = sp1; + this->mu_.unlock(); return sp1; } else { boundInstruments_[labelset]->inc_ref(); - return boundInstruments_[labelset]; + auto ret = boundInstruments_[labelset]; + this->mu_.unlock(); + return ret; } } @@ -255,15 +256,14 @@ class UpDownCounter final : public SynchronousInstrument, public metrics_api: */ void add(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); auto sp = bindUpDownCounter(labels); sp->update(value); sp->unbind(); - this->mu_.unlock(); } virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; std::vector toDelete; for (const auto &x : boundInstruments_) @@ -280,6 +280,7 @@ class UpDownCounter final : public SynchronousInstrument, public metrics_api: { boundInstruments_.erase(x); } + this->mu_.unlock(); return ret; } @@ -318,12 +319,7 @@ class BoundValueRecorder final : public BoundSynchronousInstrument, * @param value the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - void record(T value) - { - this->mu_.lock(); - this->update(value); - this->mu_.unlock(); - } + void record(T value) { this->update(value); } }; template @@ -354,18 +350,22 @@ class ValueRecorder final : public SynchronousInstrument, public metrics_api: nostd::shared_ptr> bindValueRecorder( const trace::KeyValueIterable &labels) override { + this->mu_.lock(); std::string labelset = KvToString(labels); if (boundInstruments_.find(labelset) == boundInstruments_.end()) { auto sp1 = nostd::shared_ptr>( new BoundValueRecorder(this->name_, this->description_, this->unit_, this->enabled_)); boundInstruments_[labelset] = sp1; + this->mu_.unlock(); return sp1; } else { boundInstruments_[labelset]->inc_ref(); - return boundInstruments_[labelset]; + auto ret = boundInstruments_[labelset]; + this->mu_.unlock(); + return ret; } } @@ -379,15 +379,14 @@ class ValueRecorder final : public SynchronousInstrument, public metrics_api: */ void record(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); auto sp = bindValueRecorder(labels); sp->update(value); sp->unbind(); - this->mu_.unlock(); } virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; std::vector toDelete; for (const auto &x : boundInstruments_) @@ -404,6 +403,7 @@ class ValueRecorder final : public SynchronousInstrument, public metrics_api: { boundInstruments_.erase(x); } + this->mu_.unlock(); return ret; } diff --git a/sdk/test/metrics/metric_instrument_test.cc b/sdk/test/metrics/metric_instrument_test.cc index f1bb23b73f..97fe53278c 100644 --- a/sdk/test/metrics/metric_instrument_test.cc +++ b/sdk/test/metrics/metric_instrument_test.cc @@ -1,12 +1,13 @@ -#include -#include "opentelemetry/sdk/metrics/sync_instruments.h" +#include #include #include #include #include #include #include +#include "opentelemetry/sdk/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/sync_instruments.h" namespace metrics_api = opentelemetry::metrics; @@ -16,6 +17,180 @@ namespace sdk namespace metrics { +void ObserverConstructorCallback(metrics_api::ObserverResult result) +{ + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + result.observe(1, labelkv); +} + +TEST(ApiSdkConversion, async) +{ + nostd::shared_ptr> alpha = + nostd::shared_ptr>( + new ValueObserver("ankit", "none", "unitles", true, &ObserverConstructorCallback)); + + std::map labels = {{"key587", "value264"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + alpha->observe(123456, labelkv); + EXPECT_EQ(dynamic_cast *>(alpha.get())->GetRecords()[0].GetLabels(), + "{\"key587\":\"value264\"}"); + + alpha->observe(123456, labelkv); + AggregatorVariant canCollect = + dynamic_cast *>(alpha.get())->GetRecords()[0].GetAggregator(); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); + EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); +} + +TEST(IntValueObserver, InstrumentFunctions) +{ + ValueObserver alpha("enabled", "no description", "unitless", true, + &ObserverConstructorCallback); + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + EXPECT_EQ(alpha.GetName(), "enabled"); + EXPECT_EQ(alpha.GetDescription(), "no description"); + EXPECT_EQ(alpha.GetUnits(), "unitless"); + EXPECT_EQ(alpha.IsEnabled(), true); + EXPECT_EQ(alpha.GetKind(), metrics_api::InstrumentKind::ValueObserver); + + alpha.run(); + EXPECT_EQ(alpha.boundAggregators_[KvToString(labelkv)]->get_values()[0], 1); // min +} + +void ObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(i, labels); + } +} + +void NegObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(-i, labels); + } +} + +TEST(IntValueObserver, StressObserve) +{ + std::shared_ptr> alpha(new ValueObserver( + "enabled", "no description", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first(ObserverCallback, alpha, 25, + labelkv); // spawn new threads that call the callback + std::thread second(ObserverCallback, alpha, 50, labelkv); + std::thread third(ObserverCallback, alpha, 25, labelkv1); + std::thread fourth(NegObserverCallback, alpha, 100, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 0); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[1], 49); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[2], 1525); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[3], 75); // count + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], -99); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[1], 24); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[2], -4650); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[3], 125); // count +} + +void SumObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(1, labels); + } +} + +TEST(IntSumObserver, StressObserve) +{ + std::shared_ptr> alpha( + new SumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first(SumObserverCallback, alpha, 100000, labelkv); + std::thread second(SumObserverCallback, alpha, 100000, labelkv); + std::thread third(SumObserverCallback, alpha, 300000, labelkv1); + + first.join(); + second.join(); + third.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 200000); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 300000); +} + +void UpDownSumObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(1, labels); + } +} + +void NegUpDownSumObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(-1, labels); + } +} + +TEST(IntUpDownObserver, StressAdd) +{ + std::shared_ptr> alpha( + new UpDownSumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first(UpDownSumObserverCallback, alpha, 123400, + labelkv); // spawn new threads that call the callback + std::thread second(UpDownSumObserverCallback, alpha, 123400, labelkv); + std::thread third(UpDownSumObserverCallback, alpha, 567800, labelkv1); + std::thread fourth(NegUpDownSumObserverCallback, alpha, 123400, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 123400 * 2); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 567800 - 123400); +} + TEST(Counter, InstrumentFunctions) { Counter alpha("enabled", "no description", "unitless", true); @@ -262,4 +437,5 @@ TEST(IntValueRecorder, StressRecord) } // namespace metrics } // namespace sdk + OPENTELEMETRY_END_NAMESPACE