From bf55b57614245f81049af9d91106f1b54bacffa8 Mon Sep 17 00:00:00 2001 From: Rama Chavali Date: Wed, 4 Nov 2020 21:45:57 +0530 Subject: [PATCH] metrics service: use snapshot time for all metrics flushed (#13825) Signed-off-by: Rama Chavali --- include/envoy/stats/BUILD | 1 + include/envoy/stats/sink.h | 6 +++ .../stat_sinks/metrics_service/config.cc | 2 +- .../grpc_metrics_service_impl.cc | 37 +++++++++---------- .../grpc_metrics_service_impl.h | 10 ++--- source/server/server.cc | 14 ++++--- source/server/server.h | 7 +++- .../grpc_metrics_service_impl_test.cc | 9 ++--- .../metrics_service_integration_test.cc | 6 +++ test/mocks/stats/mocks.h | 3 ++ test/server/server_test.cc | 7 ++-- 11 files changed, 61 insertions(+), 41 deletions(-) diff --git a/include/envoy/stats/BUILD b/include/envoy/stats/BUILD index c810ac7ad30c..484115c77691 100644 --- a/include/envoy/stats/BUILD +++ b/include/envoy/stats/BUILD @@ -34,6 +34,7 @@ envoy_cc_library( ":refcount_ptr_interface", ":symbol_table_interface", "//include/envoy/common:interval_set_interface", + "//include/envoy/common:time_interface", ], ) diff --git a/include/envoy/stats/sink.h b/include/envoy/stats/sink.h index 1303c9fd67b8..ff0e607ffaa8 100644 --- a/include/envoy/stats/sink.h +++ b/include/envoy/stats/sink.h @@ -4,6 +4,7 @@ #include #include "envoy/common/pure.h" +#include "envoy/common/time.h" #include "envoy/stats/histogram.h" #include "envoy/stats/stats.h" @@ -40,6 +41,11 @@ class MetricSnapshot { * @return a snapshot of all text readouts. */ virtual const std::vector>& textReadouts() PURE; + + /** + * @return the time in UTC since epoch when the snapshot was created. + */ + virtual SystemTime snapshotTime() const PURE; }; /** diff --git a/source/extensions/stat_sinks/metrics_service/config.cc b/source/extensions/stat_sinks/metrics_service/config.cc index db1998aefe5b..05228e9a67c4 100644 --- a/source/extensions/stat_sinks/metrics_service/config.cc +++ b/source/extensions/stat_sinks/metrics_service/config.cc @@ -36,7 +36,7 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config, server.localInfo(), transport_api_version); return std::make_unique( - grpc_metrics_streamer, server.timeSource(), + grpc_metrics_streamer, PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false)); } diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 092e3fbe6fcf..d18bfbf20c83 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -1,5 +1,7 @@ #include "extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h" +#include + #include "envoy/common/exception.h" #include "envoy/event/dispatcher.h" #include "envoy/service/metrics/v3/metrics_service.pb.h" @@ -38,20 +40,17 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes } MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, - TimeSource& time_source, const bool report_counters_as_deltas) - : grpc_metrics_streamer_(grpc_metrics_streamer), time_source_(time_source), + : grpc_metrics_streamer_(grpc_metrics_streamer), report_counters_as_deltas_(report_counters_as_deltas) {} void MetricsServiceSink::flushCounter( - const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot) { + const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) { io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); metrics_family->set_name(counter_snapshot.counter_.get().name()); auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::duration_cast( - time_source_.systemTime().time_since_epoch()) - .count()); + metric->set_timestamp_ms(snapshot_time_ms); auto* counter_metric = metric->mutable_counter(); if (report_counters_as_deltas_) { counter_metric->set_value(counter_snapshot.delta_); @@ -60,19 +59,18 @@ void MetricsServiceSink::flushCounter( } } -void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge) { +void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) { io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); metrics_family->set_name(gauge.name()); auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::duration_cast( - time_source_.systemTime().time_since_epoch()) - .count()); + metric->set_timestamp_ms(snapshot_time_ms); auto* gauge_metric = metric->mutable_gauge(); gauge_metric->set_value(gauge.value()); } -void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram) { +void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram, + int64_t snapshot_time_ms) { // TODO(ramaraochavali): Currently we are sending both quantile information and bucket // information. We should make this configurable if it turns out that sending both affects // performance. @@ -82,9 +80,7 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); summary_metrics_family->set_name(envoy_histogram.name()); auto* summary_metric = summary_metrics_family->add_metric(); - summary_metric->set_timestamp_ms(std::chrono::duration_cast( - time_source_.systemTime().time_since_epoch()) - .count()); + summary_metric->set_timestamp_ms(snapshot_time_ms); auto* summary = summary_metric->mutable_summary(); const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics(); for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) { @@ -98,9 +94,7 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM); histogram_metrics_family->set_name(envoy_histogram.name()); auto* histogram_metric = histogram_metrics_family->add_metric(); - histogram_metric->set_timestamp_ms(std::chrono::duration_cast( - time_source_.systemTime().time_since_epoch()) - .count()); + histogram_metric->set_timestamp_ms(snapshot_time_ms); auto* histogram = histogram_metric->mutable_histogram(); histogram->set_sample_count(hist_stats.sampleCount()); histogram->set_sample_sum(hist_stats.sampleSum()); @@ -119,21 +113,24 @@ void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) { // preallocating the pointer array). message_.mutable_envoy_metrics()->Reserve(snapshot.counters().size() + snapshot.gauges().size() + snapshot.histograms().size()); + int64_t snapshot_time_ms = std::chrono::duration_cast( + snapshot.snapshotTime().time_since_epoch()) + .count(); for (const auto& counter : snapshot.counters()) { if (counter.counter_.get().used()) { - flushCounter(counter); + flushCounter(counter, snapshot_time_ms); } } for (const auto& gauge : snapshot.gauges()) { if (gauge.get().used()) { - flushGauge(gauge.get()); + flushGauge(gauge.get(), snapshot_time_ms); } } for (const auto& histogram : snapshot.histograms()) { if (histogram.get().used()) { - flushHistogram(histogram.get()); + flushHistogram(histogram.get(), snapshot_time_ms); } } diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index d65bae27f9bb..668c7f3fb567 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -80,18 +80,18 @@ class MetricsServiceSink : public Stats::Sink { public: // MetricsService::Sink MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer, - TimeSource& time_system, const bool report_counters_as_deltas); + const bool report_counters_as_deltas); void flush(Stats::MetricSnapshot& snapshot) override; void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} - void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot); - void flushGauge(const Stats::Gauge& gauge); - void flushHistogram(const Stats::ParentHistogram& envoy_histogram); + void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, + int64_t snapshot_time_ms); + void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms); + void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms); private: GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; envoy::service::metrics::v3::StreamMetricsMessage message_; - TimeSource& time_source_; const bool report_counters_as_deltas_; }; diff --git a/source/server/server.cc b/source/server/server.cc index b0a16884d982..f0942407c4e8 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -2,12 +2,14 @@ #include #include +#include #include #include #include #include "envoy/admin/v3/config_dump.pb.h" #include "envoy/common/exception.h" +#include "envoy/common/time.h" #include "envoy/config/bootstrap/v2/bootstrap.pb.h" #include "envoy/config/bootstrap/v2/bootstrap.pb.validate.h" #include "envoy/config/bootstrap/v3/bootstrap.pb.h" @@ -147,7 +149,7 @@ void InstanceImpl::failHealthcheck(bool fail) { server_stats_->live_.set(live_.load()); } -MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store) { +MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store, TimeSource& time_source) { snapped_counters_ = store.counters(); counters_.reserve(snapped_counters_.size()); for (const auto& counter : snapped_counters_) { @@ -172,15 +174,17 @@ MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store) { for (const auto& text_readout : snapped_text_readouts_) { text_readouts_.push_back(*text_readout); } + + snapshot_time_ = time_source.systemTime(); } -void InstanceUtil::flushMetricsToSinks(const std::list& sinks, - Stats::Store& store) { +void InstanceUtil::flushMetricsToSinks(const std::list& sinks, Stats::Store& store, + TimeSource& time_source) { // Create a snapshot and flush to all sinks. // NOTE: Even if there are no sinks, creating the snapshot has the important property that it // latches all counters on a periodic basis. The hot restart code assumes this is being // done so this should not be removed. - MetricSnapshotImpl snapshot(store); + MetricSnapshotImpl snapshot(store, time_source); for (const auto& sink : sinks) { sink->flush(snapshot); } @@ -231,7 +235,7 @@ void InstanceImpl::updateServerStats() { void InstanceImpl::flushStatsInternal() { updateServerStats(); - InstanceUtil::flushMetricsToSinks(config_.statsSinks(), stats_store_); + InstanceUtil::flushMetricsToSinks(config_.statsSinks(), stats_store_, timeSource()); // TODO(ramaraochavali): consider adding different flush interval for histograms. if (stat_flush_timer_ != nullptr) { stat_flush_timer_->enableTimer(config_.statsFlushInterval()); diff --git a/source/server/server.h b/source/server/server.h index f9ce2954d581..1c06562b5a0e 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -113,7 +113,8 @@ class InstanceUtil : Logger::Loggable { * @param sinks supplies the list of sinks. * @param store provides the store being flushed. */ - static void flushMetricsToSinks(const std::list& sinks, Stats::Store& store); + static void flushMetricsToSinks(const std::list& sinks, Stats::Store& store, + TimeSource& time_source); /** * Load a bootstrap config and perform validation. @@ -383,7 +384,7 @@ class InstanceImpl final : Logger::Loggable, // copying and probably be a cleaner API in general. class MetricSnapshotImpl : public Stats::MetricSnapshot { public: - explicit MetricSnapshotImpl(Stats::Store& store); + explicit MetricSnapshotImpl(Stats::Store& store, TimeSource& time_source); // Stats::MetricSnapshot const std::vector& counters() override { return counters_; } @@ -396,6 +397,7 @@ class MetricSnapshotImpl : public Stats::MetricSnapshot { const std::vector>& textReadouts() override { return text_readouts_; } + SystemTime snapshotTime() const override { return snapshot_time_; } private: std::vector snapped_counters_; @@ -406,6 +408,7 @@ class MetricSnapshotImpl : public Stats::MetricSnapshot { std::vector> histograms_; std::vector snapped_text_readouts_; std::vector> text_readouts_; + SystemTime snapshot_time_; }; } // namespace Server diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index ce940b650136..927b710ac756 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -96,12 +96,11 @@ class MetricsServiceSinkTest : public testing::Test { MetricsServiceSinkTest() = default; NiceMock snapshot_; - Event::SimulatedTimeSystem time_system_; std::shared_ptr streamer_{new MockGrpcMetricsStreamer()}; }; TEST_F(MetricsServiceSinkTest, CheckSendCall) { - MetricsServiceSink sink(streamer_, time_system_, false); + MetricsServiceSink sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -125,7 +124,7 @@ TEST_F(MetricsServiceSinkTest, CheckSendCall) { } TEST_F(MetricsServiceSinkTest, CheckStatsCount) { - MetricsServiceSink sink(streamer_, time_system_, false); + MetricsServiceSink sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -156,7 +155,7 @@ TEST_F(MetricsServiceSinkTest, CheckStatsCount) { // Test that verifies counters are correctly reported as current value when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersValues) { - MetricsServiceSink sink(streamer_, time_system_, false); + MetricsServiceSink sink(streamer_, false); auto counter = std::make_shared>(); counter->name_ = "test_counter"; @@ -174,7 +173,7 @@ TEST_F(MetricsServiceSinkTest, ReportCountersValues) { // Test that verifies counters are reported as the delta between flushes when configured to do so. TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) { - MetricsServiceSink sink(streamer_, time_system_, true); + MetricsServiceSink sink(streamer_, true); auto counter = std::make_shared>(); counter->name_ = "test_counter"; diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index dcdf47945e94..ff46886b4cd7 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -87,6 +87,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio const Protobuf::RepeatedPtrField<::io::prometheus::client::MetricFamily>& envoy_metrics = request_msg.envoy_metrics(); + int64_t previous_time_stamp = 0; for (const ::io::prometheus::client::MetricFamily& metrics_family : envoy_metrics) { if (metrics_family.name() == "cluster.cluster_0.membership_change" && metrics_family.type() == ::io::prometheus::client::MetricType::COUNTER) { @@ -112,6 +113,11 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio Stats::HistogramSettingsImpl::defaultBuckets().size()); } ASSERT(metrics_family.metric(0).has_timestamp_ms()); + // Validate that all metrics have the same timestamp. + if (previous_time_stamp > 0) { + EXPECT_EQ(previous_time_stamp, metrics_family.metric(0).timestamp_ms()); + } + previous_time_stamp = metrics_family.metric(0).timestamp_ms(); if (known_counter_exists && known_gauge_exists && known_histogram_exists) { break; } diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index cc43bd084e10..ba12c326c33d 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -280,10 +280,13 @@ class MockMetricSnapshot : public MetricSnapshot { MOCK_METHOD(const std::vector>&, histograms, ()); MOCK_METHOD(const std::vector>&, textReadouts, ()); + SystemTime snapshotTime() const override { return snapshot_time_; } + std::vector counters_; std::vector> gauges_; std::vector> histograms_; std::vector> text_readouts_; + SystemTime snapshot_time_; }; class MockSink : public Sink { diff --git a/test/server/server_test.cc b/test/server/server_test.cc index a26da0f646e6..5249a38f9828 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -52,6 +52,7 @@ TEST(ServerInstanceUtil, flushHelper) { InSequence s; Stats::TestUtil::TestStore store; + Event::SimulatedTimeSystem time_system; Stats::Counter& c = store.counter("hello"); c.inc(); store.gauge("world", Stats::Gauge::ImportMode::Accumulate).set(5); @@ -59,7 +60,7 @@ TEST(ServerInstanceUtil, flushHelper) { store.textReadout("text").set("is important"); std::list sinks; - InstanceUtil::flushMetricsToSinks(sinks, store); + InstanceUtil::flushMetricsToSinks(sinks, store, time_system); // Make sure that counters have been latched even if there are no sinks. EXPECT_EQ(1UL, c.value()); EXPECT_EQ(0, c.latch()); @@ -80,7 +81,7 @@ TEST(ServerInstanceUtil, flushHelper) { EXPECT_EQ(snapshot.textReadouts()[0].get().value(), "is important"); })); c.inc(); - InstanceUtil::flushMetricsToSinks(sinks, store); + InstanceUtil::flushMetricsToSinks(sinks, store, time_system); // Histograms don't currently work with the isolated store so test those with a mock store. NiceMock mock_store; @@ -93,7 +94,7 @@ TEST(ServerInstanceUtil, flushHelper) { EXPECT_EQ(snapshot.histograms().size(), 1); EXPECT_TRUE(snapshot.textReadouts().empty()); })); - InstanceUtil::flushMetricsToSinks(sinks, mock_store); + InstanceUtil::flushMetricsToSinks(sinks, mock_store, time_system); } class RunHelperTest : public testing::Test {