Skip to content

Commit

Permalink
add metric for workloadgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Nov 29, 2024
1 parent cf0b4c9 commit cb0b237
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 78 deletions.
2 changes: 2 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ void Daemon::calculate_metrics_thread() {
std::map<std::string, int64_t> lst_net_receive_bytes;

do {
ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics();

DorisMetrics::instance()->metric_registry()->trigger_all_hooks(true);

if (last_ts == -1L) {
Expand Down
72 changes: 40 additions & 32 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,40 @@
__VA_ARGS__; \
} while (0)

#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_local_scan_io_adder(*bytes_read); \
} \
} \
#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_local_scan_io(data_dir, *bytes_read); \
} \
} \
}

#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
if (auto* t_ctx = doris::thread_context(true)) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
} \
} \
#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_remote_scan_io(*bytes_read); \
} \
} \
}

namespace doris {
Expand Down Expand Up @@ -282,9 +284,15 @@ class ThreadContext {
return nullptr;
}

void update_total_local_scan_io_adder(size_t bytes_read) {
void update_total_local_scan_io(std::string path, size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_total_local_scan_io_adder(bytes_read);
wg_ptr->update_total_local_scan_io(path, bytes_read);
}
}

void update_total_remote_scan_io(size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_total_remote_scan_io(bytes_read);
}
}

Expand Down
108 changes: 92 additions & 16 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/parse_util.h"
#include "util/runtime_profile.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -71,18 +72,76 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
_need_create_query_thread_pool(need_create_query_thread_pool) {
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
_scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.bvar_name);
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>();

register_metrics();
}

WorkloadGroup::~WorkloadGroup() {
DorisMetrics::instance()->metric_registry()->deregister_entity(_wg_metrics->_entity);
}

void WorkloadGroup::refresh_metrics() {
// cpu
uint64_t _current_cpu_time_nanos = _cpu_time_nanos.load();
uint64_t _cpu_time_sec = _current_cpu_time_nanos / (1000L * 1000L * 1000L);
_wg_metrics->_cpu_time_counter->set_value(_cpu_time_sec);
_per_sec_cpu_time_nanos = (_current_cpu_time_nanos - _last_cpu_time_nanos) / 15;
_last_cpu_time_nanos = _current_cpu_time_nanos;

// local scan
int64_t current_local_scan_bytes = _wg_metrics->_local_scan_bytes_counter->value();
_per_sec_local_scan_bytes = (current_local_scan_bytes - _last_local_scan_bytes) / 15;
_last_local_scan_bytes = current_local_scan_bytes;

// remote scan
int64_t current_remote_scan_bytes = _wg_metrics->_remote_scan_bytes_counter->value();
_per_sec_remote_scan_bytes = (current_remote_scan_bytes - _last_remote_scan_bytes) / 15;
_last_remote_scan_bytes = current_remote_scan_bytes;
}

void WorkloadGroup::register_metrics() {
_wg_metrics = std::make_unique<WorkloadGroupMetrics>();
_wg_metrics->_entity = DorisMetrics::instance()->metric_registry()->register_entity(
"workload_group." + _name, {{"name", _name}});

_wg_metrics->_cpu_time_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::SECONDS, "workload_group_cpu_time");
_wg_metrics->_cpu_time_counter =
(IntAtomicCounter*)(_wg_metrics->_entity->register_metric<IntAtomicCounter>(
_wg_metrics->_cpu_time_metric.get()));

_wg_metrics->_mem_used_bytes_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_mem_used");
_wg_metrics->_mem_used_bytes_counter =
(IntAtomicCounter*)(_wg_metrics->_entity->register_metric<IntAtomicCounter>(
_wg_metrics->_mem_used_bytes_metric.get()));

_wg_metrics->_local_scan_bytes_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"workload_group_local_scan_bytes");
_wg_metrics->_local_scan_bytes_counter =
(IntAtomicCounter*)(_wg_metrics->_entity->register_metric<IntAtomicCounter>(
_wg_metrics->_local_scan_bytes_metric.get()));

_wg_metrics->_remote_scan_bytes_metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"workload_group_remote_scan_bytes");
_wg_metrics->_remote_scan_bytes_counter =
(IntAtomicCounter*)(_wg_metrics->_entity->register_metric<IntAtomicCounter>(
_wg_metrics->_remote_scan_bytes_metric.get()));

for (const auto& [key, io_throttle] : _scan_io_throttle_map) {
std::unique_ptr<doris::MetricPrototype> metric = std::make_unique<doris::MetricPrototype>(
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"workload_group_local_scan_bytes_" + io_throttle->metric_name());
_wg_metrics->_local_scan_bytes_counter_map[key] =
(IntAtomicCounter*)(_wg_metrics->_entity->register_metric<IntAtomicCounter>(
metric.get()));
_wg_metrics->_local_scan_bytes_metric_map[key] = std::move(metric);
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
_mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
_cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
_cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
_name, "cpu_usage", _cpu_usage_adder.get(), 10);
_total_local_scan_io_adder =
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
}

std::string WorkloadGroup::debug_string() const {
Expand Down Expand Up @@ -173,7 +232,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
// and _total_mem_used already contains all the current reserve memory.
// so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
_wg_refresh_interval_memory_growth.store(0.0);
_mem_used_status->set_value(used_memory);
_wg_metrics->_mem_used_bytes_counter->set_value(used_memory);
return used_memory;
}

Expand Down Expand Up @@ -659,15 +718,32 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
}

void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
(*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
_cpu_time_nanos += delta_cpu_time;
}

void WorkloadGroup::update_total_local_scan_io(std::string path, size_t scan_bytes) {
_wg_metrics->_local_scan_bytes_counter->increment((uint64_t)scan_bytes);
_wg_metrics->_local_scan_bytes_counter_map[path]->increment((int64_t)scan_bytes);
}

void WorkloadGroup::update_total_remote_scan_io(size_t scan_bytes) {
_wg_metrics->_remote_scan_bytes_counter->increment((int64_t)scan_bytes);
}

void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
(*_total_local_scan_io_adder) << scan_bytes;
int64_t WorkloadGroup::get_mem_used() {
return _wg_metrics->_mem_used_bytes_counter->value();
}

uint64_t WorkloadGroup::get_cpu_time_nanos_per_second() {
return _per_sec_cpu_time_nanos;
}

int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
return _remote_scan_io_throttle->get_bvar_io_per_second();
return _per_sec_remote_scan_bytes;
}

int64_t WorkloadGroup::get_local_scan_bytes_per_second() {
return _per_sec_local_scan_bytes;
}

void WorkloadGroup::try_stop_schedulers() {
Expand Down
61 changes: 49 additions & 12 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,42 @@ namespace pipeline {
class TaskScheduler;
} // namespace pipeline

template <typename T>
class AtomicCounter;
using IntAtomicCounter = AtomicCounter<int64_t>;
class MetricEntity;
struct MetricPrototype;

struct WorkloadGroupMetrics {
std::unique_ptr<doris::MetricPrototype> _cpu_time_metric {nullptr};
std::unique_ptr<doris::MetricPrototype> _mem_used_bytes_metric {nullptr};
std::unique_ptr<doris::MetricPrototype> _local_scan_bytes_metric {nullptr};
std::unique_ptr<doris::MetricPrototype> _remote_scan_bytes_metric {nullptr};
// NOTE: _local_scan_bytes_metric is sum of all disk's IO
// _local_disk_io_metric is every disk's IO
std::map<std::string, std::unique_ptr<doris::MetricPrototype>> _local_scan_bytes_metric_map;

IntAtomicCounter* _cpu_time_counter {nullptr};
IntAtomicCounter* _mem_used_bytes_counter {nullptr};
IntAtomicCounter* _local_scan_bytes_counter {nullptr};
IntAtomicCounter* _remote_scan_bytes_counter {nullptr};
std::map<std::string, IntAtomicCounter*> _local_scan_bytes_counter_map;

std::shared_ptr<MetricEntity> _entity {nullptr};
};

class WorkloadGroup;
struct WorkloadGroupInfo;
struct TrackerLimiterGroup;

class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
public:
explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);

explicit WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_query_thread_pool);

~WorkloadGroup();

int64_t version() const { return _version; }

uint64_t cpu_share() const { return _cpu_share.load(); }
Expand Down Expand Up @@ -191,13 +218,13 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

void update_cpu_adder(int64_t delta_cpu_time);

void update_total_local_scan_io_adder(size_t scan_bytes);
void update_total_local_scan_io(std::string path, size_t scan_bytes);

int64_t get_mem_used() { return _mem_used_status->get_value(); }
uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
int64_t get_local_scan_bytes_per_second() {
return _total_local_scan_io_per_second->get_value();
}
void update_total_remote_scan_io(size_t scan_bytes);

int64_t get_mem_used();
uint64_t get_cpu_time_nanos_per_second();
int64_t get_local_scan_bytes_per_second();
int64_t get_remote_scan_bytes_per_second();

ThreadPool* get_memtable_flush_pool_ptr() {
Expand All @@ -209,12 +236,16 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();

void refresh_metrics();

private:
void create_cgroup_cpu_ctl_no_lock();
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
void upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr);

void register_metrics();

mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
std::string _name;
Expand Down Expand Up @@ -260,12 +291,18 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
// for some background workload, it doesn't need to create query thread pool
const bool _need_create_query_thread_pool;

// bvar metric
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
// workload group metrics
std::atomic<uint64_t> _cpu_time_nanos {0};
std::atomic<uint64_t> _last_cpu_time_nanos {0};
std::atomic<uint64_t> _per_sec_cpu_time_nanos {0};

std::atomic<uint64_t> _per_sec_local_scan_bytes {0};
std::atomic<uint64_t> _last_local_scan_bytes {0};

std::atomic<uint64_t> _per_sec_remote_scan_bytes {0};
std::atomic<uint64_t> _last_remote_scan_bytes {0};

std::unique_ptr<WorkloadGroupMetrics> _wg_metrics {nullptr};
};

using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
Expand Down
11 changes: 9 additions & 2 deletions be/src/runtime/workload_group/workload_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
SchemaScannerHelper::insert_int64_value(1, wg->id(), block);
SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block);

double cpu_usage_p =
(double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
double cpu_usage_p = (double)wg->get_cpu_time_nanos_per_second() /
(double)total_cpu_time_ns_per_second * 100;
cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;

SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block);
Expand All @@ -300,6 +300,13 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
}
}

void WorkloadGroupMgr::refresh_workload_group_metrics() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
for (const auto& [id, wg] : _workload_groups) {
wg->refresh_metrics();
}
}

void WorkloadGroupMgr::stop() {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
iter->second->try_stop_schedulers();
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/workload_group/workload_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class WorkloadGroupMgr {
return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
}

void refresh_workload_group_metrics();

private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
Expand Down
Loading

0 comments on commit cb0b237

Please sign in to comment.