diff --git a/include/dsn/utility/metrics.h b/include/dsn/utility/metrics.h index 06e76017f..7c1f47900 100644 --- a/include/dsn/utility/metrics.h +++ b/include/dsn/utility/metrics.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -68,11 +69,27 @@ ::dsn::gauge_prototype METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__}) #define METRIC_DEFINE_gauge_double(entity_type, name, unit, desc, ...) \ ::dsn::gauge_prototype METRIC_##name({#entity_type, #name, unit, desc, ##__VA_ARGS__}) +// There are 2 kinds of counters: +// - `counter` is the general type of counter that is implemented by striped_long_adder, which can +// achieve high performance while consuming less memory if it's not updated very frequently. +// - `concurrent_counter` uses concurrent_long_adder as the underlying implementation. It has +// higher performance while consuming more memory if it's updated very frequently. +// See also include/dsn/utility/long_adder.h for details. +#define METRIC_DEFINE_counter(entity_type, name, unit, desc, ...) \ + ::dsn::counter_prototype<::dsn::striped_long_adder> METRIC_##name( \ + {#entity_type, #name, unit, desc, ##__VA_ARGS__}) +#define METRIC_DEFINE_concurrent_counter(entity_type, name, unit, desc, ...) \ + ::dsn::counter_prototype<::dsn::concurrent_long_adder> METRIC_##name( \ + {#entity_type, #name, unit, desc, ##__VA_ARGS__}) // The following macros act as forward declarations for entity types and metric prototypes. #define METRIC_DECLARE_entity(name) extern ::dsn::metric_entity_prototype METRIC_ENTITY_##name #define METRIC_DECLARE_gauge_int64(name) extern ::dsn::gauge_prototype METRIC_##name #define METRIC_DECLARE_gauge_double(name) extern ::dsn::gauge_prototype METRIC_##name +#define METRIC_DECLARE_counter(name) \ + extern ::dsn::counter_prototype<::dsn::striped_long_adder> METRIC_##name +#define METRIC_DECLARE_concurrent_counter(name) \ + extern ::dsn::counter_prototype<::dsn::concurrent_long_adder> METRIC_##name namespace dsn { @@ -176,6 +193,7 @@ enum class metric_unit kMicroSeconds, kMilliSeconds, kSeconds, + kRequests, kInvalidUnit, }; @@ -305,4 +323,40 @@ using gauge_ptr = ref_ptr>; template using gauge_prototype = metric_prototype_with>; +// A counter in essence is a 64-bit integer that can be incremented and decremented. It can be +// used to measure the number of tasks in queues, current number of running manual compacts, +// etc. All counters start out at 0. +template +class counter : public metric +{ +public: + int64_t value() const { return _adder.value(); } + + void increment_by(int64_t x) { _adder.increment_by(x); } + void increment() { _adder.increment(); } + void decrement() { _adder.decrement(); } + + void reset() { _adder.reset(); } + +protected: + counter(const metric_prototype *prototype) : metric(prototype) {} + + virtual ~counter() = default; + +private: + friend class metric_entity; + friend class ref_ptr>; + + long_adder_wrapper _adder; + + DISALLOW_COPY_AND_ASSIGN(counter); +}; + +template +using counter_ptr = ref_ptr>; +using concurrent_counter_ptr = counter_ptr; + +template +using counter_prototype = metric_prototype_with>; + } // namespace dsn diff --git a/src/utils/test/metrics_test.cpp b/src/utils/test/metrics_test.cpp index e1b72e2b6..123661c88 100644 --- a/src/utils/test/metrics_test.cpp +++ b/src/utils/test/metrics_test.cpp @@ -16,6 +16,10 @@ // under the License. #include +#include + +#include +#include #include @@ -82,6 +86,16 @@ METRIC_DEFINE_gauge_double(my_server, dsn::metric_unit::kSeconds, "a server-level gauge of double type for test"); +METRIC_DEFINE_counter(my_server, + test_counter, + dsn::metric_unit::kRequests, + "a server-level counter for test"); + +METRIC_DEFINE_concurrent_counter(my_server, + test_concurrent_counter, + dsn::metric_unit::kRequests, + "a server-level concurrent_counter for test"); + namespace dsn { TEST(metrics_test, create_entity) @@ -238,7 +252,6 @@ TEST(metrics_test, recreate_metric) TEST(metrics_test, gauge_int64) { - // Test cases: // - create a gauge of int64 type without initial value, then increase // - create a gauge of int64 type without initial value, then decrease @@ -280,7 +293,6 @@ TEST(metrics_test, gauge_int64) TEST(metrics_test, gauge_double) { - // Test cases: // - create a gauge of double type without initial value, then increase // - create a gauge of double type without initial value, then decrease @@ -313,11 +325,140 @@ TEST(metrics_test, gauge_double) ASSERT_DOUBLE_EQ(my_metric->value(), test.new_value); auto metrics = my_server_entity->metrics(); - ASSERT_EQ(static_cast(metrics[&METRIC_test_gauge_double].get()), my_metric.get()); + ASSERT_EQ(metrics[&METRIC_test_gauge_double].get(), static_cast(my_metric.get())); ASSERT_EQ(my_metric->prototype(), static_cast(&METRIC_test_gauge_double)); } } +void execute(int64_t num_threads, std::function runner) +{ + std::vector threads; + for (int64_t i = 0; i < num_threads; i++) { + threads.emplace_back([i, &runner]() { runner(i); }); + } + for (auto &t : threads) { + t.join(); + } +} + +template +void run_counter_increment_by(::dsn::counter_ptr &my_metric, + int64_t base_value, + int64_t num_operations, + int64_t num_threads, + int64_t &result) +{ + std::vector deltas; + int64_t n = num_operations * num_threads; + deltas.reserve(n); + + int64_t expected_value = base_value; + for (int64_t i = 0; i < n; ++i) { + auto delta = static_cast(dsn::rand::next_u64(1000000)); + if (delta % 3 == 0) { + delta = -delta; + } + expected_value += delta; + deltas.push_back(delta); + } + + execute(num_threads, [num_operations, &my_metric, &deltas](int tid) mutable { + for (int64_t i = 0; i < num_operations; ++i) { + my_metric->increment_by(deltas[tid * num_operations + i]); + } + }); + ASSERT_EQ(my_metric->value(), expected_value); + result = expected_value; +} + +template +void run_counter_increment(::dsn::counter_ptr &my_metric, + int64_t base_value, + int64_t num_operations, + int64_t num_threads, + int64_t &result) +{ + execute(num_threads, [num_operations, &my_metric](int) mutable { + for (int64_t i = 0; i < num_operations; ++i) { + my_metric->increment(); + } + }); + + int64_t expected_value = base_value + num_operations * num_threads; + ASSERT_EQ(my_metric->value(), expected_value); + result = expected_value; +} + +template +void run_counter_decrement(::dsn::counter_ptr &my_metric, + int64_t base_value, + int64_t num_operations, + int64_t num_threads, + int64_t &result) +{ + execute(num_threads, [num_operations, &my_metric](int) mutable { + for (int64_t i = 0; i < num_operations; ++i) { + my_metric->decrement(); + } + }); + + int64_t expected_value = base_value - num_operations * num_threads; + ASSERT_EQ(my_metric->value(), expected_value); + result = expected_value; +} + +template +void run_counter_cases(::dsn::counter_prototype *prototype, int64_t num_threads) +{ + // Test cases: + // - test the counter with small-scale computations + // - test the counter with large-scale computations + struct test_case + { + std::string entity_id; + int64_t increments_by; + int64_t increments; + int64_t decrements; + } tests[] = {{"server_9", 100, 1000, 1000}, {"server_10", 1000000, 10000000, 10000000}}; + + for (const auto &test : tests) { + auto my_server_entity = METRIC_ENTITY_my_server.instantiate(test.entity_id); + + auto my_metric = prototype->instantiate(my_server_entity); + + int64_t value = 0; + ASSERT_EQ(my_metric->value(), value); + run_counter_increment_by(my_metric, value, test.increments_by, num_threads, value); + run_counter_increment(my_metric, value, test.increments, num_threads, value); + run_counter_decrement(my_metric, value, test.decrements, num_threads, value); + + my_metric->reset(); + ASSERT_EQ(my_metric->value(), 0); + + auto metrics = my_server_entity->metrics(); + ASSERT_EQ(metrics[prototype].get(), static_cast(my_metric.get())); + + ASSERT_EQ(my_metric->prototype(), prototype); + } +} + +template +void run_counter_cases(::dsn::counter_prototype *prototype) +{ + // Do single-threaded tests + run_counter_cases(prototype, 1); + + // Do multi-threaded tests + run_counter_cases(prototype, 4); +} + +TEST(metrics_test, counter) +{ + // Test both kinds of counter + run_counter_cases(&METRIC_test_counter); + run_counter_cases(&METRIC_test_concurrent_counter); +} + } // namespace dsn