Skip to content

Commit

Permalink
Add Metrics Controller (open-telemetry#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankit-bhargava authored Aug 6, 2020
1 parent 6f6978d commit b4d5234
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ class CounterAggregator final : public Aggregator<T>
*/
void checkpoint() override
{
this->mu_.lock();
this->checkpoint_ = this->values_;
this->values_[0] = 0;
this->mu_.unlock();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class HistogramAggregator final : public Aggregator<T>
*/
void update(T val) override
{
this->mu_.lock();
int bucketID = boundaries_.size();
for (size_t i = 0; i < boundaries_.size(); i++)
{
Expand All @@ -76,7 +77,6 @@ class HistogramAggregator final : public Aggregator<T>
// auto pos = std::lower_bound (boundaries_.begin(), boundaries_.end(), val);
// bucketCounts_[pos-boundaries_.begin()] += 1;

this->mu_.lock();
this->values_[0] += val;
this->values_[1] += 1;
bucketCounts_[bucketID] += 1;
Expand All @@ -92,11 +92,13 @@ class HistogramAggregator final : public Aggregator<T>
*/
void checkpoint() override
{
this->mu_.lock();
this->checkpoint_ = this->values_;
this->values_[0] = 0;
this->values_[1] = 0;
bucketCounts_ckpt_ = bucketCounts_;
std::fill(bucketCounts_.begin(), bucketCounts_.end(), 0);
this->mu_.unlock();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ class SketchAggregator final : public Aggregator<T>
*/
void merge(SketchAggregator other)
{
this->mu_.lock();
if (gamma != other.gamma)
{
#if __EXCEPTIONS
Expand All @@ -166,6 +165,7 @@ class SketchAggregator final : public Aggregator<T>
#endif
}

this->mu_.lock();
this->values_[0] += other.values_[0];
this->values_[1] += other.values_[1];
this->checkpoint_[0] += other.checkpoint_[0];
Expand Down
150 changes: 150 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/controller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#pragma once

#include <atomic>
#include <iostream>
#include <sstream>
#include <thread>
#include <vector>
#include "opentelemetry/exporters/ostream/metrics_exporter.h"
#include "opentelemetry/metrics/instrument.h"
#include "opentelemetry/nostd/unique_ptr.h"
#include "opentelemetry/sdk/metrics/exporter.h"
#include "opentelemetry/sdk/metrics/meter.h"
#include "opentelemetry/sdk/metrics/processor.h"
#include "opentelemetry/sdk/metrics/record.h"
#include "opentelemetry/version.h"

namespace metrics_api = opentelemetry::metrics;
namespace trace_api = opentelemetry::trace;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

class PushController
{

public:
PushController(nostd::shared_ptr<metrics_api::Meter> meter,
nostd::unique_ptr<MetricsExporter> exporter,
nostd::shared_ptr<MetricsProcessor> processor,
double period,
int timeout = 30)
{
meter_ = meter;
exporter_ = std::move(exporter);
processor_ = processor;
timeout_ = (unsigned int)(timeout * 1000000); // convert seconds to microseconds
period_ = (unsigned int)(period * 1000000);
}

/*
* Used to check if the metrics pipeline is currecntly active
*
* @param none
* @return true when active, false when on standby
*/
bool isActive() { return active_.load(); }

/*
* Begins the data processing and export pipeline. The function first ensures that the pipeline
* is not already running. If not, it begins and detaches a new thread for the Controller's run
* function which periodically polls the instruments for their data.
*
* @param none
* @return a boolean which is true when the pipeline is successfully started and false when
* already active
*/
bool start()
{
if (!active_.load())
{
active_ = true;
std::thread runner(&PushController::run, this);
runner.detach();
return true;
}
return false;
}

/*
* Ends the processing and export pipeline then exports metrics one last time
* before returning.
*
* @param none
* @return none
*/
void stop()
{
if (active_.load())
{
active_ = false;
while (running_.load())
{
std::this_thread::sleep_for(
std::chrono::microseconds(period_ / 100)); // wait until the runner thread concludes
}
tick(); // flush metrics sitting in the processor
}
}

private:
/*
* Run the tick function at a regular interval. This function
* should be run in its own thread.
*
* Used to wait between collection intervals.
*/
void run()
{
if (!running_.load())
{
running_ = true;
while (active_.load())
{
tick();
std::this_thread::sleep_for(std::chrono::microseconds(period_));
}
running_ = false;
;
}
}

/*
* Tick
*
* Called at regular intervals, this function collects all values from the
* member variable meter_, then sends them to the processor_ for
* processing. After the records have been processed they are sent to the
* exporter_ to be exported.
*
*/
void tick()
{
this->mu_.lock();
std::vector<Record> collected = dynamic_cast<Meter *>(meter_.get())->Collect();
for (const auto &rec : collected)
{
processor_->process(rec);
}
collected = processor_->CheckpointSelf();
processor_->FinishedCollection();
exporter_->Export(collected);
this->mu_.unlock();
}

nostd::shared_ptr<metrics_api::Meter> meter_;
nostd::unique_ptr<MetricsExporter> exporter_;
nostd::shared_ptr<MetricsProcessor> processor_;
std::mutex mu_;
std::atomic<bool> active_ = ATOMIC_VAR_INIT(false);
std::atomic<bool> running_ = ATOMIC_VAR_INIT(false);
unsigned int period_;
unsigned int timeout_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
8 changes: 7 additions & 1 deletion sdk/include/opentelemetry/sdk/metrics/instrument.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ class BoundSynchronousInstrument : public Instrument,
* @param none
* @return current ref count of the instrument
*/
virtual int get_ref() override { return ref_; }
virtual int get_ref() override
{
this->mu_.lock();
auto ret = ref_;
this->mu_.unlock();
return ret;
}

/**
* Records a single synchronous metric event via a call to the aggregator.
Expand Down
12 changes: 12 additions & 0 deletions sdk/test/metrics/BUILD
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
cc_test(
name = "controller_test",
srcs = [
"controller_test.cc",
],
deps = [
"//exporters/ostream:ostream_metrics_exporter",
"//sdk/src/metrics",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "gauge_aggregator_test",
srcs = [
Expand Down
45 changes: 45 additions & 0 deletions sdk/test/metrics/controller_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "opentelemetry/sdk/metrics/controller.h"
#include "opentelemetry/sdk/metrics/meter.h"
#include "opentelemetry/sdk/metrics/ungrouped_processor.h"

#include <gtest/gtest.h>
#include <numeric>
#include <thread>
// #include <chrono>

namespace metrics_api = opentelemetry::metrics;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

TEST(Controller, Constructor)
{

std::shared_ptr<metrics_api::Meter> meter =
std::shared_ptr<metrics_api::Meter>(new Meter("Test"));
PushController alpha(meter,
std::unique_ptr<MetricsExporter>(
new opentelemetry::exporter::metrics::OStreamMetricsExporter),
std::shared_ptr<MetricsProcessor>(
new opentelemetry::sdk::metrics::UngroupedMetricsProcessor(false)),
.05);

auto instr = meter->NewIntCounter("test", "none", "none", true);
std::map<std::string, std::string> labels = {{"key", "value"}};
auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};

alpha.start();

for (int i = 0; i < 20; i++)
{
instr->add(i, labelkv);
}
alpha.stop();
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE

0 comments on commit b4d5234

Please sign in to comment.