Skip to content

Commit

Permalink
feat(new_metrics): implement the counter (XiaoMi#1081)
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan authored and wangdan committed Jun 20, 2022
1 parent e7c4a6a commit 4c7b929
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 3 deletions.
54 changes: 54 additions & 0 deletions include/dsn/utility/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <dsn/utility/autoref_ptr.h>
#include <dsn/utility/casts.h>
#include <dsn/utility/enum_helper.h>
#include <dsn/utility/long_adder.h>
#include <dsn/utility/ports.h>
#include <dsn/utility/singleton.h>
#include <dsn/utility/string_view.h>
Expand Down Expand Up @@ -68,11 +69,27 @@
::dsn::gauge_prototype<int64_t> METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...) \
::dsn::gauge_prototype<double> METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__})
// There are 2 kinds of counters:
// - `counter` is the general type of counter that is implemented by striped_long_adder, which can
// achieve high performance while consuming less memory if it's not updated very frequently.
// - `concurrent_counter` uses concurrent_long_adder as the underlying implementation. It has
// higher performance while consuming more memory if it's updated very frequently.
// See also include/dsn/utility/long_adder.h for details.
#define METRIC_DEFINE_counter(entity_type, name, unit, desc, ...) \
::dsn::counter_prototype<::dsn::striped_long_adder> METRIC_##name( \
{#entity_type, #name, unit, desc, ##__VA_ARGS__})
#define METRIC_DEFINE_concurrent_counter(entity_type, name, unit, desc, ...) \
::dsn::counter_prototype<::dsn::concurrent_long_adder> METRIC_##name( \
{#entity_type, #name, unit, desc, ##__VA_ARGS__})

// The following macros act as forward declarations for entity types and metric prototypes.
#define METRIC_DECLARE_entity(name) extern ::dsn::metric_entity_prototype METRIC_ENTITY_##name
#define METRIC_DECLARE_gauge_int64(name) extern ::dsn::gauge_prototype<int64_t> METRIC_##name
#define METRIC_DECLARE_gauge_double(name) extern ::dsn::gauge_prototype<double> METRIC_##name
#define METRIC_DECLARE_counter(name) \
extern ::dsn::counter_prototype<::dsn::striped_long_adder> METRIC_##name
#define METRIC_DECLARE_concurrent_counter(name) \
extern ::dsn::counter_prototype<::dsn::concurrent_long_adder> METRIC_##name

namespace dsn {

Expand Down Expand Up @@ -176,6 +193,7 @@ enum class metric_unit
kMicroSeconds,
kMilliSeconds,
kSeconds,
kRequests,
kInvalidUnit,
};

Expand Down Expand Up @@ -305,4 +323,40 @@ using gauge_ptr = ref_ptr<gauge<T>>;
template <typename T>
using gauge_prototype = metric_prototype_with<gauge<T>>;

// A counter in essence is a 64-bit integer that can be incremented and decremented. It can be
// used to measure the number of tasks in queues, current number of running manual compacts,
// etc. All counters start out at 0.
template <typename Adder = striped_long_adder>
class counter : public metric
{
public:
int64_t value() const { return _adder.value(); }

void increment_by(int64_t x) { _adder.increment_by(x); }
void increment() { _adder.increment(); }
void decrement() { _adder.decrement(); }

void reset() { _adder.reset(); }

protected:
counter(const metric_prototype *prototype) : metric(prototype) {}

virtual ~counter() = default;

private:
friend class metric_entity;
friend class ref_ptr<counter<Adder>>;

long_adder_wrapper<Adder> _adder;

DISALLOW_COPY_AND_ASSIGN(counter);
};

template <typename Adder = striped_long_adder>
using counter_ptr = ref_ptr<counter<Adder>>;
using concurrent_counter_ptr = counter_ptr<concurrent_long_adder>;

template <typename Adder = striped_long_adder>
using counter_prototype = metric_prototype_with<counter<Adder>>;

} // namespace dsn
147 changes: 144 additions & 3 deletions src/utils/test/metrics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// under the License.

#include <dsn/utility/metrics.h>
#include <dsn/utility/rand.h>

#include <thread>
#include <vector>

#include <gtest/gtest.h>

Expand Down Expand Up @@ -82,6 +86,16 @@ METRIC_DEFINE_gauge_double(my_server,
dsn::metric_unit::kSeconds,
"a server-level gauge of double type for test");

METRIC_DEFINE_counter(my_server,
test_counter,
dsn::metric_unit::kRequests,
"a server-level counter for test");

METRIC_DEFINE_concurrent_counter(my_server,
test_concurrent_counter,
dsn::metric_unit::kRequests,
"a server-level concurrent_counter for test");

namespace dsn {

TEST(metrics_test, create_entity)
Expand Down Expand Up @@ -238,7 +252,6 @@ TEST(metrics_test, recreate_metric)

TEST(metrics_test, gauge_int64)
{

// Test cases:
// - create a gauge of int64 type without initial value, then increase
// - create a gauge of int64 type without initial value, then decrease
Expand Down Expand Up @@ -280,7 +293,6 @@ TEST(metrics_test, gauge_int64)

TEST(metrics_test, gauge_double)
{

// Test cases:
// - create a gauge of double type without initial value, then increase
// - create a gauge of double type without initial value, then decrease
Expand Down Expand Up @@ -313,11 +325,140 @@ TEST(metrics_test, gauge_double)
ASSERT_DOUBLE_EQ(my_metric->value(), test.new_value);

auto metrics = my_server_entity->metrics();
ASSERT_EQ(static_cast<metric *>(metrics[&METRIC_test_gauge_double].get()), my_metric.get());
ASSERT_EQ(metrics[&METRIC_test_gauge_double].get(), static_cast<metric *>(my_metric.get()));

ASSERT_EQ(my_metric->prototype(),
static_cast<const metric_prototype *>(&METRIC_test_gauge_double));
}
}

void execute(int64_t num_threads, std::function<void(int)> runner)
{
std::vector<std::thread> threads;
for (int64_t i = 0; i < num_threads; i++) {
threads.emplace_back([i, &runner]() { runner(i); });
}
for (auto &t : threads) {
t.join();
}
}

template <typename Adder>
void run_counter_increment_by(::dsn::counter_ptr<Adder> &my_metric,
int64_t base_value,
int64_t num_operations,
int64_t num_threads,
int64_t &result)
{
std::vector<int64_t> deltas;
int64_t n = num_operations * num_threads;
deltas.reserve(n);

int64_t expected_value = base_value;
for (int64_t i = 0; i < n; ++i) {
auto delta = static_cast<int64_t>(dsn::rand::next_u64(1000000));
if (delta % 3 == 0) {
delta = -delta;
}
expected_value += delta;
deltas.push_back(delta);
}

execute(num_threads, [num_operations, &my_metric, &deltas](int tid) mutable {
for (int64_t i = 0; i < num_operations; ++i) {
my_metric->increment_by(deltas[tid * num_operations + i]);
}
});
ASSERT_EQ(my_metric->value(), expected_value);
result = expected_value;
}

template <typename Adder>
void run_counter_increment(::dsn::counter_ptr<Adder> &my_metric,
int64_t base_value,
int64_t num_operations,
int64_t num_threads,
int64_t &result)
{
execute(num_threads, [num_operations, &my_metric](int) mutable {
for (int64_t i = 0; i < num_operations; ++i) {
my_metric->increment();
}
});

int64_t expected_value = base_value + num_operations * num_threads;
ASSERT_EQ(my_metric->value(), expected_value);
result = expected_value;
}

template <typename Adder>
void run_counter_decrement(::dsn::counter_ptr<Adder> &my_metric,
int64_t base_value,
int64_t num_operations,
int64_t num_threads,
int64_t &result)
{
execute(num_threads, [num_operations, &my_metric](int) mutable {
for (int64_t i = 0; i < num_operations; ++i) {
my_metric->decrement();
}
});

int64_t expected_value = base_value - num_operations * num_threads;
ASSERT_EQ(my_metric->value(), expected_value);
result = expected_value;
}

template <typename Adder>
void run_counter_cases(::dsn::counter_prototype<Adder> *prototype, int64_t num_threads)
{
// Test cases:
// - test the counter with small-scale computations
// - test the counter with large-scale computations
struct test_case
{
std::string entity_id;
int64_t increments_by;
int64_t increments;
int64_t decrements;
} tests[] = {{"server_9", 100, 1000, 1000}, {"server_10", 1000000, 10000000, 10000000}};

for (const auto &test : tests) {
auto my_server_entity = METRIC_ENTITY_my_server.instantiate(test.entity_id);

auto my_metric = prototype->instantiate(my_server_entity);

int64_t value = 0;
ASSERT_EQ(my_metric->value(), value);
run_counter_increment_by(my_metric, value, test.increments_by, num_threads, value);
run_counter_increment(my_metric, value, test.increments, num_threads, value);
run_counter_decrement(my_metric, value, test.decrements, num_threads, value);

my_metric->reset();
ASSERT_EQ(my_metric->value(), 0);

auto metrics = my_server_entity->metrics();
ASSERT_EQ(metrics[prototype].get(), static_cast<metric *>(my_metric.get()));

ASSERT_EQ(my_metric->prototype(), prototype);
}
}

template <typename Adder>
void run_counter_cases(::dsn::counter_prototype<Adder> *prototype)
{
// Do single-threaded tests
run_counter_cases(prototype, 1);

// Do multi-threaded tests
run_counter_cases(prototype, 4);
}

TEST(metrics_test, counter)
{
// Test both kinds of counter
run_counter_cases<striped_long_adder>(&METRIC_test_counter);
run_counter_cases<concurrent_long_adder>(&METRIC_test_concurrent_counter);
}

} // namespace dsn

0 comments on commit 4c7b929

Please sign in to comment.