[#24565] docdb: Pre-Aggregate Metrics for Faster Scraping
Summary:
**Background**
The current metric scraping approach aggregates metrics during the scrape itself, which creates a performance bottleneck and has led to frequent Prometheus target downtimes for customers. We observed metric scrape times exceeding 15 seconds with 4,000 tables and 18,000 tablets, with most of that time spent aggregating tablet metrics to the table level. This update introduces pre-aggregation for tablet metrics that are summed, allowing metric scrapes to skip aggregation for most tablet metrics and significantly reducing the scrape duration.

**Key Aspects of Pre-Aggregation:**
Pre-aggregation is supported for tablet, xCluster, and CDCSDK metrics. Below, we use tablet metrics as an example:
1. Pre-Aggregation Setup: During metric creation, pre-aggregation is enabled based on the metric's entity type and aggregation function. Only tablet and stream metrics with a sum aggregation function are eligible for pre-aggregation, while other metrics are handled at scrape time.
2. Shared Atomic Variable: Pre-aggregation creates an atomic integer shared across all instances of the same metric within a table (or stream). Whenever a pre-aggregated metric value is updated, the shared atomic integer is updated accordingly.
3. Metric Destruction Handling: When a pre-aggregated tablet metric object is destroyed (e.g., due to a tablet move), the shared aggregated value is decremented by the tablet metric's value to maintain accuracy.

Contention concern for (2): With many concurrent read or write operations, contention may occur, since updating a tablet metric value must also compete with other threads updating the same table-level value. To verify the performance impact, I ran several Sysbench read-only and write-only workloads, which showed no noticeable impact. [[ https://docs.google.com/spreadsheets/d/1O-RtRWWLkZYNTnjeNWLvrocenIpMtwCb9urjtNkSR9c/edit | Link to results ]].
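
A minimal sketch of (2) and (3), assuming illustrative class and member names (the actual implementation lives in yb/util/aggregate_stats.* and yb/util/metrics_aggregator.*): each tablet-level counter holds a shared_ptr to the table-level atomic, mirrors every update into it, and subtracts its own contribution when it is destroyed.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>

// Illustrative sketch only; names are hypothetical.
class PreAggregatedTabletCounter {
 public:
  explicit PreAggregatedTabletCounter(std::shared_ptr<std::atomic<int64_t>> table_sum)
      : table_sum_(std::move(table_sum)) {}

  // (2) Every update is mirrored into the shared table-level value, so a
  // scrape can read one atomic instead of summing thousands of tablets.
  void IncrementBy(int64_t delta) {
    value_.fetch_add(delta, std::memory_order_relaxed);
    table_sum_->fetch_add(delta, std::memory_order_relaxed);
  }

  // (3) On destruction (e.g. the tablet is moved away), subtract this
  // tablet's contribution so the pre-aggregated value stays accurate.
  ~PreAggregatedTabletCounter() {
    table_sum_->fetch_sub(value_.load(std::memory_order_relaxed),
                          std::memory_order_relaxed);
  }

 private:
  std::atomic<int64_t> value_{0};
  std::shared_ptr<std::atomic<int64_t>> table_sum_;
};

int main() {
  auto table_sum = std::make_shared<std::atomic<int64_t>>(0);
  {
    PreAggregatedTabletCounter t1(table_sum), t2(table_sum);
    t1.IncrementBy(10);
    t2.IncrementBy(5);
    assert(table_sum->load() == 15);  // scrape reads a single value
  }
  assert(table_sum->load() == 0);  // contributions removed on destruction
  return 0;
}
```

The extra work per update is a single atomic add, which is consistent with the Sysbench runs above showing no noticeable impact.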

**New Metric Scraping Steps:**
1. Handling Non-Pre-Aggregated Metrics:
     * Metrics that need to be aggregated at scrape time are aggregated in this step.
     * Metrics that do not require aggregation are flushed directly in this step.
2. Flushing Aggregated Metrics: Both pre-aggregated values and scrape-time-aggregated values are flushed in this step.
After completing these two phases, a separate asynchronous thread handles cleanup: it removes aggregated values that are no longer referenced (e.g., when a table is removed and no tablets reference the shared aggregated value anymore) and removes the attributes associated with those pre-aggregated values.
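
To make these steps concrete, here is a hedged, self-contained sketch; the struct, map layout, and metric names are made up for illustration, and the real code operates on metric entities and a MetricsAggregator rather than plain maps.

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified model of the scrape path.
struct TabletMetric {
  std::string table_id;
  std::string name;
  int64_t value;
  bool pre_aggregated;
};

int main() {
  std::vector<TabletMetric> tablet_metrics = {
      {"table_1", "rows_inserted", 10, /*pre_aggregated=*/true},
      {"table_1", "rows_inserted", 20, /*pre_aggregated=*/true},
      {"table_1", "scrape_time_metric", 3, /*pre_aggregated=*/false},
      {"table_1", "scrape_time_metric", 4, /*pre_aggregated=*/false},
  };
  // Pre-aggregated sums are maintained incrementally as metrics are
  // updated, not during the scrape.
  std::map<std::string, int64_t> pre_aggregated = {{"table_1/rows_inserted", 30}};

  // Step 1: only metrics that are not pre-aggregated are aggregated now;
  // metrics that need no aggregation would be flushed directly here.
  std::map<std::string, int64_t> scrape_time_aggregated;
  for (const auto& m : tablet_metrics) {
    if (!m.pre_aggregated) {
      scrape_time_aggregated[m.table_id + "/" + m.name] += m.value;
    }
  }

  // Step 2: flush pre-aggregated and scrape-time-aggregated values.
  for (const auto& [key, value] : pre_aggregated) {
    std::cout << key << " " << value << "\n";
  }
  for (const auto& [key, value] : scrape_time_aggregated) {
    std::cout << key << " " << value << "\n";
  }
  // Cleanup of unreferenced pre-aggregated values and their attributes
  // happens later on a separate asynchronous thread (not modeled here).
  return 0;
}
```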

With these enhancements, scrape time has improved from 15 seconds to 2 seconds for 4,000 tables and 18,000 tablets.

**Other Changes:**
* Addressed a potential issue where aggregated metrics using the max aggregation function were assumed to always be greater than or equal to zero; negative values are possible and are now handled correctly (see the sketch after this list).
* Redesigned D35689: The aggregated metric now holds a shared_ptr to its prototype to ensure the OwningPrototype is not deleted before the flush operation.
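
A small sketch of the principle behind the max-aggregation fix (illustrative function, not the actual code): seed the running maximum with the smallest int64 instead of 0 so that tables whose values are all negative are reported correctly.

```cpp
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// If the running maximum started at 0, a table whose tablet values are all
// negative (e.g. {-5, -3}) would incorrectly report 0 instead of -3.
int64_t AggregateMax(const std::vector<int64_t>& tablet_values) {
  int64_t result = std::numeric_limits<int64_t>::min();
  for (int64_t v : tablet_values) {
    result = std::max(result, v);
  }
  return result;
}
```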
Jira: DB-13599

Test Plan:
Jenkins
MetricsTest.AggregationTest

Reviewers: esheng, mlillibridge, rthallam, amitanand

Reviewed By: amitanand

Subscribers: amitanand, hsunder, yql, kannan, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D39667
yusong-yan committed Dec 31, 2024
1 parent cb0dfba commit 6486578
Showing 27 changed files with 1,670 additions and 558 deletions.
4 changes: 4 additions & 0 deletions src/yb/gutil/ref_counted.cc
@@ -74,6 +74,10 @@ bool RefCountedThreadSafeBase::HasOneRef() const {
return ref_count_.load(std::memory_order_acquire) == 1;
}

bool RefCountedThreadSafeBase::HasTwoRef() const {
return ref_count_.load(std::memory_order_acquire) == 2;
}

RefCountedThreadSafeBase::~RefCountedThreadSafeBase() {
#ifndef NDEBUG
DCHECK(in_dtor_) << "RefCountedThreadSafe object deleted without "
1 change: 1 addition & 0 deletions src/yb/gutil/ref_counted.h
@@ -70,6 +70,7 @@ class RefCountedBase {
class RefCountedThreadSafeBase {
public:
bool HasOneRef() const;
bool HasTwoRef() const;

protected:
RefCountedThreadSafeBase() = default;
28 changes: 7 additions & 21 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
@@ -28,9 +28,9 @@
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/metrics.h"
#include "yb/util/test_macros.h"
#include "yb/util/tostring.h"
#include "yb/util/metric_entity.h"

namespace yb {

@@ -4245,13 +4245,6 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesSingleS
int64_t total_traffic_sent = 0;
uint64_t total_change_event_count = 0;

std::stringstream output;
MetricPrometheusOptions opts;
PrometheusWriter writer(&output, opts);

std::unordered_map<std::string, std::string> attr;
auto aggregation_level = kStreamLevel;

for (uint32_t idx = 0; idx < num_tables; idx++) {
ASSERT_OK(
WriteRowsHelper(1, 50, &test_cluster_, true, 2, (kTableName + table_suffix[idx]).c_str()));
@@ -4276,20 +4269,13 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesSingleS
return current_expiry_time > metrics[idx]->cdcsdk_expiry_time_ms->value();
},
MonoDelta::FromSeconds(10) * kTimeMultiplier, "Wait for stream expiry time update."));

attr["namespace_name"] = kNamespaceName;
attr["stream_id"] = stream_id.ToString();
attr["metric_type"] = "cdcsdk";

ASSERT_OK(metrics[idx]->cdcsdk_change_event_count->WriteForPrometheus(
&writer, attr, opts, aggregation_level));
ASSERT_OK(metrics[idx]->cdcsdk_traffic_sent->WriteForPrometheus(
&writer, attr, opts, aggregation_level));
}
auto aggregated_change_event_count =
writer.TEST_GetAggregatedValue("cdcsdk_change_event_count", stream_id.ToString());
auto aggregated_traffic_sent =
writer.TEST_GetAggregatedValue("cdcsdk_traffic_sent", stream_id.ToString());

auto metrics_aggregator = tserver->metric_registry()->TEST_metrics_aggregator();
auto aggregated_change_event_count = metrics_aggregator->TEST_GetMetricPreAggregatedValue(
"cdcsdk_change_event_count", stream_id.ToString());
auto aggregated_traffic_sent = metrics_aggregator->TEST_GetMetricPreAggregatedValue(
"cdcsdk_traffic_sent", stream_id.ToString());

ASSERT_GT(aggregated_traffic_sent, 100);
ASSERT_GT(total_record_size, 100);
2 changes: 1 addition & 1 deletion src/yb/rpc/service_pool.cc
@@ -141,7 +141,7 @@ class ServicePoolImpl final : public InboundCallHandler {
EscapeMetricNameForPrometheus(&id);
string description = id + " metric for ServicePoolImpl";
rpcs_in_queue_ = entity->FindOrCreateMetric<AtomicGauge<int64_t>>(
std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
std::shared_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
entity->prototype().name(), std::move(id),
description, MetricUnit::kRequests, description, MetricLevel::kInfo)),
static_cast<int64>(0) /* initial_value */);
17 changes: 13 additions & 4 deletions src/yb/tserver/tablet_server-test.cc
@@ -317,11 +317,20 @@ TEST_F(TabletServerTest, TestSetFlagsAndCheckWebPages) {
ASSERT_OK(c.FetchURL(Substitute("http://$0/prometheus-metrics?reset_histograms=false", addr),
&buf));
// Find our target metric and concatenate value zero to it. metric_instance_with_zero_value is a
// string looks like: handler_latency_yb_tserver_TabletServerService_Write{quantile=p50.....} 0
// string looks like: handler_latency_yb_tserver_TabletServerService_Write{...quantile=p50...} 0
string page_content = buf.ToString();
std::size_t begin = page_content.find("handler_latency_yb_tserver_TabletServerService_Write"
"{quantile=\"p50\"");
std::size_t end = page_content.find("}", begin);
const auto kMetricLinePrefix = "handler_latency_yb_tserver_TabletServerService_Write{";
std::size_t begin = 0;
std::size_t end = 0;
while ((begin = page_content.find(kMetricLinePrefix, begin)) != std::string::npos) {
end = page_content.find("}", begin);
ASSERT_NE(end, std::string::npos);
if (page_content.find("quantile=\"p50\"", begin) < end) {
break;
}
begin = end + 1;
}
ASSERT_NE(begin, std::string::npos);
string metric_instance_with_zero_value = page_content.substr(begin, end - begin + 1) + " 0";

ASSERT_STR_CONTAINS(buf.ToString(), metric_instance_with_zero_value);
1 change: 1 addition & 0 deletions src/yb/util/CMakeLists.txt
@@ -182,6 +182,7 @@ set(UTIL_SRCS
memory/memory.cc
metric_entity.cc
metrics.cc
metrics_aggregator.cc
metrics_writer.cc
monotime.cc
mutex.cc
55 changes: 51 additions & 4 deletions src/yb/util/aggregate_stats.cc
@@ -32,6 +32,11 @@ AggregateStats::AggregateStats(const AggregateStats& other):
min_value_(other.min_value_.Load()),
max_value_(other.max_value_.Load()) {}

AggregateStats::~AggregateStats() {
aggregator_.SumIncrementBy(-total_sum_.Load());
aggregator_.CountIncrementBy(-total_count_.Load());
}

void AggregateStats::IncrementBy(int64_t value, uint64_t count) {
current_sum_.IncrementBy(value * count);
current_count_.IncrementBy(count);
@@ -67,13 +72,19 @@ double AggregateStats::MeanValue() const {

void AggregateStats::Reset(PreserveTotalStats preserve_total) {
if (preserve_total) {
total_sum_.IncrementBy(current_sum_.Exchange(0));
total_count_.IncrementBy(current_count_.Exchange(0));
int64_t previous_sum = current_sum_.Exchange(0);
int64_t previous_count = current_count_.Exchange(0);
total_sum_.IncrementBy(previous_sum);
total_count_.IncrementBy(previous_count);
aggregator_.SumIncrementBy(previous_sum);
aggregator_.CountIncrementBy(previous_count);
} else {
current_sum_.Store(0);
total_sum_.Store(0);
current_count_.Store(0);
total_count_.Store(0);
int64_t previous_total_sum = total_sum_.Exchange(0);
int64_t previous_total_count = total_count_.Exchange(0);
aggregator_.SumIncrementBy(-previous_total_sum);
aggregator_.CountIncrementBy(-previous_total_count);
}
min_value_.Store(std::numeric_limits<int64_t>::max());
max_value_.Store(std::numeric_limits<int64_t>::min());
@@ -86,4 +97,40 @@ void AggregateStats::Add(const AggregateStats& other) {
max_value_.StoreMax(other.max_value_.Load());
}

Status AggregateStats::SetUpPreAggregationForPrometheus(
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_sum_value_holder,
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_count_value_holder) {
RETURN_NOT_OK(aggregator_.InitializeValueHolders(
aggregated_prometheus_sum_value_holder, aggregated_prometheus_count_value_holder));
aggregator_.SumIncrementBy(total_sum_.Load());
aggregator_.CountIncrementBy(total_count_.Load());
return Status::OK();
}

bool AggregateStats::IsPreAggregatedForPrometheus() const {
return aggregator_.HasValueHolders();
}

Status AggregateStats::Aggregator::InitializeValueHolders(
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_sum_value_holder,
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_count_value_holder) {
RSTATUS_DCHECK(!HasValueHolders(), IllegalState, "Aggregator value holders are already set.");

if (aggregated_prometheus_sum_value_holder != nullptr &&
aggregated_prometheus_count_value_holder != nullptr) {
sum_holder_ = std::move(aggregated_prometheus_sum_value_holder);
count_holder_ = std::move(aggregated_prometheus_count_value_holder);
return Status::OK();
}

RSTATUS_DCHECK(
(aggregated_prometheus_sum_value_holder == nullptr &&
aggregated_prometheus_count_value_holder == nullptr),
IllegalState,
"Both aggregated_prometheus_sum_value_holder and aggregated_prometheus_count_value_holder "
"must be either nullptr or have values."
);
return Status::OK();
}

} // namespace yb
43 changes: 43 additions & 0 deletions src/yb/util/aggregate_stats.h
@@ -15,6 +15,7 @@
#include "yb/gutil/macros.h"

#include "yb/util/atomic.h"
#include "yb/util/status.h"
#include "yb/util/strongly_typed_bool.h"

namespace yb {
@@ -27,6 +28,8 @@ class AggregateStats {
public:
AggregateStats();

~AggregateStats();

// Copy-construct a (non-consistent) snapshot of other.
explicit AggregateStats(const AggregateStats& other);

@@ -66,6 +69,44 @@

size_t DynamicMemoryUsage() const { return sizeof(*this); }

Status SetUpPreAggregationForPrometheus(
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_total_sum_value,
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_total_count_value);

bool IsPreAggregatedForPrometheus() const;

// Aggregate total_sum_ and total_count_ from multiple AggregateStats instances.
// Aggregated value holders are updated whenever total_sum_ and total_count_ are updated.
class Aggregator {
public:
Aggregator() = default;

Status InitializeValueHolders(
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_sum_value_holder,
std::shared_ptr<AtomicInt<int64_t>> aggregated_prometheus_count_value_holder);

void SumIncrementBy(int64_t value) {
if (sum_holder_ != nullptr) {
sum_holder_->IncrementBy(value);
}
}

void CountIncrementBy(int64_t value) {
if (count_holder_ != nullptr) {
count_holder_->IncrementBy(value);
}
}

bool HasValueHolders() const {
return sum_holder_ != nullptr && count_holder_ != nullptr;
}

private:
// Both sum_holder_ and count_holder_ must either be nullptrs or non-nullptrs.
std::shared_ptr<AtomicInt<int64_t>> sum_holder_;
std::shared_ptr<AtomicInt<int64_t>> count_holder_;
};

private:
// Non-resetting sum and counts.
AtomicInt<int64_t> total_sum_;
@@ -76,6 +117,8 @@ class AggregateStats {
AtomicInt<int64_t> min_value_;
AtomicInt<int64_t> max_value_;

Aggregator aggregator_;

AggregateStats& operator=(const AggregateStats& other) = delete;
};

2 changes: 2 additions & 0 deletions src/yb/util/logging.h
@@ -340,6 +340,8 @@ std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);
#define VLOG_IF_WITH_PREFIX(verboselevel, condition) VLOG_IF(verboselevel, condition) << LogPrefix()
#define VLOG_IF_WITH_FUNC(verboselevel, condition) VLOG_IF(verboselevel, condition) << __func__ \
<< ": "
#define LOG_IF_WITH_PREFIX_AND_FUNC(severity, condition) LOG_IF_WITH_PREFIX(severity, condition) \
<< __func__ << ": "

// DCHECK_ONLY_NOTNULL is like DCHECK_NOTNULL, but does not result in an unused expression in
// release mode, so it is suitable for being used as a stand-alone statement. In other words, use
6 changes: 4 additions & 2 deletions src/yb/util/mem_tracker.cc
@@ -225,8 +225,10 @@ class MemTracker::TrackerMetrics {
}

void Init(const MemTracker& mem_tracker) {
// The GaugePrototype object is owned by the AtomicGauge and is also shared with
// MetricsAggregator or PrometheusWriter when the metric is being aggregated.
metric_ = metric_entity_->FindOrCreateMetric<AtomicGauge<int64_t>>(
std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
std::shared_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
metric_entity_->prototype().name(), mem_tracker.metric_name(),
CreateMetricLabel(mem_tracker), MetricUnit::kBytes,
CreateMetricDescription(mem_tracker), yb::MetricLevel::kInfo)),
@@ -239,7 +241,7 @@ class MemTracker::TrackerMetrics {
void operator=(const TrackerMetrics&) = delete;

~TrackerMetrics() {
metric_entity_->Remove(metric_->prototype());
metric_entity_->RemoveFromMetricMap(metric_->prototype());
}

MetricEntityPtr metric_entity_;