Commit e9517de

Merge pull request redpanda-data#12067 from graphcareful/usage-fixes
Align usage metrics to the configured interval
Rob Blafford authored Aug 3, 2023
2 parents d88fada + 3311ee7 commit e9517de
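
In short, usage window timestamps are now snapped to the configured collection interval instead of drifting to arbitrary wall-clock points. As the new round_to_interval test below exercises, a timestamp that falls within two minutes of an interval boundary (for example, 16:01 with a one-hour interval) is rounded to that boundary (16:00), while anything farther out is reported unmodified.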
Showing 13 changed files with 1,059 additions and 468 deletions.
43 changes: 23 additions & 20 deletions src/v/cluster/health_monitor_backend.cc
@@ -559,26 +559,6 @@ result<node_health_report> health_monitor_backend::process_node_reply(
return res;
}

ss::future<> health_monitor_backend::maybe_refresh_cloud_health_stats() {
auto holder = _gate.hold();
auto units = co_await _refresh_mutex.get_units();
auto leader_id = _raft0->get_leader_id();
if (!leader_id || leader_id != _raft0->self().id()) {
co_return;
}
vlog(clusterlog.debug, "collecting cloud health statistics");

cluster::cloud_storage_size_reducer reducer(
_topic_table,
_members,
_partition_leaders_table,
_connections,
topic_table_partition_generator::default_batch_size,
cloud_storage_size_reducer::default_retries_allowed);

_bytes_in_cloud_storage = co_await reducer.reduce();
}

ss::future<std::error_code> health_monitor_backend::collect_cluster_health() {
/**
* We are collecting cluster health on raft 0 leader only
@@ -631,6 +611,29 @@ ss::future<std::error_code> health_monitor_backend::collect_cluster_health() {
}
}
_reports_disk_health = cluster_disk_health;

if (config::shard_local_cfg().enable_usage()) {
vlog(clusterlog.info, "collecting cloud health statistics");

cluster::cloud_storage_size_reducer reducer(
_topic_table,
_members,
_partition_leaders_table,
_connections,
topic_table_partition_generator::default_batch_size,
cloud_storage_size_reducer::default_retries_allowed);

try {
/// TODO: https://github.com/redpanda-data/redpanda/issues/12515
/// Eventually move the cloud storage size metrics into the node
/// health report which will reduce the number of redundant RPCs
/// needed to be made
_bytes_in_cloud_storage = co_await reducer.reduce();
} catch (const std::exception& ex) {
// All exceptions are already logged by the reducer; nothing more to do here
}
}

_last_refresh = ss::lowres_clock::now();
co_return errc::success;
}
2 changes: 0 additions & 2 deletions src/v/cluster/health_monitor_backend.h
@@ -82,8 +82,6 @@ class health_monitor_backend {

bool does_raft0_have_leader();

ss::future<> maybe_refresh_cloud_health_stats();

private:
/**
* Struct used to track pending refresh request, it gives ability
6 changes: 0 additions & 6 deletions src/v/cluster/health_monitor_frontend.cc
@@ -50,12 +50,6 @@ health_monitor_frontend::get_cluster_health(
});
}

ss::future<> health_monitor_frontend::maybe_refresh_cloud_health_stats() {
return dispatch_to_backend([](health_monitor_backend& be) {
return be.maybe_refresh_cloud_health_stats();
});
}

storage::disk_space_alert health_monitor_frontend::get_cluster_disk_health() {
return _cluster_disk_health;
}
7 changes: 0 additions & 7 deletions src/v/cluster/health_monitor_frontend.h
@@ -85,13 +85,6 @@ class health_monitor_frontend

ss::future<bool> does_raft0_have_leader();

/**
* All health metadata is refreshed automatically via the
* health_monitor_backend on a timer. The cloud storage stats are an
* exception, this method is for external events to trigger this refresh.
*/
ss::future<> maybe_refresh_cloud_health_stats();

private:
template<typename Func>
auto dispatch_to_backend(Func&& f) {
1 change: 1 addition & 0 deletions src/v/kafka/CMakeLists.txt
@@ -38,6 +38,7 @@ v_cc_library(
server/group.cc
server/group_router.cc
server/group_manager.cc
server/usage_aggregator.cc
server/usage_manager.cc
server/rm_group_frontend.cc
server/connection_context.cc
1 change: 1 addition & 0 deletions src/v/kafka/server/tests/CMakeLists.txt
@@ -46,6 +46,7 @@ set(srcs
delete_topics_test.cc
offset_fetch_test.cc
api_versions_test.cc
usage_manager_test.cc
create_topics_test.cc
find_coordinator_test.cc
list_offsets_test.cc
198 changes: 198 additions & 0 deletions src/v/kafka/server/tests/usage_manager_test.cc
@@ -0,0 +1,198 @@
// Copyright 2023 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "kafka/server/usage_aggregator.h"
#include "storage/tests/kvstore_fixture.h"
#include "vlog.h"

#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/log.hh>

static ss::logger af_logger{"test-accounting-fiber"};

class test_accounting_fiber final
: public kafka::usage_aggregator<ss::manual_clock> {
public:
test_accounting_fiber(
storage::kvstore& kvstore,
size_t usage_num_windows,
std::chrono::seconds usage_window_width_interval,
std::chrono::seconds usage_disk_persistance_interval)
: kafka::usage_aggregator<ss::manual_clock>(
kvstore,
usage_num_windows,
usage_window_width_interval,
usage_disk_persistance_interval) {}

void add_bytes(size_t sent, size_t recv) {
vlog(af_logger.info, "Adding bytes sent: {} recv: {}", sent, recv);
_bytes_sent += sent;
_bytes_recv += recv;
}

template<typename Duration>
ss::future<> advance_clock(Duration d) {
if (_window_closed_promise) {
return ss::make_exception_future<>(
std::logic_error("Already a waiter on data fetching"));
}
_window_closed_promise = ss::promise<>();
auto f = _window_closed_promise->get_future();
ss::manual_clock::advance(d);
return ss::with_timeout(ss::lowres_clock::now() + d, std::move(f))
.handle_exception_type([this, d](const ss::timed_out_error&) {
vlog(
af_logger.info,
"Clock advanced {}s but window didn't close",
d.count());
_window_closed_promise = std::nullopt;
});
}

protected:
ss::future<kafka::usage> close_current_window() final {
vlog(af_logger.info, "Data taken...");
auto u = kafka::usage{
.bytes_sent = _bytes_sent, .bytes_received = _bytes_recv};
_bytes_sent = 0;
_bytes_recv = 0;
co_return u;
}

void window_closed() final {
vlog(af_logger.info, "Window closed...");
if (_window_closed_promise) {
_window_closed_promise->set_value();
_window_closed_promise = std::nullopt;
}
}

private:
std::optional<ss::promise<>> _window_closed_promise;
size_t _bytes_sent{0};
size_t _bytes_recv{0};
};
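
The fixture above drives the aggregator with ss::manual_clock, so windows close only when a test advances time explicitly. A minimal sketch of that Seastar pattern, independent of this PR (illustrative only; assumes it runs inside a Seastar reactor, e.g. a SEASTAR_THREAD_TEST_CASE):

#include <chrono>

#include <seastar/core/manual_clock.hh>
#include <seastar/core/timer.hh>

namespace ss = seastar;

// A timer parameterized on manual_clock fires only when the test advances
// the clock; wall-clock time never elapses on its own.
void manual_clock_demo() {
    ss::timer<ss::manual_clock> window_timer;
    window_timer.set_callback([] { /* window-close logic would run here */ });
    window_timer.arm(std::chrono::seconds(10));
    ss::manual_clock::advance(std::chrono::seconds(10)); // expires the timer
}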

namespace {
std::vector<kafka::usage>
strip_window_data(const std::vector<kafka::usage_window>& v) {
std::vector<kafka::usage> vv;
std::transform(
v.begin(),
v.end(),
std::back_inserter(vv),
[](const kafka::usage_window& uw) { return uw.u; });
return vv;
}

ss::sstring print_window_data(const std::vector<kafka::usage_window>& v) {
std::stringstream ss;
ss << "\n[\n";
for (const auto& vs : v) {
ss << "\t" << vs.u << ",\n";
}
ss << "]";
return ss.str();
}
} // namespace

FIXTURE_TEST(test_usage, kvstore_test_fixture) {
using namespace std::chrono_literals;
auto kvstore = make_kvstore();
kvstore->start().get();

const auto num_windows = 10;
const auto window_width = 10s;
const auto disk_write_interval = 1s;

auto usage_fiber = std::make_unique<test_accounting_fiber>(
*kvstore, num_windows, window_width, disk_write_interval);
usage_fiber->start().get();

/// Create expected data set
std::vector<kafka::usage> data;
for (auto i = 0; i < num_windows; ++i) {
const uint64_t sent = i * 100;
const uint64_t recv = i * 200;
data.emplace_back(
kafka::usage{.bytes_sent = sent, .bytes_received = recv});
}

/// Publish some data as the windows advance in time, then assert the
/// observed values match expectations
for (const auto& e : data) {
usage_fiber->add_bytes(e.bytes_sent, e.bytes_received);
usage_fiber->advance_clock(window_width).get();
vlog(
af_logger.info,
"Clock advanced: by {}s, data: {}",
window_width.count(),
print_window_data(usage_fiber->get_usage_stats().get()));
}
auto result = strip_window_data(usage_fiber->get_usage_stats().get());
BOOST_CHECK_EQUAL(result, data);

/// Add to open window
usage_fiber->add_bytes(10, 10);
usage_fiber->advance_clock(2s).get();
usage_fiber->add_bytes(10, 10);
usage_fiber->advance_clock(2s).get();
const auto open_windows = usage_fiber->get_usage_stats().get();
vlog(
af_logger.info,
"Clock advanced 4s, data: {}",
print_window_data(open_windows));
const auto open_window = open_windows[0];
BOOST_CHECK_EQUAL(open_window.u.bytes_sent, 20);
BOOST_CHECK_EQUAL(open_window.u.bytes_received, 20);

/// Shut it down; note that clean shutdowns persist data to the kvstore
usage_fiber->stop().get();
usage_fiber = nullptr;

/// Restart the instance, and verify expected behavior
usage_fiber = std::make_unique<test_accounting_fiber>(
*kvstore, num_windows, window_width, disk_write_interval);
usage_fiber->start().get();

// Ensure open window is consistent
result = strip_window_data(usage_fiber->get_usage_stats().get());
const auto& new_open_window = result[0];
BOOST_CHECK_EQUAL(new_open_window.bytes_sent, 20);
BOOST_CHECK_EQUAL(new_open_window.bytes_received, 20);
BOOST_CHECK_EQUAL(result, data);

usage_fiber->stop().get();
kvstore->stop().get();
}

SEASTAR_THREAD_TEST_CASE(test_round_to_interval_method) {
using namespace std::chrono_literals;
/// Aug 3rd, 2023 4PM GMT
const auto ts = ss::lowres_system_clock::time_point(
std::chrono::seconds(1691078400));
/// 15min, 30min, 1hr, 2hrs
const auto intervals = std::to_array({900s, 1800s, 3600s, 7200s});

for (const auto& ival : intervals) {
/// Within the 2min threshold the method rounds the input to the interval boundary
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts));
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts + 10s));
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts - 10s));
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts + 1min));
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts - 1min));
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts + 2min));
BOOST_CHECK(ts == kafka::detail::round_to_interval(ival, ts - 2min));
/// Past the 2min threshold the method returns the input unmodified
BOOST_CHECK(ts != kafka::detail::round_to_interval(ival, ts + 3min));
BOOST_CHECK(ts != kafka::detail::round_to_interval(ival, ts - 3min));
BOOST_CHECK(ts != kafka::detail::round_to_interval(ival, ts + 10min));
BOOST_CHECK(ts != kafka::detail::round_to_interval(ival, ts - 10min));
}
}
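
A sketch of rounding logic consistent with these assertions (hypothetical reconstruction; the shipped kafka::detail::round_to_interval may be written differently):

#include <chrono>

#include <seastar/core/lowres_clock.hh>

namespace ss = seastar;

// Hypothetical: snap t to the nearest multiple of interval if it is within
// two minutes of one, otherwise return it untouched.
ss::lowres_system_clock::time_point round_to_interval(
  std::chrono::seconds interval, ss::lowres_system_clock::time_point t) {
    using namespace std::chrono;
    const auto since_epoch = duration_cast<seconds>(t.time_since_epoch());
    // Locate the nearest interval boundary
    auto boundary = since_epoch - (since_epoch % interval);
    if ((since_epoch % interval) >= interval / 2) {
        boundary += interval;
    }
    const auto distance = since_epoch > boundary ? since_epoch - boundary
                                                 : boundary - since_epoch;
    if (distance <= 2min) {
        // Close to a boundary: align the timestamp with the window edge
        return ss::lowres_system_clock::time_point(boundary);
    }
    return t; // too far from any boundary; keep the raw timestamp
}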