From 0b6f8fbc69d25ea6cbca66901eefa08c80c05a9f Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 19 Apr 2022 01:22:58 +0530 Subject: [PATCH 1/7] initial commit -async storage --- .../metrics/aggregation/default_aggregation.h | 51 +++++++ .../metrics/aggregation/drop_aggregation.h | 2 + .../aggregation/histogram_aggregation.h | 10 +- .../aggregation/lastvalue_aggregation.h | 6 +- .../sdk/metrics/aggregation/sum_aggregation.h | 10 +- .../sdk/metrics/state/async_metric_storage.h | 43 ++++-- .../metrics/state/temporal_metric_storage.h | 47 ++++++ sdk/src/metrics/CMakeLists.txt | 1 + .../aggregation/histogram_aggregation.cc | 8 ++ .../aggregation/lastvalue_aggregation.cc | 10 ++ .../metrics/aggregation/sum_aggregation.cc | 4 + .../metrics/state/temporal_metric_storage.cc | 134 ++++++++++++++++++ sdk/test/metrics/async_metric_storage_test.cc | 106 ++++++++++---- 13 files changed, 383 insertions(+), 49 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h create mode 100644 sdk/src/metrics/state/temporal_metric_storage.cc diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index b5a1283d26..90faa51105 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -90,6 +90,57 @@ class DefaultAggregation return DefaultAggregation::CreateAggregation(instrument_descriptor); } } + + static std::unique_ptr CloneAggregation(AggregationType aggregation_type, + InstrumentDescriptor instrument_descriptor, + const Aggregation &to_copy) + { + const PointType point_data = to_copy.ToPoint(); + switch (aggregation_type) + { + case AggregationType::kDrop: + return std::unique_ptr(new DropAggregation()); + break; + case AggregationType::kHistogram: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr( + new LongHistogramAggregation(nostd::get(point_data))); + } + else + { + return std::unique_ptr( + new DoubleHistogramAggregation(nostd::get(point_data))); + } + break; + case AggregationType::kLastValue: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr( + new LongLastValueAggregation(nostd::get(point_data))); + } + else + { + return std::unique_ptr( + new DoubleLastValueAggregation(nostd::get(point_data))); + } + break; + case AggregationType::kSum: + if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) + { + return std::unique_ptr( + new LongSumAggregation(nostd::get(point_data))); + } + else + { + return std::unique_ptr( + new DoubleSumAggregation(nostd::get(point_data))); + } + break; + default: + return DefaultAggregation::CreateAggregation(instrument_descriptor); + } + } }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h index 4e29fa2e46..16d5892740 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h @@ -23,6 +23,8 @@ class DropAggregation : public Aggregation public: DropAggregation() = default; + DropAggregation(const DropPointData &point_data) {} + void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h index b5cc2c349e..e2a55fba58 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/histogram_aggregation.h @@ -19,6 +19,7 @@ class LongHistogramAggregation : public Aggregation public: LongHistogramAggregation(); LongHistogramAggregation(HistogramPointData &&); + LongHistogramAggregation(const HistogramPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; @@ -26,14 +27,14 @@ class LongHistogramAggregation : public Aggregation /* Returns the result of merge of the existing aggregation with delta aggregation with same * boundaries */ - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; /* Returns the new delta aggregation by comparing existing aggregation with next aggregation with * same boundaries. Data points for `next` aggregation (sum , bucket-counts) should be more than * the current aggregation - which is the normal scenario as measurements values are monotonic * increasing. */ - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; @@ -47,6 +48,7 @@ class DoubleHistogramAggregation : public Aggregation public: DoubleHistogramAggregation(); DoubleHistogramAggregation(HistogramPointData &&); + DoubleHistogramAggregation(const HistogramPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} @@ -54,14 +56,14 @@ class DoubleHistogramAggregation : public Aggregation /* Returns the result of merge of the existing aggregation with delta aggregation with same * boundaries */ - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; /* Returns the new delta aggregation by comparing existing aggregation with next aggregation with * same boundaries. Data points for `next` aggregation (sum , bucket-counts) should be more than * the current aggregation - which is the normal scenario as measurements values are monotonic * increasing. */ - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h index 7f185d51a1..3b2c08f8ce 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/lastvalue_aggregation.h @@ -18,14 +18,15 @@ class LongLastValueAggregation : public Aggregation public: LongLastValueAggregation(); LongLastValueAggregation(LastValuePointData &&); + LongLastValueAggregation(const LastValuePointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; @@ -39,6 +40,7 @@ class DoubleLastValueAggregation : public Aggregation public: DoubleLastValueAggregation(); DoubleLastValueAggregation(LastValuePointData &&); + DoubleLastValueAggregation(const LastValuePointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h index b0f0169b24..14f13bd727 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/sum_aggregation.h @@ -19,14 +19,15 @@ class LongSumAggregation : public Aggregation public: LongSumAggregation(); LongSumAggregation(SumPointData &&); + LongSumAggregation(const SumPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override; void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; @@ -40,14 +41,15 @@ class DoubleSumAggregation : public Aggregation public: DoubleSumAggregation(); DoubleSumAggregation(SumPointData &&); + DoubleSumAggregation(const SumPointData &); void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override; - virtual std::unique_ptr Merge(const Aggregation &delta) const noexcept override; + std::unique_ptr Merge(const Aggregation &delta) const noexcept override; - virtual std::unique_ptr Diff(const Aggregation &next) const noexcept override; + std::unique_ptr Diff(const Aggregation &next) const noexcept override; PointType ToPoint() const noexcept override; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index cfbf521538..8372bed917 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -3,6 +3,7 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW +# include # include "opentelemetry/sdk/common/attributemap_hash.h" # include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" # include "opentelemetry/sdk/metrics/instruments.h" @@ -10,10 +11,9 @@ # include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" # include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/state/metric_storage.h" +# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h" # include "opentelemetry/sdk/metrics/view/attributes_processor.h" -# include - OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { @@ -32,7 +32,8 @@ class AsyncMetricStorage : public MetricStorage aggregation_type_{aggregation_type}, measurement_collection_callback_{measurement_callback}, attributes_processor_{attributes_processor}, - active_attributes_hashmap_(new AttributesHashMap()) + cumulative_hash_map_(new AttributesHashMap()), + temporal_metric_storage_(instrument_descriptor) {} bool Collect(CollectorHandle *collector, @@ -45,22 +46,33 @@ class AsyncMetricStorage : public MetricStorage // read the measurement using configured callback measurement_collection_callback_(ob_res); - + std::shared_ptr delta_hash_map(new AttributesHashMap()); // process the read measurements - aggregate and store in hashmap for (auto &measurement : ob_res.GetMeasurements()) { - auto agg = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); - agg->Aggregate(measurement.second); - active_attributes_hashmap_->Set(measurement.first, std::move(agg)); + auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); + aggr->Aggregate(measurement.second); + auto prev = cumulative_hash_map_->Get(measurement.first); + if (prev != nullptr) + { + auto delta = prev->Diff(*aggr); + cumulative_hash_map_->Set(measurement.first, + DefaultAggregation::CloneAggregation( + aggregation_type_, instrument_descriptor_, *delta)); + delta_hash_map->Set(measurement.first, std::move(delta)); + } + else + { + cumulative_hash_map_->Set( + measurement.first, + DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); + delta_hash_map->Set(measurement.first, std::move(aggr)); + } } - // TBD -> read aggregation from hashmap, and perform metric collection - MetricData metric_data; - if (metric_collection_callback(std::move(metric_data))) - { - return true; - } - return false; + return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, + std::move(delta_hash_map), + metric_collection_callback); } private: @@ -68,7 +80,8 @@ class AsyncMetricStorage : public MetricStorage AggregationType aggregation_type_; void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult &); const AttributesProcessor *attributes_processor_; - std::unique_ptr active_attributes_hashmap_; + std::unique_ptr cumulative_hash_map_; + TemporalMetricStorage temporal_metric_storage_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h new file mode 100644 index 0000000000..21bdaf7b56 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" +# include "opentelemetry/sdk/metrics/state/metric_collector.h" + +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +struct LastReportedMetrics +{ + std::unique_ptr attributes_map; + opentelemetry::common::SystemTimestamp collection_ts; +}; + +class TemporalMetricStorage +{ +public: + TemporalMetricStorage(InstrumentDescriptor instrument_descriptor); + + bool buildMetrics(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + std::shared_ptr delta_metrics, + nostd::function_ref callback) noexcept; + +private: + InstrumentDescriptor instrument_descriptor_; + + // unreported metrics stash for all the collectors + std::unordered_map>> + unreported_metrics_; + // last reported metrics stash for all the collectors. + std::unordered_map last_reported_metrics_; +}; +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index b6656b5bf8..77a371a80c 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -7,6 +7,7 @@ add_library( export/periodic_exporting_metric_reader.cc state/metric_collector.cc state/sync_metric_storage.cc + state/temporal_metric_storage.cc aggregation/histogram_aggregation.cc aggregation/lastvalue_aggregation.cc aggregation/sum_aggregation.cc diff --git a/sdk/src/metrics/aggregation/histogram_aggregation.cc b/sdk/src/metrics/aggregation/histogram_aggregation.cc index 27405999c9..aa2be74713 100644 --- a/sdk/src/metrics/aggregation/histogram_aggregation.cc +++ b/sdk/src/metrics/aggregation/histogram_aggregation.cc @@ -25,6 +25,10 @@ LongHistogramAggregation::LongHistogramAggregation(HistogramPointData &&data) : point_data_{std::move(data)} {} +LongHistogramAggregation::LongHistogramAggregation(const HistogramPointData &data) + : point_data_{data} +{} + void LongHistogramAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); @@ -83,6 +87,10 @@ DoubleHistogramAggregation::DoubleHistogramAggregation(HistogramPointData &&data : point_data_{std::move(data)} {} +DoubleHistogramAggregation::DoubleHistogramAggregation(const HistogramPointData &data) + : point_data_{data} +{} + void DoubleHistogramAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/src/metrics/aggregation/lastvalue_aggregation.cc b/sdk/src/metrics/aggregation/lastvalue_aggregation.cc index 9c0252be31..a125005335 100644 --- a/sdk/src/metrics/aggregation/lastvalue_aggregation.cc +++ b/sdk/src/metrics/aggregation/lastvalue_aggregation.cc @@ -19,10 +19,15 @@ LongLastValueAggregation::LongLastValueAggregation() point_data_.is_lastvalue_valid_ = false; point_data_.value_ = 0l; } + LongLastValueAggregation::LongLastValueAggregation(LastValuePointData &&data) : point_data_{std::move(data)} {} +LongLastValueAggregation::LongLastValueAggregation(const LastValuePointData &data) + : point_data_{data} +{} + void LongLastValueAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); @@ -71,10 +76,15 @@ DoubleLastValueAggregation::DoubleLastValueAggregation() point_data_.is_lastvalue_valid_ = false; point_data_.value_ = 0.0; } + DoubleLastValueAggregation::DoubleLastValueAggregation(LastValuePointData &&data) : point_data_{std::move(data)} {} +DoubleLastValueAggregation::DoubleLastValueAggregation(const LastValuePointData &data) + : point_data_{data} +{} + void DoubleLastValueAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/src/metrics/aggregation/sum_aggregation.cc b/sdk/src/metrics/aggregation/sum_aggregation.cc index 94b871cd34..5ca786496e 100644 --- a/sdk/src/metrics/aggregation/sum_aggregation.cc +++ b/sdk/src/metrics/aggregation/sum_aggregation.cc @@ -22,6 +22,8 @@ LongSumAggregation::LongSumAggregation() LongSumAggregation::LongSumAggregation(SumPointData &&data) : point_data_{std::move(data)} {} +LongSumAggregation::LongSumAggregation(const SumPointData &data) : point_data_{data} {} + void LongSumAggregation::Aggregate(long value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); @@ -64,6 +66,8 @@ DoubleSumAggregation::DoubleSumAggregation() DoubleSumAggregation::DoubleSumAggregation(SumPointData &&data) : point_data_(std::move(data)) {} +DoubleSumAggregation::DoubleSumAggregation(const SumPointData &data) : point_data_(data) {} + void DoubleSumAggregation::Aggregate(double value, const PointAttributes &attributes) noexcept { const std::lock_guard locked(lock_); diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc new file mode 100644 index 0000000000..fde1e7b5fd --- /dev/null +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -0,0 +1,134 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h" +# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor) + : instrument_descriptor_(instrument_descriptor) +{} + +bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + std::shared_ptr delta_metrics, + nostd::function_ref callback) noexcept +{ + opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; + auto aggregation_temporarily = collector->GetAggregationTemporality(); + for (auto &col : collectors) + { + unreported_metrics_[col.get()].push_back(delta_metrics); + } + + // Get the unreported metrics for the `collector` from `unreported metrics stash` + // since last collection, this will also cleanup the unreported metrics for `collector` + // from the stash. + auto present = unreported_metrics_.find(collector); + if (present == unreported_metrics_.end()) + { + // no unreported metrics for the collector, return. + return true; + } + auto unreported_list = std::move(present->second); + + // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` + std::unique_ptr merged_metrics(new AttributesHashMap); + for (auto &agg_hashmap : unreported_list) + { + agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes, + Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, std::move(agg->Merge(aggregation))); + } + else + { + merged_metrics->Set( + attributes, + std::move( + DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation))); + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + } + return true; + }); + } + // Get the last reported metrics for the `collector` from `last reported metrics` stash + // - If the aggregation_temporarily for the collector is cumulative + // - Merge the last reported metrics with unreported metrics (which is in merged_metrics), + // Final result of merge would be in merged_metrics. + // - Move the final merge to the `last reported metrics` stash. + // - If the aggregation_temporarily is delta + // - Store the unreported metrics for `collector` (which is in merged_mtrics) to + // `last reported metrics` stash. + + auto reported = last_reported_metrics_.find(collector); + if (reported != last_reported_metrics_.end()) + { + last_collection_ts = last_reported_metrics_[collector].collection_ts; + auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map); + if (aggregation_temporarily == AggregationTemporality::kCumulative) + { + // merge current delta to previous cumulative + last_aggr_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, agg->Merge(aggregation)); + } + else + { + merged_metrics->Set(attributes, + DefaultAggregation::CreateAggregation(instrument_descriptor_)); + } + return true; + }); + } + last_reported_metrics_[collector] = + LastReportedMetrics{std::move(merged_metrics), collection_ts}; + } + else + { + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + last_reported_metrics_.insert( + std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts})); + } + + // Generate the MetricData from the final merged_metrics, and invoke callback over it. + + AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get(); + MetricData metric_data; + metric_data.instrument_descriptor = instrument_descriptor_; + metric_data.start_ts = last_collection_ts; + metric_data.end_ts = collection_ts; + result_to_export->GetAllEnteries( + [&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) { + PointDataAttributes point_data_attr; + point_data_attr.point_data = aggregation.ToPoint(); + point_data_attr.attributes = attributes; + metric_data.point_data_attr_.push_back(point_data_attr); + return true; + }); + return callback(metric_data); + + return true; +} + +} // namespace metrics + +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 512e24472c..ca7d68bf35 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -18,46 +18,104 @@ using namespace opentelemetry::sdk::metrics; using namespace opentelemetry::sdk::instrumentationlibrary; using namespace opentelemetry::sdk::resource; -class MockMetricReader : public MetricReader +using namespace opentelemetry::sdk::metrics; +using namespace opentelemetry::common; +using M = std::map; + +class MockCollectorHandle : public CollectorHandle { public: - MockMetricReader(AggregationTemporality aggr_temporality) : MetricReader(aggr_temporality) {} + MockCollectorHandle(AggregationTemporality temp) : temporality(temp) {} - virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; } + AggregationTemporality GetAggregationTemporality() noexcept override { return temporality; } - virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; } - - virtual void OnInitialized() noexcept override {} +private: + AggregationTemporality temporality; }; -void measurement_fetch(opentelemetry::metrics::ObserverResult &observer_result) +class WritableMetricStorageTestFixture : public ::testing::TestWithParam +{}; + +class MeasurementFetcher { - observer_result.Observe(20l); - observer_result.Observe(10l); -} +public: + static void Fetcher(opentelemetry::metrics::ObserverResult &observer_result) + { + fetch_count++; + if (fetch_count == 1) + { + observer_result.Observe(20l, {{"RequestType", "GET"}}); + observer_result.Observe(10l, {{"RequestType", "PUT"}}); + number_of_get += 20l; + number_of_put += 10l; + } + else if (fetch_count == 2) + { + observer_result.Observe(40l, {{"RequestType", "GET"}}); + observer_result.Observe(20l, {{"RequestType", "PUT"}}); + number_of_get += 40l; + number_of_put += 20l; + } + } -TEST(AsyncMetricStorageTest, BasicTests) + static size_t fetch_count; + static long number_of_get; + static long number_of_put; +}; + +size_t MeasurementFetcher::fetch_count; +long MeasurementFetcher::number_of_get; +long MeasurementFetcher::number_of_put; + +TEST_P(WritableMetricStorageTestFixture, TestAggregation) { - auto metric_callback = [](MetricData &&metric_data) { return true; }; - InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter, + + AggregationTemporality temporality = GetParam(); + + InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableCounter, InstrumentValueType::kLong}; auto sdk_start_ts = std::chrono::system_clock::now(); // Some computation here auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5); - std::vector> exporters; - std::shared_ptr meter_context(new MeterContext(std::move(exporters))); - std::unique_ptr metric_reader(new MockMetricReader(AggregationTemporality::kDelta)); - - std::shared_ptr collector = std::shared_ptr( - new MetricCollector(std::move(meter_context), std::move(metric_reader))); + std::shared_ptr collector(new MockCollectorHandle(temporality)); + std::vector> collectors; + collectors.push_back(collector); + size_t count_attributes = 0; + long value = 0; - std::vector> collectors{collector}; + MeasurementFetcher measurement_fetcher; + opentelemetry::sdk::metrics::AsyncMetricStorage storage(instr_desc, AggregationType::kSum, + MeasurementFetcher::Fetcher, + new DefaultAttributesProcessor()); - opentelemetry::sdk::metrics::AsyncMetricStorage storage( - instr_desc, AggregationType::kSum, &measurement_fetch, new DefaultAttributesProcessor()); - EXPECT_NO_THROW( - storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, metric_callback)); + storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, + [&](const MetricData data) { + for (auto data_attr : data.point_data_attr_) + { + /*auto data = opentelemetry::nostd::get(data_attr.point_data); + if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "GET") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + MeasurementFetcher::number_of_get); count_attributes++; + } + else if (opentelemetry::nostd::get( + data_attr.attributes.find("RequestType")->second) == "PUT") + { + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + MeasurementFetcher::number_of_put); count_attributes++; + }*/ + count_attributes++; + } + return true; + }); + EXPECT_EQ(2, count_attributes); } +INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, + WritableMetricStorageTestFixture, + ::testing::Values(AggregationTemporality::kCumulative, + AggregationTemporality::kDelta)); + #endif \ No newline at end of file From 9c4740dea8fba8c36113e0f3d3016a7d055bd5e0 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 19 Apr 2022 20:13:21 +0530 Subject: [PATCH 2/7] add tests --- .../metrics/aggregation/default_aggregation.h | 3 +++ .../opentelemetry/sdk/metrics/instruments.h | 13 ++++++++++ .../metrics/state/temporal_metric_storage.cc | 4 +-- sdk/test/metrics/async_metric_storage_test.cc | 25 +++++++++++++------ 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 90faa51105..53c7403056 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -28,6 +28,7 @@ class DefaultAggregation { case InstrumentType::kCounter: case InstrumentType::kUpDownCounter: + case InstrumentType::kObservableCounter: case InstrumentType::kObservableUpDownCounter: return (instrument_descriptor.value_type_ == InstrumentValueType::kLong) ? std::move(std::unique_ptr(new LongSumAggregation())) @@ -116,11 +117,13 @@ class DefaultAggregation case AggregationType::kLastValue: if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { + return std::unique_ptr( new LongLastValueAggregation(nostd::get(point_data))); } else { + return std::unique_ptr( new DoubleLastValueAggregation(nostd::get(point_data))); } diff --git a/sdk/include/opentelemetry/sdk/metrics/instruments.h b/sdk/include/opentelemetry/sdk/metrics/instruments.h index 5d85357143..f9d69897f0 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/instruments.h @@ -52,6 +52,19 @@ struct InstrumentDescriptor InstrumentValueType value_type_; }; +static inline const char* get_instrument_type(InstrumentType type) { + switch (type) { + case InstrumentType::kCounter: return "kCounter"; + case InstrumentType::kHistogram: return "khistogram"; + case InstrumentType::kObservableCounter: return "kObservableCounter"; + case InstrumentType::kObservableGauge: return "kObservableGauge"; + case InstrumentType::kObservableUpDownCounter: return "kObservableUpDownCounter"; + case InstrumentType::kUpDownCounter: return "kUpdownCounter"; + default: return "Nothing"; + } + return ""; +} + using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap; /*class InstrumentSelector { diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index fde1e7b5fd..9066059650 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -14,7 +14,8 @@ namespace metrics TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor) : instrument_descriptor_(instrument_descriptor) -{} +{ +} bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, nostd::span> collectors, @@ -40,7 +41,6 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, return true; } auto unreported_list = std::move(present->second); - // Iterate over the unreporter metrics for `collector` and store result in `merged_metrics` std::unique_ptr merged_metrics(new AttributesHashMap); for (auto &agg_hashmap : unreported_list) diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index ca7d68bf35..99e49ffd62 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -58,18 +58,26 @@ class MeasurementFetcher } } + static void init_values(){ + fetch_count = 0; + number_of_get = 0; + number_of_put = 0; + } + static size_t fetch_count; static long number_of_get; static long number_of_put; + static const size_t number_of_attributes = 2; // GET , PUT }; size_t MeasurementFetcher::fetch_count; long MeasurementFetcher::number_of_get; long MeasurementFetcher::number_of_put; +const size_t MeasurementFetcher::number_of_attributes; TEST_P(WritableMetricStorageTestFixture, TestAggregation) { - + MeasurementFetcher::init_values(); AggregationTemporality temporality = GetParam(); InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableCounter, @@ -94,28 +102,29 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) [&](const MetricData data) { for (auto data_attr : data.point_data_attr_) { - /*auto data = opentelemetry::nostd::get(data_attr.point_data); + auto data = opentelemetry::nostd::get(data_attr.point_data); if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "GET") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), - MeasurementFetcher::number_of_get); count_attributes++; + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + MeasurementFetcher::number_of_get); } else if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "PUT") { EXPECT_EQ(opentelemetry::nostd::get(data.value_), - MeasurementFetcher::number_of_put); count_attributes++; - }*/ + MeasurementFetcher::number_of_put); + } count_attributes++; } return true; }); - EXPECT_EQ(2, count_attributes); + EXPECT_EQ(MeasurementFetcher::number_of_attributes, count_attributes); } + INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, WritableMetricStorageTestFixture, ::testing::Values(AggregationTemporality::kCumulative, - AggregationTemporality::kDelta)); + AggregationTemporality::kDelta)); #endif \ No newline at end of file From b93d4dd67af8b684dfd082e187dfb83021f073d5 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 19 Apr 2022 23:50:09 +0530 Subject: [PATCH 3/7] fix format --- .../opentelemetry/sdk/metrics/instruments.h | 27 ++++++++++++------- .../metrics/state/temporal_metric_storage.cc | 3 +-- sdk/test/metrics/async_metric_storage_test.cc | 15 ++++++----- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/instruments.h b/sdk/include/opentelemetry/sdk/metrics/instruments.h index f9d69897f0..260bac4272 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/instruments.h @@ -52,15 +52,24 @@ struct InstrumentDescriptor InstrumentValueType value_type_; }; -static inline const char* get_instrument_type(InstrumentType type) { - switch (type) { - case InstrumentType::kCounter: return "kCounter"; - case InstrumentType::kHistogram: return "khistogram"; - case InstrumentType::kObservableCounter: return "kObservableCounter"; - case InstrumentType::kObservableGauge: return "kObservableGauge"; - case InstrumentType::kObservableUpDownCounter: return "kObservableUpDownCounter"; - case InstrumentType::kUpDownCounter: return "kUpdownCounter"; - default: return "Nothing"; +static inline const char *get_instrument_type(InstrumentType type) +{ + switch (type) + { + case InstrumentType::kCounter: + return "kCounter"; + case InstrumentType::kHistogram: + return "khistogram"; + case InstrumentType::kObservableCounter: + return "kObservableCounter"; + case InstrumentType::kObservableGauge: + return "kObservableGauge"; + case InstrumentType::kObservableUpDownCounter: + return "kObservableUpDownCounter"; + case InstrumentType::kUpDownCounter: + return "kUpdownCounter"; + default: + return "Nothing"; } return ""; } diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 9066059650..2b2a557f24 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -14,8 +14,7 @@ namespace metrics TemporalMetricStorage::TemporalMetricStorage(InstrumentDescriptor instrument_descriptor) : instrument_descriptor_(instrument_descriptor) -{ -} +{} bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, nostd::span> collectors, diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 99e49ffd62..a4decaaa79 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -58,8 +58,9 @@ class MeasurementFetcher } } - static void init_values(){ - fetch_count = 0; + static void init_values() + { + fetch_count = 0; number_of_get = 0; number_of_put = 0; } @@ -67,7 +68,7 @@ class MeasurementFetcher static size_t fetch_count; static long number_of_get; static long number_of_put; - static const size_t number_of_attributes = 2; // GET , PUT + static const size_t number_of_attributes = 2; // GET , PUT }; size_t MeasurementFetcher::fetch_count; @@ -106,14 +107,14 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "GET") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), - MeasurementFetcher::number_of_get); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), + MeasurementFetcher::number_of_get); } else if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "PUT") { EXPECT_EQ(opentelemetry::nostd::get(data.value_), - MeasurementFetcher::number_of_put); + MeasurementFetcher::number_of_put); } count_attributes++; } @@ -125,6 +126,6 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, WritableMetricStorageTestFixture, ::testing::Values(AggregationTemporality::kCumulative, - AggregationTemporality::kDelta)); + AggregationTemporality::kDelta)); #endif \ No newline at end of file From 3a6f8ebd60e8f5c06acb328db8d1aba07c554bee Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 19 Apr 2022 23:54:40 +0530 Subject: [PATCH 4/7] remove unwanted code --- .../opentelemetry/sdk/metrics/instruments.h | 22 ------------------- .../sdk/metrics/state/async_metric_storage.h | 3 ++- 2 files changed, 2 insertions(+), 23 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/instruments.h b/sdk/include/opentelemetry/sdk/metrics/instruments.h index 260bac4272..5d85357143 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/instruments.h @@ -52,28 +52,6 @@ struct InstrumentDescriptor InstrumentValueType value_type_; }; -static inline const char *get_instrument_type(InstrumentType type) -{ - switch (type) - { - case InstrumentType::kCounter: - return "kCounter"; - case InstrumentType::kHistogram: - return "khistogram"; - case InstrumentType::kObservableCounter: - return "kObservableCounter"; - case InstrumentType::kObservableGauge: - return "kObservableGauge"; - case InstrumentType::kObservableUpDownCounter: - return "kObservableUpDownCounter"; - case InstrumentType::kUpDownCounter: - return "kUpdownCounter"; - default: - return "Nothing"; - } - return ""; -} - using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap; /*class InstrumentSelector { diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 8372bed917..57db343884 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -3,7 +3,6 @@ #pragma once #ifndef ENABLE_METRICS_PREVIEW -# include # include "opentelemetry/sdk/common/attributemap_hash.h" # include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" # include "opentelemetry/sdk/metrics/instruments.h" @@ -14,6 +13,8 @@ # include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h" # include "opentelemetry/sdk/metrics/view/attributes_processor.h" +# include + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { From 2dda99a25adb23c3bbd59d84b31dc4fb54a58ed4 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 2 May 2022 13:24:55 -0700 Subject: [PATCH 5/7] review comments --- .../sdk/metrics/aggregation/default_aggregation.h | 6 ------ .../sdk/metrics/aggregation/drop_aggregation.h | 6 +++--- .../opentelemetry/sdk/metrics/state/async_metric_storage.h | 2 +- .../sdk/metrics/state/temporal_metric_storage.h | 3 +++ sdk/src/metrics/state/temporal_metric_storage.cc | 1 + 5 files changed, 8 insertions(+), 10 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h index 53c7403056..887e1beb92 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/default_aggregation.h @@ -101,7 +101,6 @@ class DefaultAggregation { case AggregationType::kDrop: return std::unique_ptr(new DropAggregation()); - break; case AggregationType::kHistogram: if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { @@ -113,21 +112,17 @@ class DefaultAggregation return std::unique_ptr( new DoubleHistogramAggregation(nostd::get(point_data))); } - break; case AggregationType::kLastValue: if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { - return std::unique_ptr( new LongLastValueAggregation(nostd::get(point_data))); } else { - return std::unique_ptr( new DoubleLastValueAggregation(nostd::get(point_data))); } - break; case AggregationType::kSum: if (instrument_descriptor.value_type_ == InstrumentValueType::kLong) { @@ -139,7 +134,6 @@ class DefaultAggregation return std::unique_ptr( new DoubleSumAggregation(nostd::get(point_data))); } - break; default: return DefaultAggregation::CreateAggregation(instrument_descriptor); } diff --git a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h index 16d5892740..6c3d89d247 100644 --- a/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h +++ b/sdk/include/opentelemetry/sdk/metrics/aggregation/drop_aggregation.h @@ -23,18 +23,18 @@ class DropAggregation : public Aggregation public: DropAggregation() = default; - DropAggregation(const DropPointData &point_data) {} + DropAggregation(const DropPointData &) {} void Aggregate(long value, const PointAttributes &attributes = {}) noexcept override {} void Aggregate(double value, const PointAttributes &attributes = {}) noexcept override {} - std::unique_ptr Merge(const Aggregation &delta) const noexcept override + std::unique_ptr Merge(const Aggregation &) const noexcept override { return std::unique_ptr(new DropAggregation()); } - std::unique_ptr Diff(const Aggregation &next) const noexcept override + std::unique_ptr Diff(const Aggregation &) const noexcept override { return std::unique_ptr(new DropAggregation()); } diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 57db343884..e4c20e4010 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -54,7 +54,7 @@ class AsyncMetricStorage : public MetricStorage auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); aggr->Aggregate(measurement.second); auto prev = cumulative_hash_map_->Get(measurement.first); - if (prev != nullptr) + if (prev) { auto delta = prev->Diff(*aggr); cumulative_hash_map_->Set(measurement.first, diff --git a/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h index 21bdaf7b56..16659c14f5 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/temporal_metric_storage.h @@ -40,6 +40,9 @@ class TemporalMetricStorage unreported_metrics_; // last reported metrics stash for all the collectors. std::unordered_map last_reported_metrics_; + + // Lock while building metrics + mutable opentelemetry::common::SpinLockMutex lock_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 2b2a557f24..ccb81f0984 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -23,6 +23,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, std::shared_ptr delta_metrics, nostd::function_ref callback) noexcept { + std::lock_guard guard(lock_); opentelemetry::common::SystemTimestamp last_collection_ts = sdk_start_ts; auto aggregation_temporarily = collector->GetAggregationTemporality(); for (auto &col : collectors) From 77d344af3bbb7b7e6c8c6c6743fe1d980bfc7011 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 2 May 2022 14:07:46 -0700 Subject: [PATCH 6/7] fix review comments --- .../metrics/state/temporal_metric_storage.cc | 37 +++++++++---------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index ccb81f0984..35d2ba720e 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -45,24 +45,23 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, std::unique_ptr merged_metrics(new AttributesHashMap); for (auto &agg_hashmap : unreported_list) { - agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes, - Aggregation &aggregation) { - auto agg = merged_metrics->Get(attributes); - if (agg) - { - merged_metrics->Set(attributes, std::move(agg->Merge(aggregation))); - } - else - { - merged_metrics->Set( - attributes, - std::move( - DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation))); - merged_metrics->GetAllEnteries( - [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); - } - return true; - }); + agg_hashmap->GetAllEnteries( + [&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) { + auto agg = merged_metrics->Get(attributes); + if (agg) + { + merged_metrics->Set(attributes, std::move(agg->Merge(aggregation))); + } + else + { + merged_metrics->Set( + attributes, + DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation)); + merged_metrics->GetAllEnteries( + [](const MetricAttributes &attr, Aggregation &aggr) { return true; }); + } + return true; + }); } // Get the last reported metrics for the `collector` from `last reported metrics` stash // - If the aggregation_temporarily for the collector is cumulative @@ -123,8 +122,6 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, return true; }); return callback(metric_data); - - return true; } } // namespace metrics From aafa07d725ac45952701f9ca2d56d60cf53061fc Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 3 May 2022 12:23:32 -0700 Subject: [PATCH 7/7] Update sdk/src/metrics/state/temporal_metric_storage.cc Co-authored-by: Ehsan Saei <71217171+esigo@users.noreply.github.com> --- sdk/src/metrics/state/temporal_metric_storage.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/metrics/state/temporal_metric_storage.cc b/sdk/src/metrics/state/temporal_metric_storage.cc index 35d2ba720e..55e93e3d46 100644 --- a/sdk/src/metrics/state/temporal_metric_storage.cc +++ b/sdk/src/metrics/state/temporal_metric_storage.cc @@ -50,7 +50,7 @@ bool TemporalMetricStorage::buildMetrics(CollectorHandle *collector, auto agg = merged_metrics->Get(attributes); if (agg) { - merged_metrics->Set(attributes, std::move(agg->Merge(aggregation))); + merged_metrics->Set(attributes, agg->Merge(aggregation)); } else {