Skip to content

Commit

Permalink
feat(new_metrics): migrate replica-level metrics for pegasus_manual_c…
Browse files Browse the repository at this point in the history
…ompact_service (#1443)

#1441

In perf counters, all of the 2 metrics of `pegasus_manual_compact_service`
are about the numbers of tasks of rocksdb manual compaction: one is  the
number of current queued tasks, while another is the number of current
running tasks. Both metrics are server-level.

They would become replica-level after migrating to the new metrics, based
on which server-level ones could also be achieved. A convenient class
`auto_count` is also provided to increment gauge that will be decremented
automatically at the end of the scope.
  • Loading branch information
empiredan authored and wangdan committed May 5, 2023
1 parent af3cad2 commit 142c44c
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 31 deletions.
37 changes: 19 additions & 18 deletions src/server/pegasus_manual_compact_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,27 @@
#include "base/pegasus_const.h"
#include "common/replication.codes.h"
#include "pegasus_server_impl.h"
#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
#include "utils/string_view.h"
#include "utils/strings.h"
#include "utils/time_utils.h"

METRIC_DEFINE_gauge_int64(replica,
rdb_manual_compact_queued_tasks,
dsn::metric_unit::kTasks,
"The number of current queued tasks of rocksdb manual compaction");

METRIC_DEFINE_gauge_int64(replica,
rdb_manual_compact_running_tasks,
dsn::metric_unit::kTasks,
"The number of current running tasks of rocksdb manual compaction");

namespace pegasus {
namespace server {

Expand All @@ -58,17 +69,10 @@ pegasus_manual_compact_service::pegasus_manual_compact_service(pegasus_server_im
_manual_compact_enqueue_time_ms(0),
_manual_compact_start_running_time_ms(0),
_manual_compact_last_finish_time_ms(0),
_manual_compact_last_time_used_ms(0)
_manual_compact_last_time_used_ms(0),
METRIC_VAR_INIT_replica(rdb_manual_compact_queued_tasks),
METRIC_VAR_INIT_replica(rdb_manual_compact_running_tasks)
{
_pfc_manual_compact_enqueue_count.init_app_counter("app.pegasus",
"manual.compact.enqueue.count",
COUNTER_TYPE_NUMBER,
"current manual compact in queue count");

_pfc_manual_compact_running_count.init_app_counter("app.pegasus",
"manual.compact.running.count",
COUNTER_TYPE_NUMBER,
"current manual compact running count");
}

void pegasus_manual_compact_service::init_last_finish_time_ms(uint64_t last_finish_time_ms)
Expand Down Expand Up @@ -106,9 +110,9 @@ void pegasus_manual_compact_service::start_manual_compact_if_needed(
rocksdb::CompactRangeOptions options;
extract_manual_compact_opts(envs, compact_rule, options);

_pfc_manual_compact_enqueue_count->increment();
METRIC_VAR_INCREMENT(rdb_manual_compact_queued_tasks);
dsn::tasking::enqueue(LPC_MANUAL_COMPACT, &_app->_tracker, [this, options]() {
_pfc_manual_compact_enqueue_count->decrement();
METRIC_VAR_DECREMENT(rdb_manual_compact_queued_tasks);
manual_compact(options);
});
} else {
Expand Down Expand Up @@ -295,9 +299,8 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
}

// if current running count exceeds the limit, it would not to be started.
_pfc_manual_compact_running_count->increment();
if (_pfc_manual_compact_running_count->get_integer_value() > _max_concurrent_running_count) {
_pfc_manual_compact_running_count->decrement();
METRIC_VAR_AUTO_COUNT(rdb_manual_compact_running_tasks);
if (METRIC_VAR_VALUE(rdb_manual_compact_running_tasks) > _max_concurrent_running_count) {
LOG_INFO_PREFIX("ignored compact because exceed max_concurrent_running_count({})",
_max_concurrent_running_count.load());
_manual_compact_enqueue_time_ms.store(0);
Expand All @@ -307,8 +310,6 @@ void pegasus_manual_compact_service::manual_compact(const rocksdb::CompactRangeO
uint64_t start = begin_manual_compact();
uint64_t finish = _app->do_manual_compact(options);
end_manual_compact(start, finish);

_pfc_manual_compact_running_count->decrement();
}

uint64_t pegasus_manual_compact_service::begin_manual_compact()
Expand Down
6 changes: 3 additions & 3 deletions src/server/pegasus_manual_compact_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
#include <string>

#include "metadata_types.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "replica/replica_base.h"
#include "utils/metrics.h"

namespace rocksdb {
struct CompactRangeOptions;
Expand Down Expand Up @@ -97,8 +97,8 @@ class pegasus_manual_compact_service : public dsn::replication::replica_base
std::atomic<uint64_t> _manual_compact_last_finish_time_ms;
std::atomic<uint64_t> _manual_compact_last_time_used_ms;

::dsn::perf_counter_wrapper _pfc_manual_compact_enqueue_count;
::dsn::perf_counter_wrapper _pfc_manual_compact_running_count;
METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_queued_tasks);
METRIC_VAR_DECLARE_gauge_int64(rdb_manual_compact_running_tasks);
};

} // namespace server
Expand Down
57 changes: 47 additions & 10 deletions src/utils/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class error_code;
#define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__)
#define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__)

// Perform increment-related operations on metrics including gauge and counter.
// Perform increment-related operations on gauges and counters.
#define METRIC_VAR_INCREMENT_BY(name, x) \
do { \
const auto v = (x); \
Expand All @@ -179,17 +179,21 @@ class error_code;
} \
} while (0)

// Perform increment() operations on gauges and counters.
#define METRIC_VAR_INCREMENT(name) _##name->increment()

// Perform set() operations on metrics including gauge and percentile.
// Perform decrement() operations on gauges.
#define METRIC_VAR_DECREMENT(name) _##name->decrement()

// Perform set() operations on gauges and percentiles.
//
// There are 2 kinds of invocations of set() for a metric:
// * set(val): set a single value for a metric, such as gauge, percentile;
// * set(n, val): set multiple repeated values (the number of duplicates is n) for a metric,
// such as percentile.
#define METRIC_VAR_SET(name, ...) _##name->set(__VA_ARGS__)

// Read the current measurement of the metric.
// Read the current measurement of gauges and counters.
#define METRIC_VAR_VALUE(name) _##name->value()

// Convenient macro that is used to compute latency automatically, which is dedicated to percentile.
Expand All @@ -198,6 +202,10 @@ class error_code;

#define METRIC_VAR_AUTO_LATENCY_DURATION_NS(name) __##name##_auto_latency.duration_ns()

// Convenient macro that is used to increment/decrement gauge automatically in current scope.
#define METRIC_VAR_AUTO_COUNT(name, ...) \
dsn::auto_count __##name##_auto_count(_##name, ##__VA_ARGS__)

#define METRIC_DEFINE_INCREMENT_BY(name) \
void increment_##name##_by(int64_t x) { METRIC_VAR_INCREMENT_BY(name, x); }

Expand Down Expand Up @@ -650,6 +658,7 @@ enum class metric_unit : size_t
kWrites,
kChanges,
kOperations,
kTasks,
kDisconnections,
kServers,
kInvalidUnit,
Expand Down Expand Up @@ -1433,22 +1442,22 @@ using floating_percentile_prototype =
class auto_latency
{
public:
auto_latency(const percentile_ptr<int64_t> &percentile) : _percentile(percentile) {}
auto_latency(const percentile_ptr<int64_t> &p) : _percentile(p) {}

auto_latency(const percentile_ptr<int64_t> &percentile, std::function<void(uint64_t)> callback)
: _percentile(percentile), _callback(std::move(callback))
auto_latency(const percentile_ptr<int64_t> &p, std::function<void(uint64_t)> callback)
: _percentile(p), _callback(std::move(callback))
{
}

auto_latency(const percentile_ptr<int64_t> &percentile, uint64_t start_time_ns)
: _percentile(percentile), _chrono(start_time_ns)
auto_latency(const percentile_ptr<int64_t> &p, uint64_t start_time_ns)
: _percentile(p), _chrono(start_time_ns)
{
}

auto_latency(const percentile_ptr<int64_t> &percentile,
auto_latency(const percentile_ptr<int64_t> &p,
uint64_t start_time_ns,
std::function<void(uint64_t)> callback)
: _percentile(percentile), _chrono(start_time_ns), _callback(std::move(callback))
: _percentile(p), _chrono(start_time_ns), _callback(std::move(callback))
{
}

Expand All @@ -1473,6 +1482,34 @@ class auto_latency
DISALLOW_COPY_AND_ASSIGN(auto_latency);
};

// Increment gauge and decrement it automatically at the end of the scope.
class auto_count
{
public:
auto_count(const gauge_ptr<int64_t> &g) : _gauge(g) { _gauge->increment(); }

auto_count(const gauge_ptr<int64_t> &g, std::function<void()> callback)
: _gauge(g), _callback(std::move(callback))
{
_gauge->increment();
}

~auto_count()
{
if (_callback) {
_callback();
}

_gauge->decrement();
}

private:
gauge_ptr<int64_t> _gauge;
std::function<void()> _callback;

DISALLOW_COPY_AND_ASSIGN(auto_count);
};

} // namespace dsn

// Since server_metric_entity() will be called in macros such as METRIC_VAR_INIT_server(), its
Expand Down
39 changes: 39 additions & 0 deletions src/utils/test/metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3096,6 +3096,8 @@ class MetricVarTest : public testing::Test
void test_set_percentile(const std::vector<int64_t> &expected_samples);
void test_set_percentile(size_t n, int64_t val);

void test_auto_count();

const metric_entity_ptr _my_replica_metric_entity;
METRIC_VAR_DECLARE_gauge_int64(test_replica_gauge_int64);
METRIC_VAR_DECLARE_counter(test_replica_counter);
Expand Down Expand Up @@ -3134,6 +3136,19 @@ void MetricVarTest::test_set_percentile(size_t n, int64_t val)
EXPECT_EQ(std::vector<int64_t>(n, val), METRIC_VAR_SAMPLES(test_replica_percentile_int64_ns));
}

void MetricVarTest::test_auto_count()
{
ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));

{
METRIC_VAR_AUTO_COUNT(test_replica_gauge_int64, [this]() {
ASSERT_EQ(1, METRIC_VAR_VALUE(test_replica_gauge_int64));
});
}

ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
}

#define TEST_METRIC_VAR_INCREMENT(name) \
do { \
ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \
Expand All @@ -3155,6 +3170,28 @@ TEST_F(MetricVarTest, IncrementGauge) { TEST_METRIC_VAR_INCREMENT(test_replica_g

TEST_F(MetricVarTest, IncrementCounter) { TEST_METRIC_VAR_INCREMENT(test_replica_counter); }

#define TEST_METRIC_VAR_DECREMENT(name) \
do { \
ASSERT_EQ(0, METRIC_VAR_VALUE(name)); \
\
METRIC_VAR_INCREMENT_BY(name, 11); \
ASSERT_EQ(11, METRIC_VAR_VALUE(name)); \
\
METRIC_VAR_DECREMENT(name); \
ASSERT_EQ(10, METRIC_VAR_VALUE(name)); \
\
METRIC_VAR_DECREMENT(name); \
ASSERT_EQ(9, METRIC_VAR_VALUE(name)); \
\
METRIC_VAR_INCREMENT(name); \
ASSERT_EQ(10, METRIC_VAR_VALUE(name)); \
\
METRIC_VAR_DECREMENT(name); \
ASSERT_EQ(9, METRIC_VAR_VALUE(name)); \
} while (0);

TEST_F(MetricVarTest, DecrementGauge) { TEST_METRIC_VAR_DECREMENT(test_replica_gauge_int64); }

TEST_F(MetricVarTest, SetGauge)
{
ASSERT_EQ(0, METRIC_VAR_VALUE(test_replica_gauge_int64));
Expand Down Expand Up @@ -3195,4 +3232,6 @@ TEST_F(MetricVarTest, AutoLatencyMilliSeconds) { TEST_METRIC_VAR_AUTO_LATENCY(ms

TEST_F(MetricVarTest, AutoLatencySeconds) { TEST_METRIC_VAR_AUTO_LATENCY(s, 1000 * 1000 * 1000); }

TEST_F(MetricVarTest, AutoCount) { ASSERT_NO_FATAL_FAILURE(test_auto_count()); }

} // namespace dsn

0 comments on commit 142c44c

Please sign in to comment.