diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/counter_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/counter_aggregator.h index 0bf44f64214..ebd0201ef0f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregator/counter_aggregator.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/counter_aggregator.h @@ -49,8 +49,10 @@ class CounterAggregator final : public Aggregator */ void checkpoint() override { + this->mu_.lock(); this->checkpoint_ = this->values_; this->values_[0] = 0; + this->mu_.unlock(); } /** diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/histogram_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/histogram_aggregator.h index b2ab83bf224..9d5aafa1277 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregator/histogram_aggregator.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/histogram_aggregator.h @@ -62,6 +62,7 @@ class HistogramAggregator final : public Aggregator */ void update(T val) override { + this->mu_.lock(); int bucketID = boundaries_.size(); for (size_t i = 0; i < boundaries_.size(); i++) { @@ -76,7 +77,6 @@ class HistogramAggregator final : public Aggregator // auto pos = std::lower_bound (boundaries_.begin(), boundaries_.end(), val); // bucketCounts_[pos-boundaries_.begin()] += 1; - this->mu_.lock(); this->values_[0] += val; this->values_[1] += 1; bucketCounts_[bucketID] += 1; @@ -92,11 +92,13 @@ class HistogramAggregator final : public Aggregator */ void checkpoint() override { + this->mu_.lock(); this->checkpoint_ = this->values_; this->values_[0] = 0; this->values_[1] = 0; bucketCounts_ckpt_ = bucketCounts_; std::fill(bucketCounts_.begin(), bucketCounts_.end(), 0); + this->mu_.unlock(); } /** diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h b/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h index 25f178c9c00..fe0b4737ec8 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregator/sketch_aggregator.h @@ -148,7 +148,6 @@ class SketchAggregator final : public Aggregator */ void merge(SketchAggregator other) { - this->mu_.lock(); if (gamma != other.gamma) { #if __EXCEPTIONS @@ -166,6 +165,7 @@ class SketchAggregator final : public Aggregator #endif } + this->mu_.lock(); this->values_[0] += other.values_[0]; this->values_[1] += other.values_[1]; this->checkpoint_[0] += other.checkpoint_[0]; diff --git a/sdk/include/opentelemetry/sdk/metrics/controller.h b/sdk/include/opentelemetry/sdk/metrics/controller.h new file mode 100644 index 00000000000..0bb789e91cc --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/controller.h @@ -0,0 +1,150 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "opentelemetry/exporters/ostream/metrics_exporter.h" +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/nostd/unique_ptr.h" +#include "opentelemetry/sdk/metrics/exporter.h" +#include "opentelemetry/sdk/metrics/meter.h" +#include "opentelemetry/sdk/metrics/processor.h" +#include "opentelemetry/sdk/metrics/record.h" +#include "opentelemetry/version.h" + +namespace metrics_api = opentelemetry::metrics; +namespace trace_api = opentelemetry::trace; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +class PushController +{ + +public: + PushController(nostd::shared_ptr meter, + nostd::unique_ptr exporter, + nostd::shared_ptr processor, + double period, + int timeout = 30) + { + meter_ = meter; + exporter_ = std::move(exporter); + processor_ = processor; + timeout_ = (unsigned int)(timeout * 1000000); // convert seconds to microseconds + period_ = (unsigned int)(period * 1000000); + } + + /* + * Used to check if the metrics pipeline is currecntly active + * + * @param none + * @return true when active, false when on standby + */ + bool isActive() { return active_.load(); } + + /* + * Begins the data processing and export pipeline. The function first ensures that the pipeline + * is not already running. If not, it begins and detaches a new thread for the Controller's run + * function which periodically polls the instruments for their data. + * + * @param none + * @return a boolean which is true when the pipeline is successfully started and false when + * already active + */ + bool start() + { + if (!active_.load()) + { + active_ = true; + std::thread runner(&PushController::run, this); + runner.detach(); + return true; + } + return false; + } + + /* + * Ends the processing and export pipeline then exports metrics one last time + * before returning. + * + * @param none + * @return none + */ + void stop() + { + if (active_.load()) + { + active_ = false; + while (running_.load()) + { + std::this_thread::sleep_for( + std::chrono::microseconds(period_ / 100)); // wait until the runner thread concludes + } + tick(); // flush metrics sitting in the processor + } + } + +private: + /* + * Run the tick function at a regular interval. This function + * should be run in its own thread. + * + * Used to wait between collection intervals. + */ + void run() + { + if (!running_.load()) + { + running_ = true; + while (active_.load()) + { + tick(); + std::this_thread::sleep_for(std::chrono::microseconds(period_)); + } + running_ = false; + ; + } + } + + /* + * Tick + * + * Called at regular intervals, this function collects all values from the + * member variable meter_, then sends them to the processor_ for + * processing. After the records have been processed they are sent to the + * exporter_ to be exported. + * + */ + void tick() + { + this->mu_.lock(); + std::vector collected = dynamic_cast(meter_.get())->Collect(); + for (const auto &rec : collected) + { + processor_->process(rec); + } + collected = processor_->CheckpointSelf(); + processor_->FinishedCollection(); + exporter_->Export(collected); + this->mu_.unlock(); + } + + nostd::shared_ptr meter_; + nostd::unique_ptr exporter_; + nostd::shared_ptr processor_; + std::mutex mu_; + std::atomic active_ = ATOMIC_VAR_INIT(false); + std::atomic running_ = ATOMIC_VAR_INIT(false); + unsigned int period_; + unsigned int timeout_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/instrument.h b/sdk/include/opentelemetry/sdk/metrics/instrument.h index 1d06cbec4f7..85de5ee701c 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instrument.h +++ b/sdk/include/opentelemetry/sdk/metrics/instrument.h @@ -111,7 +111,13 @@ class BoundSynchronousInstrument : public Instrument, * @param none * @return current ref count of the instrument */ - virtual int get_ref() override { return ref_; } + virtual int get_ref() override + { + this->mu_.lock(); + auto ret = ref_; + this->mu_.unlock(); + return ret; + } /** * Records a single synchronous metric event via a call to the aggregator. diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index ee6638a0301..cebe041affe 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -1,3 +1,15 @@ +cc_test( + name = "controller_test", + srcs = [ + "controller_test.cc", + ], + deps = [ + "//exporters/ostream:ostream_metrics_exporter", + "//sdk/src/metrics", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "gauge_aggregator_test", srcs = [ diff --git a/sdk/test/metrics/controller_test.cc b/sdk/test/metrics/controller_test.cc new file mode 100644 index 00000000000..ab1878c9a20 --- /dev/null +++ b/sdk/test/metrics/controller_test.cc @@ -0,0 +1,45 @@ +#include "opentelemetry/sdk/metrics/controller.h" +#include "opentelemetry/sdk/metrics/meter.h" +#include "opentelemetry/sdk/metrics/ungrouped_processor.h" + +#include +#include +#include +// #include + +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +TEST(Controller, Constructor) +{ + + std::shared_ptr meter = + std::shared_ptr(new Meter("Test")); + PushController alpha(meter, + std::unique_ptr( + new opentelemetry::exporter::metrics::OStreamMetricsExporter), + std::shared_ptr( + new opentelemetry::sdk::metrics::UngroupedMetricsProcessor(false)), + .05); + + auto instr = meter->NewIntCounter("test", "none", "none", true); + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + alpha.start(); + + for (int i = 0; i < 20; i++) + { + instr->add(i, labelkv); + } + alpha.stop(); +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE