Skip to content

Commit

Permalink
Update MetricProducer interface to match spec
Browse files Browse the repository at this point in the history
  • Loading branch information
punya committed Sep 1, 2024
1 parent a920898 commit c77980f
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 30 deletions.
18 changes: 8 additions & 10 deletions exporters/prometheus/test/collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
#include <thread>

using opentelemetry::exporter::metrics::PrometheusCollector;
using opentelemetry::sdk::metrics::MetricProducer;
using opentelemetry::sdk::metrics::ResourceMetrics;
namespace metric_api = opentelemetry::metrics;
namespace metric_sdk = opentelemetry::sdk::metrics;
namespace metric_exporter = opentelemetry::exporter::metrics;

class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer
class MockMetricProducer : public MetricProducer
{
TestDataPoints test_data_points_;

Expand All @@ -26,13 +27,12 @@ class MockMetricProducer : public opentelemetry::sdk::metrics::MetricProducer
: sleep_ms_{sleep_ms}
{}

bool Collect(nostd::function_ref<bool(ResourceMetrics &)> callback) noexcept override
MetricProducer::Result Produce() noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
ResourceMetrics data = test_data_points_.CreateSumPointData();
callback(data);
return true;
return {data, MetricProducer::Status::kSuccess};
}

size_t GetDataCount() { return data_sent_size_; }
Expand Down Expand Up @@ -70,15 +70,13 @@ class MockMetricReader : public opentelemetry::sdk::metrics::MetricReader
*/
TEST(PrometheusCollector, BasicTests)
{
MockMetricReader *reader = new MockMetricReader();
MockMetricProducer *producer = new MockMetricProducer();
reader->SetMetricProducer(producer);
PrometheusCollector collector(reader, true, false);
MockMetricReader reader;
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
PrometheusCollector collector(&reader, true, false);
auto data = collector.Collect();

// Collection size should be the same as the size
// of the records collection produced by MetricProducer.
ASSERT_EQ(data.size(), 2);
delete reader;
delete producer;
}
38 changes: 27 additions & 11 deletions sdk/include/opentelemetry/sdk/metrics/export/metric_producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <vector>

#include "opentelemetry/nostd/function_ref.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/metrics/data/metric_data.h"
#include "opentelemetry/version.h"

Expand Down Expand Up @@ -70,27 +71,42 @@ struct ResourceMetrics
};

/**
* MetricProducer is the interface that is used to make metric data available to the
* OpenTelemetry exporters. Implementations should be stateful, in that each call to
* `Collect` will return any metric generated since the last call was made.
* MetricProducer defines the interface which bridges to third-party metric sources MUST implement,
* so they can be plugged into an OpenTelemetry MetricReader as a source of aggregated metric data.
*
* <p>Implementations must be thread-safe.
* Implementations must be thread-safe, and should accept configuration for the
* AggregationTemporality of produced metrics.
*/

class MetricProducer
{
public:
MetricProducer() = default;
virtual ~MetricProducer() = default;

MetricProducer(const MetricProducer &) = delete;
MetricProducer(const MetricProducer &&) = delete;
void operator=(const MetricProducer &) = delete;
void operator=(const MetricProducer &&) = delete;

enum class Status
{
kSuccess,
kFailure,
kTimeout,
};

struct Result
{
ResourceMetrics points_;
Status status_;
};

/**
* The callback to be called for each metric exporter. This will only be those
* metrics that have been produced since the last time this method was called.
*
* @return a status of completion of method.
* Produce returns a batch of Metric Points, with a single instrumentation scope that identifies
* the MetricProducer. Implementations may return successfully collected points even if there is a
* partial failure.
*/
virtual bool Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept = 0;
virtual Result Produce() noexcept = 0;
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MetricCollector : public MetricProducer, public CollectorHandle
*
* @return a status of completion of method.
*/
bool Collect(nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept override;
Result Produce() noexcept override;

bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

Expand Down
6 changes: 4 additions & 2 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bool MetricReader::Collect(
if (!metric_producer_)
{
OTEL_INTERNAL_LOG_WARN(
"MetricReader::Collect Cannot invoke Collect(). No MetricProducer registered for "
"MetricReader::Collect Cannot invoke Produce(). No MetricProducer registered for "
"collection!")
return false;
}
Expand All @@ -36,7 +36,9 @@ bool MetricReader::Collect(
OTEL_INTERNAL_LOG_WARN("MetricReader::Collect invoked while Shutdown in progress!");
}

return metric_producer_->Collect(callback);
auto result = metric_producer_->Produce();
auto success = callback(result.points_);
return (result.status_ == MetricProducer::Status::kSuccess) && success;
}

bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
Expand Down
9 changes: 4 additions & 5 deletions sdk/src/metrics/state/metric_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace sdk
{
namespace metrics
{
using opentelemetry::sdk::resource::Resource;

MetricCollector::MetricCollector(opentelemetry::sdk::metrics::MeterContext *context,
std::shared_ptr<MetricReader> metric_reader)
Expand All @@ -38,14 +39,13 @@ AggregationTemporality MetricCollector::GetAggregationTemporality(
return metric_reader_->GetAggregationTemporality(instrument_type);
}

bool MetricCollector::Collect(
nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept
MetricProducer::Result MetricCollector::Produce() noexcept
{
if (!meter_context_)
{
OTEL_INTERNAL_LOG_ERROR("[MetricCollector::Collect] - Error during collecting."
<< "The metric context is invalid");
return false;
return {{}, MetricProducer::Status::kFailure};
}
ResourceMetrics resource_metrics;
meter_context_->ForEachMeter([&](const std::shared_ptr<Meter> &meter) noexcept {
Expand All @@ -61,8 +61,7 @@ bool MetricCollector::Collect(
return true;
});
resource_metrics.resource_ = &meter_context_->GetResource();
callback(resource_metrics);
return true;
return {resource_metrics, MetricProducer::Status::kSuccess};
}

bool MetricCollector::ForceFlush(std::chrono::microseconds timeout) noexcept
Expand Down
2 changes: 1 addition & 1 deletion sdk/test/metrics/metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ TEST(MetricReaderTest, BasicTests)
std::shared_ptr<MeterContext> meter_context2(new MeterContext());
std::shared_ptr<MetricProducer> metric_producer{
new MetricCollector(meter_context2.get(), std::move(metric_reader2))};
metric_producer->Collect([](ResourceMetrics & /* metric_data */) { return true; });
metric_producer->Produce();
}

0 comments on commit c77980f

Please sign in to comment.