diff --git a/src/yb/common/hybrid_time.h b/src/yb/common/hybrid_time.h index 63924fef30f1..dd9295098674 100644 --- a/src/yb/common/hybrid_time.h +++ b/src/yb/common/hybrid_time.h @@ -202,6 +202,11 @@ class HybridTime { return v >> kBitsForLogicalComponent; } + // Returns the physical value embedded in this HybridTime, in milliseconds. + inline MillisTime GetPhysicalValueMillis() const { + return GetPhysicalValueMicros() / 1000; + } + inline int64_t PhysicalDiff(const HybridTime& other) const { return static_cast(GetPhysicalValueMicros() - other.GetPhysicalValueMicros()); } diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 90ccf0d96be3..4285ea3e5811 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -340,7 +340,7 @@ RaftConsensus::RaftConsensus( term_metric_(metric_entity->FindOrCreateGauge(&METRIC_raft_term, cmeta->current_term())), follower_last_update_time_ms_metric_( - metric_entity->FindOrCreateAtomicMillisLag(&METRIC_follower_lag_ms)), + metric_entity->FindOrCreateAtomicMillisLag(&METRIC_follower_lag_ms, clock_)), is_raft_leader_metric_(metric_entity->FindOrCreateGauge(&METRIC_is_raft_leader, static_cast(0))), parent_mem_tracker_(std::move(parent_mem_tracker)), diff --git a/src/yb/server/CMakeLists.txt b/src/yb/server/CMakeLists.txt index d9d1af70af0b..feb0d8a61a4f 100644 --- a/src/yb/server/CMakeLists.txt +++ b/src/yb/server/CMakeLists.txt @@ -58,6 +58,7 @@ ADD_YB_TEST(hybrid_clock-test) ADD_YB_TEST(logical_clock-test) # This test is here and not in common because we need access to HybridClock. ADD_YB_TEST(doc_hybrid_time-test) +ADD_YB_TEST(lag_metrics_test) ######################################### # server_base_proto diff --git a/src/yb/server/clock.h b/src/yb/server/clock.h index 22f1f1675b12..5580b5804ff9 100644 --- a/src/yb/server/clock.h +++ b/src/yb/server/clock.h @@ -36,7 +36,6 @@ #include #include "yb/common/clock.h" -#include "yb/common/common.pb.h" #include "yb/common/hybrid_time.h" #include "yb/gutil/ref_counted.h" diff --git a/src/yb/server/lag_metrics_test.cc b/src/yb/server/lag_metrics_test.cc new file mode 100644 index 000000000000..b6a7c79c57d1 --- /dev/null +++ b/src/yb/server/lag_metrics_test.cc @@ -0,0 +1,90 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +#include + +#include "yb/common/hybrid_time.h" +#include "yb/server/logical_clock.h" +#include "yb/util/metrics.h" +#include "yb/util/test_util.h" + +namespace yb { + +METRIC_DEFINE_entity(lag_metric_test_entity); + +class LagMetricsTest : public YBTest { + public: + void SetUp() override { + YBTest::SetUp(); + + entity_ = METRIC_ENTITY_lag_metric_test_entity.Instantiate(®istry_, "my-lag-metric-test"); + } + + protected: + template + void DoLagTest(MillisLagPrototype* metric) { + auto micros = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + scoped_refptr clock( + server::LogicalClock::CreateStartingAt(HybridTime::FromMicros(micros))); + + auto lag = metric->Instantiate(entity_, clock); + ASSERT_EQ(metric->description(), lag->prototype()->description()); + + SleepFor(MonoDelta::FromMilliseconds(500)); + + micros = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + clock->Update(HybridTime::FromMicros(micros)); + + // Internal timestamp is set to the time when the metric was created. + // So this lag is measure of the time elapsed since the metric was + // created and the check time. + ASSERT_GE(lag->lag_ms(), 500); + SleepFor(MonoDelta::FromMilliseconds(1000)); + + auto now_ms = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + lag->UpdateTimestampInMilliseconds(now_ms); + + micros = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + clock->Update(HybridTime::FromMicros(micros)); + + // Verify that the update happened correctly. The lag time should + // be close to 0, but giving it extra time to account for slow + // tests. + ASSERT_LT(lag->lag_ms(), 200); + + // Set the timestamp to some time in the future to verify that the + // metric can correctly deal with this case. + lag->UpdateTimestampInMilliseconds(now_ms * 2); + ASSERT_EQ(0, lag->lag_ms()); + } + + MetricRegistry registry_; + scoped_refptr entity_; +}; + +METRIC_DEFINE_lag(lag_metric_test_entity, lag_simple, "Test MillisLag", + "Test MillisLag Description"); +TEST_F(LagMetricsTest, SimpleLagTest) { +ASSERT_NO_FATALS(DoLagTest(&METRIC_lag_simple)); +} + +METRIC_DEFINE_lag(lag_metric_test_entity, atomic_lag_simple, "Test Atomic MillisLag", + "Test Atomic MillisLag Description"); +TEST_F(LagMetricsTest, SimpleAtomicLagTest) { +ASSERT_NO_FATALS(DoLagTest(&METRIC_atomic_lag_simple)); +} + +} // namespace yb diff --git a/src/yb/util/metrics-test.cc b/src/yb/util/metrics-test.cc index e6209831668c..47b58a7b6acb 100644 --- a/src/yb/util/metrics-test.cc +++ b/src/yb/util/metrics-test.cc @@ -66,22 +66,6 @@ class MetricsTest : public YBTest { } protected: - template - void DoLagTest(const MillisLagPrototype& metric) { - auto lag = new LagType(&metric); - ASSERT_EQ(metric.description(), lag->prototype()->description()); - auto now_ms = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count(); - SleepFor(MonoDelta::FromMilliseconds(100)); - ASSERT_LT(now_ms, lag->lag_ms()); - lag->UpdateTimestampInMilliseconds(now_ms); - ASSERT_GT(10000, lag->lag_ms()); - // Set the timestamp to some time in the future to verify that the metric can correctly deal - // with this case. - lag->UpdateTimestampInMilliseconds(now_ms * 2); - ASSERT_EQ(0, lag->lag_ms()); - } - MetricRegistry registry_; scoped_refptr entity_; }; @@ -100,17 +84,6 @@ TEST_F(MetricsTest, SimpleCounterTest) { ASSERT_EQ(3, requests->value()); } -METRIC_DEFINE_lag(test_entity, lag_simple, "Test MillisLag", "Test MillisLag Description"); -TEST_F(MetricsTest, SimpleLagTest) { - ASSERT_NO_FATALS(DoLagTest(METRIC_lag_simple)); -} - -METRIC_DEFINE_lag(test_entity, atomic_lag_simple, "Test Atomic MillisLag", - "Test Atomic MillisLag Description"); -TEST_F(MetricsTest, SimpleAtomicLagTest) { - ASSERT_NO_FATALS(DoLagTest(METRIC_atomic_lag_simple)); -} - METRIC_DEFINE_gauge_uint64(test_entity, fake_memory_usage, "Memory Usage", MetricUnit::kBytes, "Test Gauge 1"); diff --git a/src/yb/util/metrics.cc b/src/yb/util/metrics.cc index 7557d44a5220..863f2b975a25 100644 --- a/src/yb/util/metrics.cc +++ b/src/yb/util/metrics.cc @@ -774,11 +774,12 @@ CHECKED_STATUS Counter::WriteForPrometheus( // scoped_refptr MillisLagPrototype::Instantiate( - const scoped_refptr& entity) { - return entity->FindOrCreateMillisLag(this); + const scoped_refptr& entity, const scoped_refptr& clock) { + return entity->FindOrCreateMillisLag(this, clock); } -MillisLag::MillisLag(const MillisLagPrototype* proto) : Metric(proto) { +MillisLag::MillisLag(const MillisLagPrototype* proto, const scoped_refptr& clock) + : Metric(proto), clock_(clock), timestamp_ms_(clock_->Now().GetPhysicalValueMillis()) { } Status MillisLag::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const { @@ -791,7 +792,7 @@ Status MillisLag::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) prototype_->WriteFields(writer, opts); writer->String("value"); - writer->Int64(lag_ms()); + writer->Uint64(lag_ms()); writer->EndObject(); return Status::OK(); @@ -807,6 +808,11 @@ Status MillisLag::WriteForPrometheus( return writer->WriteSingleEntry(attr, prototype_->name(), lag_ms()); } +AtomicMillisLag::AtomicMillisLag(const MillisLagPrototype* proto, + const scoped_refptr& clock) + : MillisLag(proto, clock), atomic_timestamp_ms_(clock_->Now().GetPhysicalValueMillis()) { +} + Status AtomicMillisLag::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& opts) const { if (prototype_->level() < opts.level) { return Status::OK(); @@ -817,7 +823,7 @@ Status AtomicMillisLag::WriteAsJson(JsonWriter* writer, const MetricJsonOptions& prototype_->WriteFields(writer, opts); writer->String("value"); - writer->Int64(this->lag_ms()); + writer->Uint64(this->lag_ms()); writer->EndObject(); return Status::OK(); diff --git a/src/yb/util/metrics.h b/src/yb/util/metrics.h index 9e218d402541..7fb7b6d32785 100644 --- a/src/yb/util/metrics.h +++ b/src/yb/util/metrics.h @@ -258,6 +258,8 @@ #include "yb/gutil/ref_counted.h" #include "yb/gutil/singleton.h" +#include "yb/server/clock.h" + #include "yb/util/atomic.h" #include "yb/util/jsonwriter.h" #include "yb/util/locks.h" @@ -558,8 +560,11 @@ class MetricEntity : public RefCountedThreadSafe { ExternalPrometheusMetricsCb; scoped_refptr FindOrCreateCounter(const CounterPrototype* proto); - scoped_refptr FindOrCreateMillisLag(const MillisLagPrototype* proto); - scoped_refptr FindOrCreateAtomicMillisLag(const MillisLagPrototype* proto); + scoped_refptr FindOrCreateMillisLag(const MillisLagPrototype* proto, + const scoped_refptr& clock); + scoped_refptr FindOrCreateAtomicMillisLag( + const MillisLagPrototype* proto, + const scoped_refptr& clock); scoped_refptr FindOrCreateHistogram(const HistogramPrototype* proto); template @@ -1306,7 +1311,8 @@ class MillisLagPrototype : public MetricPrototype { public: explicit MillisLagPrototype(const MetricPrototype::CtorArgs& args) : MetricPrototype(args) { } - scoped_refptr Instantiate(const scoped_refptr& entity); + scoped_refptr Instantiate(const scoped_refptr& entity, + const scoped_refptr& clock); virtual MetricType::Type type() const override { return MetricType::kLag; } @@ -1319,12 +1325,13 @@ class MillisLagPrototype : public MetricPrototype { // will be in charge of calculating the lag by doing now() - metric_timestamp_. class MillisLag : public Metric { public: - virtual int64_t lag_ms() const { - return std::max(static_cast(0), - static_cast(std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count()) - timestamp_ms_); + MillisLag(const MillisLagPrototype* proto, const scoped_refptr& clock); + + virtual uint64_t lag_ms() const { + auto now = clock_->Now().GetPhysicalValueMillis(); + return now > timestamp_ms_ ? now - timestamp_ms_ : 0; } - virtual void UpdateTimestampInMilliseconds(int64_t timestamp) { + virtual void UpdateTimestampInMilliseconds(uint64_t timestamp) { timestamp_ms_ = timestamp; } virtual CHECKED_STATUS WriteAsJson(JsonWriter* w, @@ -1333,28 +1340,24 @@ class MillisLag : public Metric { PrometheusWriter* writer, const MetricEntity::AttributeMap& attr, const MetricPrometheusOptions& opts) const override; - private: - friend class MetricEntity; - friend class AtomicMillisLag; - friend class MetricsTest; - - explicit MillisLag(const MillisLagPrototype* proto); + protected: + const scoped_refptr& clock_; - int64_t timestamp_ms_; + private: + uint64_t timestamp_ms_; }; class AtomicMillisLag : public MillisLag { public: - explicit AtomicMillisLag(const MillisLagPrototype* proto) : MillisLag(proto) {} + AtomicMillisLag(const MillisLagPrototype* proto, const scoped_refptr& clock); - int64_t lag_ms() const override { - return std::max(static_cast(0), - static_cast(std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()).count()) - - atomic_timestamp_ms_.load(std::memory_order_acquire)); + uint64_t lag_ms() const override { + auto now = clock_->Now().GetPhysicalValueMillis(); + auto timestamp = atomic_timestamp_ms_.load(std::memory_order_acquire); + return now > timestamp ? now - timestamp : 0; } - void UpdateTimestampInMilliseconds(int64_t timestamp) override { + void UpdateTimestampInMilliseconds(uint64_t timestamp) override { atomic_timestamp_ms_.store(timestamp, std::memory_order_release); } @@ -1372,7 +1375,8 @@ class AtomicMillisLag : public MillisLag { } protected: - std::atomic atomic_timestamp_ms_; + std::atomic atomic_timestamp_ms_; + private: DISALLOW_COPY_AND_ASSIGN(AtomicMillisLag); }; @@ -1501,25 +1505,25 @@ inline scoped_refptr MetricEntity::FindOrCreateCounter( } inline scoped_refptr MetricEntity::FindOrCreateMillisLag( - const MillisLagPrototype* proto) { + const MillisLagPrototype* proto, const scoped_refptr& clock) { CheckInstantiation(proto); std::lock_guard l(lock_); scoped_refptr m = down_cast(FindPtrOrNull(metric_map_, proto).get()); if (!m) { - m = new MillisLag(proto); + m = new MillisLag(proto, clock); InsertOrDie(&metric_map_, proto, m); } return m; } inline scoped_refptr MetricEntity::FindOrCreateAtomicMillisLag( - const MillisLagPrototype* proto) { + const MillisLagPrototype* proto, const scoped_refptr& clock) { CheckInstantiation(proto); std::lock_guard l(lock_); scoped_refptr m = down_cast( FindPtrOrNull(metric_map_, proto).get()); if (!m) { - m = new AtomicMillisLag(proto); + m = new AtomicMillisLag(proto, clock); InsertOrDie(&metric_map_, proto, m); } return m; diff --git a/src/yb/util/physical_time.h b/src/yb/util/physical_time.h index 3bb7098fbd2e..944286284858 100644 --- a/src/yb/util/physical_time.h +++ b/src/yb/util/physical_time.h @@ -21,6 +21,7 @@ namespace yb { using MicrosTime = uint64_t; +using MillisTime = uint64_t; struct PhysicalTime { MicrosTime time_point;