Skip to content

Commit

Permalink
[Improvement]Add more load cpu usage to workload group (#42053)
Browse files Browse the repository at this point in the history
## Proposed changes
Add more workload cpu usage to workload group.
1  AsyncResultWriter's cpu usage.
2 Memtable flush's cpu usage when memtable is not on sink side.
  • Loading branch information
wangbo authored Oct 23, 2024
1 parent cd5f9d7 commit 63c5625
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 42 deletions.
7 changes: 6 additions & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,15 @@ Status BaseDeltaWriter::init() {
if (_is_init) {
return Status::OK();
}
auto* t_ctx = doris::thread_context(true);
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (t_ctx) {
wg_sptr = t_ctx->workload_group().lock();
}
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_memtable_writer->init(
_rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
_rowset_builder->get_partial_update_info(), nullptr,
_rowset_builder->get_partial_update_info(), wg_sptr,
_rowset_builder->tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,12 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
ThreadPool* wg_thread_pool_ptr = nullptr;
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (_state->get_query_ctx()) {
wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool();
wg_sptr = _state->get_query_ctx()->workload_group();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
wg_thread_pool_ptr,
_streams[0]->enable_unique_mow(_req.index_id)));
wg_sptr, _streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
_streams.clear();
Expand Down
28 changes: 13 additions & 15 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
int64_t submit_task_time = MonotonicNanos();
auto task = MemtableFlushTask::create_shared(
shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
// NOTE: we should guarantee WorkloadGroup is not deconstructed when submit memtable flush task.
// because currently WorkloadGroup's can only be destroyed when all queries in the group is finished,
// but not consider whether load channel is finish.
std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
ThreadPool* wg_thread_pool = nullptr;
if (wg_sptr) {
wg_thread_pool = wg_sptr->get_memtable_flush_pool_ptr();
}
Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
: _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
_stats.flush_running_count++;
Expand Down Expand Up @@ -236,15 +245,16 @@ void MemTableFlushExecutor::init(int num_disk) {
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority) {
bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr) {
switch (rowset_writer->type()) {
case ALPHA_ROWSET:
// alpha rowset do not support flush in CONCURRENT. and not support alpha rowset now.
return Status::InternalError<false>("not support alpha rowset load now.");
case BETA_ROWSET: {
// beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer.
ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
flush_token = FlushToken::create_shared(pool);
flush_token = FlushToken::create_shared(pool, wg_sptr);
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
Expand All @@ -253,18 +263,6 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
}
}

Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr) {
if (rowset_writer->type() == BETA_ROWSET) {
flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
} else {
return Status::InternalError<false>("not support alpha rowset load now.");
}
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}

void MemTableFlushExecutor::_register_metrics() {
REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
[this]() { return _flush_pool->get_queue_size(); });
Expand Down
13 changes: 7 additions & 6 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace doris {
class DataDir;
class MemTable;
class RowsetWriter;
class WorkloadGroup;

// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
Expand All @@ -59,7 +60,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
ENABLE_FACTORY_CREATOR(FlushToken);

public:
FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
: _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {}

Status submit(std::shared_ptr<MemTable> mem_table);

Expand Down Expand Up @@ -108,6 +110,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {

std::mutex _mutex;
std::condition_variable _cond;

std::weak_ptr<WorkloadGroup> _wg_wptr;
};

// MemTableFlushExecutor is responsible for flushing memtables to disk.
Expand All @@ -133,11 +137,8 @@ class MemTableFlushExecutor {
void init(int num_disk);

Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority);

Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr);
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr);

private:
void _register_metrics();
Expand Down
18 changes: 4 additions & 14 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() {
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) {
std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
Expand All @@ -77,19 +77,9 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
// create flush handler
// by assigning segment_id to memtable before submiting to flush executor,
// we can make sure same keys sort in the same order in all replicas.
if (wg_flush_pool_ptr) {
RETURN_IF_ERROR(
ExecEnv::GetInstance()
->storage_engine()
.memtable_flush_executor()
->create_flush_token(_flush_token, _rowset_writer, wg_flush_pool_ptr));
} else {
RETURN_IF_ERROR(
ExecEnv::GetInstance()
->storage_engine()
.memtable_flush_executor()
->create_flush_token(_flush_token, _rowset_writer, _req.is_high_priority));
}
RETURN_IF_ERROR(
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer, _req.is_high_priority, wg_sptr));

_is_init = true;
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class RowsetWriter;
struct FlushStatistic;
class WorkloadGroup;

namespace vectorized {
class Block;
Expand All @@ -67,7 +68,7 @@ class MemTableWriter {

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ class QueryContext {
// And will be shared by all instances of this query.
// So that we can control the max thread that a query can be used to execute.
// If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
std::unique_ptr<ThreadPoolToken> _thread_token;
std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};

void _init_query_mem_tracker();

Expand Down
11 changes: 11 additions & 0 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
}
int64_t get_remote_scan_bytes_per_second();

CgroupCpuCtl* get_cgroup_cpu_ctl_ptr() {
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
return _cgroup_cpu_ctl.get();
}

ThreadPool* get_memtable_flush_pool_ptr() {
// no lock here because this is called by memtable flush,
// to avoid lock competition with the workload thread pool's update
return _memtable_flush_pool.get();
}

private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
Expand Down
11 changes: 11 additions & 0 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
force_close(status);
}

if (state && state->get_query_ctx()) {
WorkloadGroupPtr wg_ptr = state->get_query_ctx()->workload_group();
if (wg_ptr && wg_ptr->get_cgroup_cpu_ctl_ptr()) {
Status ret = wg_ptr->get_cgroup_cpu_ctl_ptr()->add_thread_to_cgroup();
if (ret.ok()) {
std::string wg_tname = "asyc_wr_" + wg_ptr->name();
Thread::set_self_name(wg_tname);
}
}
}

DCHECK(_dependency);
if (_writer_status.ok()) {
while (true) {
Expand Down

0 comments on commit 63c5625

Please sign in to comment.