Skip to content

Commit

Permalink
metrics service: use snapshot time for all metrics flushed (#13825)
Browse files Browse the repository at this point in the history
Signed-off-by: Rama Chavali <[email protected]>
  • Loading branch information
ramaraochavali authored Nov 4, 2020
1 parent 67609bc commit bf55b57
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 41 deletions.
1 change: 1 addition & 0 deletions include/envoy/stats/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ envoy_cc_library(
":refcount_ptr_interface",
":symbol_table_interface",
"//include/envoy/common:interval_set_interface",
"//include/envoy/common:time_interface",
],
)

Expand Down
6 changes: 6 additions & 0 deletions include/envoy/stats/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/common/time.h"
#include "envoy/stats/histogram.h"
#include "envoy/stats/stats.h"

Expand Down Expand Up @@ -40,6 +41,11 @@ class MetricSnapshot {
* @return a snapshot of all text readouts.
*/
virtual const std::vector<std::reference_wrapper<const TextReadout>>& textReadouts() PURE;

/**
* @return the time in UTC since epoch when the snapshot was created.
*/
virtual SystemTime snapshotTime() const PURE;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/stat_sinks/metrics_service/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config,
server.localInfo(), transport_api_version);

return std::make_unique<MetricsServiceSink>(
grpc_metrics_streamer, server.timeSource(),
grpc_metrics_streamer,
PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, report_counters_as_deltas, false));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h"

#include <chrono>

#include "envoy/common/exception.h"
#include "envoy/event/dispatcher.h"
#include "envoy/service/metrics/v3/metrics_service.pb.h"
Expand Down Expand Up @@ -38,20 +40,17 @@ void GrpcMetricsStreamerImpl::send(envoy::service::metrics::v3::StreamMetricsMes
}

MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
TimeSource& time_source,
const bool report_counters_as_deltas)
: grpc_metrics_streamer_(grpc_metrics_streamer), time_source_(time_source),
: grpc_metrics_streamer_(grpc_metrics_streamer),
report_counters_as_deltas_(report_counters_as_deltas) {}

void MetricsServiceSink::flushCounter(
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot) {
const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, int64_t snapshot_time_ms) {
io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics();
metrics_family->set_type(io::prometheus::client::MetricType::COUNTER);
metrics_family->set_name(counter_snapshot.counter_.get().name());
auto* metric = metrics_family->add_metric();
metric->set_timestamp_ms(std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.systemTime().time_since_epoch())
.count());
metric->set_timestamp_ms(snapshot_time_ms);
auto* counter_metric = metric->mutable_counter();
if (report_counters_as_deltas_) {
counter_metric->set_value(counter_snapshot.delta_);
Expand All @@ -60,19 +59,18 @@ void MetricsServiceSink::flushCounter(
}
}

void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge) {
void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms) {
io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics();
metrics_family->set_type(io::prometheus::client::MetricType::GAUGE);
metrics_family->set_name(gauge.name());
auto* metric = metrics_family->add_metric();
metric->set_timestamp_ms(std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.systemTime().time_since_epoch())
.count());
metric->set_timestamp_ms(snapshot_time_ms);
auto* gauge_metric = metric->mutable_gauge();
gauge_metric->set_value(gauge.value());
}

void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram) {
void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_histogram,
int64_t snapshot_time_ms) {
// TODO(ramaraochavali): Currently we are sending both quantile information and bucket
// information. We should make this configurable if it turns out that sending both affects
// performance.
Expand All @@ -82,9 +80,7 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist
summary_metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY);
summary_metrics_family->set_name(envoy_histogram.name());
auto* summary_metric = summary_metrics_family->add_metric();
summary_metric->set_timestamp_ms(std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.systemTime().time_since_epoch())
.count());
summary_metric->set_timestamp_ms(snapshot_time_ms);
auto* summary = summary_metric->mutable_summary();
const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics();
for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) {
Expand All @@ -98,9 +94,7 @@ void MetricsServiceSink::flushHistogram(const Stats::ParentHistogram& envoy_hist
histogram_metrics_family->set_type(io::prometheus::client::MetricType::HISTOGRAM);
histogram_metrics_family->set_name(envoy_histogram.name());
auto* histogram_metric = histogram_metrics_family->add_metric();
histogram_metric->set_timestamp_ms(std::chrono::duration_cast<std::chrono::milliseconds>(
time_source_.systemTime().time_since_epoch())
.count());
histogram_metric->set_timestamp_ms(snapshot_time_ms);
auto* histogram = histogram_metric->mutable_histogram();
histogram->set_sample_count(hist_stats.sampleCount());
histogram->set_sample_sum(hist_stats.sampleSum());
Expand All @@ -119,21 +113,24 @@ void MetricsServiceSink::flush(Stats::MetricSnapshot& snapshot) {
// preallocating the pointer array).
message_.mutable_envoy_metrics()->Reserve(snapshot.counters().size() + snapshot.gauges().size() +
snapshot.histograms().size());
int64_t snapshot_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
snapshot.snapshotTime().time_since_epoch())
.count();
for (const auto& counter : snapshot.counters()) {
if (counter.counter_.get().used()) {
flushCounter(counter);
flushCounter(counter, snapshot_time_ms);
}
}

for (const auto& gauge : snapshot.gauges()) {
if (gauge.get().used()) {
flushGauge(gauge.get());
flushGauge(gauge.get(), snapshot_time_ms);
}
}

for (const auto& histogram : snapshot.histograms()) {
if (histogram.get().used()) {
flushHistogram(histogram.get());
flushHistogram(histogram.get(), snapshot_time_ms);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,18 @@ class MetricsServiceSink : public Stats::Sink {
public:
// MetricsService::Sink
MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer,
TimeSource& time_system, const bool report_counters_as_deltas);
const bool report_counters_as_deltas);
void flush(Stats::MetricSnapshot& snapshot) override;
void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}

void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot);
void flushGauge(const Stats::Gauge& gauge);
void flushHistogram(const Stats::ParentHistogram& envoy_histogram);
void flushCounter(const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
int64_t snapshot_time_ms);
void flushGauge(const Stats::Gauge& gauge, int64_t snapshot_time_ms);
void flushHistogram(const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms);

private:
GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_;
envoy::service::metrics::v3::StreamMetricsMessage message_;
TimeSource& time_source_;
const bool report_counters_as_deltas_;
};

Expand Down
14 changes: 9 additions & 5 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

#include <csignal>
#include <cstdint>
#include <ctime>
#include <functional>
#include <memory>
#include <string>

#include "envoy/admin/v3/config_dump.pb.h"
#include "envoy/common/exception.h"
#include "envoy/common/time.h"
#include "envoy/config/bootstrap/v2/bootstrap.pb.h"
#include "envoy/config/bootstrap/v2/bootstrap.pb.validate.h"
#include "envoy/config/bootstrap/v3/bootstrap.pb.h"
Expand Down Expand Up @@ -147,7 +149,7 @@ void InstanceImpl::failHealthcheck(bool fail) {
server_stats_->live_.set(live_.load());
}

MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store) {
MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store, TimeSource& time_source) {
snapped_counters_ = store.counters();
counters_.reserve(snapped_counters_.size());
for (const auto& counter : snapped_counters_) {
Expand All @@ -172,15 +174,17 @@ MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store) {
for (const auto& text_readout : snapped_text_readouts_) {
text_readouts_.push_back(*text_readout);
}

snapshot_time_ = time_source.systemTime();
}

void InstanceUtil::flushMetricsToSinks(const std::list<Stats::SinkPtr>& sinks,
Stats::Store& store) {
void InstanceUtil::flushMetricsToSinks(const std::list<Stats::SinkPtr>& sinks, Stats::Store& store,
TimeSource& time_source) {
// Create a snapshot and flush to all sinks.
// NOTE: Even if there are no sinks, creating the snapshot has the important property that it
// latches all counters on a periodic basis. The hot restart code assumes this is being
// done so this should not be removed.
MetricSnapshotImpl snapshot(store);
MetricSnapshotImpl snapshot(store, time_source);
for (const auto& sink : sinks) {
sink->flush(snapshot);
}
Expand Down Expand Up @@ -231,7 +235,7 @@ void InstanceImpl::updateServerStats() {

void InstanceImpl::flushStatsInternal() {
updateServerStats();
InstanceUtil::flushMetricsToSinks(config_.statsSinks(), stats_store_);
InstanceUtil::flushMetricsToSinks(config_.statsSinks(), stats_store_, timeSource());
// TODO(ramaraochavali): consider adding different flush interval for histograms.
if (stat_flush_timer_ != nullptr) {
stat_flush_timer_->enableTimer(config_.statsFlushInterval());
Expand Down
7 changes: 5 additions & 2 deletions source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ class InstanceUtil : Logger::Loggable<Logger::Id::main> {
* @param sinks supplies the list of sinks.
* @param store provides the store being flushed.
*/
static void flushMetricsToSinks(const std::list<Stats::SinkPtr>& sinks, Stats::Store& store);
static void flushMetricsToSinks(const std::list<Stats::SinkPtr>& sinks, Stats::Store& store,
TimeSource& time_source);

/**
* Load a bootstrap config and perform validation.
Expand Down Expand Up @@ -383,7 +384,7 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
// copying and probably be a cleaner API in general.
class MetricSnapshotImpl : public Stats::MetricSnapshot {
public:
explicit MetricSnapshotImpl(Stats::Store& store);
explicit MetricSnapshotImpl(Stats::Store& store, TimeSource& time_source);

// Stats::MetricSnapshot
const std::vector<CounterSnapshot>& counters() override { return counters_; }
Expand All @@ -396,6 +397,7 @@ class MetricSnapshotImpl : public Stats::MetricSnapshot {
const std::vector<std::reference_wrapper<const Stats::TextReadout>>& textReadouts() override {
return text_readouts_;
}
SystemTime snapshotTime() const override { return snapshot_time_; }

private:
std::vector<Stats::CounterSharedPtr> snapped_counters_;
Expand All @@ -406,6 +408,7 @@ class MetricSnapshotImpl : public Stats::MetricSnapshot {
std::vector<std::reference_wrapper<const Stats::ParentHistogram>> histograms_;
std::vector<Stats::TextReadoutSharedPtr> snapped_text_readouts_;
std::vector<std::reference_wrapper<const Stats::TextReadout>> text_readouts_;
SystemTime snapshot_time_;
};

} // namespace Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,11 @@ class MetricsServiceSinkTest : public testing::Test {
MetricsServiceSinkTest() = default;

NiceMock<Stats::MockMetricSnapshot> snapshot_;
Event::SimulatedTimeSystem time_system_;
std::shared_ptr<MockGrpcMetricsStreamer> streamer_{new MockGrpcMetricsStreamer()};
};

TEST_F(MetricsServiceSinkTest, CheckSendCall) {
MetricsServiceSink sink(streamer_, time_system_, false);
MetricsServiceSink sink(streamer_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
Expand All @@ -125,7 +124,7 @@ TEST_F(MetricsServiceSinkTest, CheckSendCall) {
}

TEST_F(MetricsServiceSinkTest, CheckStatsCount) {
MetricsServiceSink sink(streamer_, time_system_, false);
MetricsServiceSink sink(streamer_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
Expand Down Expand Up @@ -156,7 +155,7 @@ TEST_F(MetricsServiceSinkTest, CheckStatsCount) {

// Test that verifies counters are correctly reported as current value when configured to do so.
TEST_F(MetricsServiceSinkTest, ReportCountersValues) {
MetricsServiceSink sink(streamer_, time_system_, false);
MetricsServiceSink sink(streamer_, false);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
Expand All @@ -174,7 +173,7 @@ TEST_F(MetricsServiceSinkTest, ReportCountersValues) {

// Test that verifies counters are reported as the delta between flushes when configured to do so.
TEST_F(MetricsServiceSinkTest, ReportCountersAsDeltas) {
MetricsServiceSink sink(streamer_, time_system_, true);
MetricsServiceSink sink(streamer_, true);

auto counter = std::make_shared<NiceMock<Stats::MockCounter>>();
counter->name_ = "test_counter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio
const Protobuf::RepeatedPtrField<::io::prometheus::client::MetricFamily>& envoy_metrics =
request_msg.envoy_metrics();

int64_t previous_time_stamp = 0;
for (const ::io::prometheus::client::MetricFamily& metrics_family : envoy_metrics) {
if (metrics_family.name() == "cluster.cluster_0.membership_change" &&
metrics_family.type() == ::io::prometheus::client::MetricType::COUNTER) {
Expand All @@ -112,6 +113,11 @@ class MetricsServiceIntegrationTest : public Grpc::VersionedGrpcClientIntegratio
Stats::HistogramSettingsImpl::defaultBuckets().size());
}
ASSERT(metrics_family.metric(0).has_timestamp_ms());
// Validate that all metrics have the same timestamp.
if (previous_time_stamp > 0) {
EXPECT_EQ(previous_time_stamp, metrics_family.metric(0).timestamp_ms());
}
previous_time_stamp = metrics_family.metric(0).timestamp_ms();
if (known_counter_exists && known_gauge_exists && known_histogram_exists) {
break;
}
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/stats/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,13 @@ class MockMetricSnapshot : public MetricSnapshot {
MOCK_METHOD(const std::vector<std::reference_wrapper<const ParentHistogram>>&, histograms, ());
MOCK_METHOD(const std::vector<std::reference_wrapper<const TextReadout>>&, textReadouts, ());

SystemTime snapshotTime() const override { return snapshot_time_; }

std::vector<CounterSnapshot> counters_;
std::vector<std::reference_wrapper<const Gauge>> gauges_;
std::vector<std::reference_wrapper<const ParentHistogram>> histograms_;
std::vector<std::reference_wrapper<const TextReadout>> text_readouts_;
SystemTime snapshot_time_;
};

class MockSink : public Sink {
Expand Down
7 changes: 4 additions & 3 deletions test/server/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ TEST(ServerInstanceUtil, flushHelper) {
InSequence s;

Stats::TestUtil::TestStore store;
Event::SimulatedTimeSystem time_system;
Stats::Counter& c = store.counter("hello");
c.inc();
store.gauge("world", Stats::Gauge::ImportMode::Accumulate).set(5);
store.histogram("histogram", Stats::Histogram::Unit::Unspecified);
store.textReadout("text").set("is important");

std::list<Stats::SinkPtr> sinks;
InstanceUtil::flushMetricsToSinks(sinks, store);
InstanceUtil::flushMetricsToSinks(sinks, store, time_system);
// Make sure that counters have been latched even if there are no sinks.
EXPECT_EQ(1UL, c.value());
EXPECT_EQ(0, c.latch());
Expand All @@ -80,7 +81,7 @@ TEST(ServerInstanceUtil, flushHelper) {
EXPECT_EQ(snapshot.textReadouts()[0].get().value(), "is important");
}));
c.inc();
InstanceUtil::flushMetricsToSinks(sinks, store);
InstanceUtil::flushMetricsToSinks(sinks, store, time_system);

// Histograms don't currently work with the isolated store so test those with a mock store.
NiceMock<Stats::MockStore> mock_store;
Expand All @@ -93,7 +94,7 @@ TEST(ServerInstanceUtil, flushHelper) {
EXPECT_EQ(snapshot.histograms().size(), 1);
EXPECT_TRUE(snapshot.textReadouts().empty());
}));
InstanceUtil::flushMetricsToSinks(sinks, mock_store);
InstanceUtil::flushMetricsToSinks(sinks, mock_store, time_system);
}

class RunHelperTest : public testing::Test {
Expand Down

0 comments on commit bf55b57

Please sign in to comment.