Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Nov 1, 2022
1 parent aae4dbf commit 59900bf
Show file tree
Hide file tree
Showing 74 changed files with 565 additions and 1,015 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
size_t thread_slot_num = 0;
mem_consume = _scanner_mem_tracker->consumption();
// check limit for total memory and _scan_row_batches memory
if (mem_consume < (state->instance_mem_tracker()->limit() * 6) / 10 &&
if (mem_consume < (state->query_mem_tracker()->limit() * 6) / 10 &&
_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
thread_slot_num = max_thread - assigned_thread_num;
} else {
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,15 +581,15 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
_add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
_add_batch_closure->cntl.http_request().set_content_type("application/json");
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
_brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl, NULL,
&_add_batch_closure->result,
_add_batch_closure);
}
} else {
_add_batch_closure->cntl.http_request().Clear();
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result, _add_batch_closure);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ReusableClosure final : public google::protobuf::Closure {
~ReusableClosure() override {
// shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
join();
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
}

Expand Down Expand Up @@ -124,7 +124,7 @@ class ReusableClosure final : public google::protobuf::Closure {

// plz follow this order: reset() -> set_in_flight() -> send brpc batch
void reset() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
cid = cntl.call_id();
}
Expand Down
44 changes: 25 additions & 19 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,45 +126,51 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
" data-search='true' "
" class='table table-striped'>\n";
(*output) << "<thead><tr>"
"<th data-sortable='true'>Level</th>"
"<th data-sortable='true'>Type</th>"
"<th data-sortable='true'>Label</th>"
"<th>Parent</th>"
"<th>Limit</th>"
"<th data-sortable='true' "
">Current Consumption(Bytes)</th>"
"<th>Current Consumption(Normalize)</th>"
"<th data-sortable='true' "
">Peak Consumption(Bytes)</th>"
"<th>Peak Consumption(Normalize)</th>"
"<th data-sortable='true' "
">Child Count</th>"
"</tr></thead>";
(*output) << "<tbody>\n";

size_t upper_level;
size_t cur_level = 1;
// the level equal or lower than upper_level will show in web page
auto iter = args.find("upper_level");
std::vector<MemTracker::Snapshot> snapshots;
auto iter = args.find("type");
if (iter != args.end()) {
upper_level = std::stol(iter->second);
switch (iter->second) {
case "GLOBAL":
return ExecEnv::GetInstance()->process_mem_tracker().get();
case MemTrackerLimiter::Type::ORPHAN:
return ExecEnv::GetInstance()->orphan_mem_tracker_raw();
case MemTrackerLimiter::Type::QUERY:
return ExecEnv::GetInstance()->query_pool_mem_tracker().get();
case MemTrackerLimiter::Type::LOAD_FRAGMENT:
return ExecEnv::GetInstance()->load_pool_mem_tracker().get();
case MemTrackerLimiter::Type::LOAD_CHANNEL:
return ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker();
case MemTrackerLimiter::Type::COMPACTION:
return StorageEngine::instance()->compaction_mem_tracker().get();
default:
DCHECK(false) << ", type: " << type;
}
MemTrackerLimiter::Type Types[] = {"GLOBAL", "QUERY", "LOAD", "COMPACTION", "SCHEMA_CHANGE", "CLONE", "BATCHLOAD", "CONSISTENCY"};
MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::TypeList[iter->second]);
} else {
upper_level = 3;
MemTrackerLimiter::make_process_snapshots(&snapshots);
}

std::vector<MemTracker::Snapshot> snapshots;
ExecEnv::GetInstance()->process_mem_tracker()->make_snapshot(&snapshots, cur_level,
upper_level);
MemTracker::make_global_mem_tracker_snapshot(&snapshots);
for (const auto& item : snapshots) {
string limit_str = item.limit == -1 ? "none" : AccurateItoaKMGT(item.limit);
string current_consumption_normalize = AccurateItoaKMGT(item.cur_consumption);
string peak_consumption_normalize = AccurateItoaKMGT(item.peak_consumption);
(*output) << strings::Substitute(
"<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td><td>$4</td><td>$5</td><td>$6</"
"td><td>$7</td><td>$8</td></tr>\n",
item.level, item.label, item.parent, limit_str, item.cur_consumption,
current_consumption_normalize, item.peak_consumption, peak_consumption_normalize,
item.child_count);
"<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td><td>$4</td><td>$5</td><td>$6</td></tr>\n",
item.type, item.label, limit_str, item.cur_consumption,
current_consumption_normalize, item.peak_consumption, peak_consumption_normalize);
}
(*output) << "</tbody></table>\n";
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
_state(CompactionState::INITED) {
#ifndef BE_TEST
_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, label, StorageEngine::instance()->compaction_mem_tracker());
MemTrackerLimiter::Type::COMPACTION, -1, label);
_mem_tracker->enable_reset_zero();
#else
_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label);
_mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::ORPHAN, -1, label);
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
_num_shards(num_shards),
_shards(nullptr),
_last_id(1) {
_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, name);
_mem_tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, -1, name);
CHECK(num_shards > 0) << "num_shards cannot be 0";
CHECK_EQ((num_shards & (num_shards - 1)), 0)
<< "num_shards should be power of two, but got " << num_shards;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ MemTable::~MemTable() {
_flush_mem_tracker->set_consumption(0);
DCHECK_EQ(_insert_mem_tracker->consumption(), 0)
<< std::endl
<< MemTracker::log_usage(_insert_mem_tracker->make_snapshot(0));
<< MemTracker::log_usage(_insert_mem_tracker->make_snapshot());
DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,10 +1596,10 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
}

RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
RETURN_IF_ERROR(create_rowset());

if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
if (_mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation) {
LOG(WARNING) << "Memory limitation is too small for Schema Change."
<< " _memory_limitation=" << _memory_limitation
<< ", new_block->allocated_bytes()=" << new_block->allocated_bytes()
Expand Down
17 changes: 4 additions & 13 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_available_storage_medium_type_count(0),
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true),
_compaction_mem_tracker(
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::AutoCompaction")),
_segment_meta_mem_tracker(std::make_unique<MemTracker>("StorageEngine::SegmentMeta")),
_schema_change_mem_tracker(
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::SchemaChange")),
_clone_mem_tracker(std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Clone")),
_batch_load_mem_tracker(
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")),
_consistency_mem_tracker(
std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Consistency")),
_mem_tracker(std::make_shared<MemTrackerLimiter>(-1, "StorageEngine::Self")),
_segment_meta_mem_tracker(std::make_unique<MemTracker>("SegmentMeta")),
_mem_tracker(std::make_unique<MemTracker>("StorageEngine")),
_stop_background_threads_latch(1),
_tablet_manager(new TabletManager(config::tablet_map_shard_size)),
_txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
Expand Down Expand Up @@ -166,7 +157,7 @@ void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
std::vector<std::thread> threads;
for (auto data_dir : data_dirs) {
threads.emplace_back([this, data_dir] {
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
auto res = data_dir->load();
if (!res.ok()) {
LOG(WARNING) << "io error when init load tables. res=" << res
Expand Down Expand Up @@ -212,7 +203,7 @@ Status StorageEngine::_init_store_map() {
_tablet_manager.get(), _txn_manager.get());
tmp_stores.emplace_back(store);
threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
auto st = store->init();
if (!st.ok()) {
{
Expand Down
22 changes: 1 addition & 21 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,7 @@ class StorageEngine {

Status get_compaction_status_json(std::string* result);

std::shared_ptr<MemTrackerLimiter> compaction_mem_tracker() { return _compaction_mem_tracker; }
MemTracker* segment_meta_mem_tracker() { return _segment_meta_mem_tracker.get(); }
std::shared_ptr<MemTrackerLimiter> schema_change_mem_tracker() {
return _schema_change_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> clone_mem_tracker() { return _clone_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> batch_load_mem_tracker() { return _batch_load_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> consistency_mem_tracker() {
return _consistency_mem_tracker;
}

// check cumulative compaction config
void check_cumulative_compaction_config();
Expand Down Expand Up @@ -322,22 +313,11 @@ class StorageEngine {
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;

// Count the memory consumption of all Base and Cumulative tasks.
std::shared_ptr<MemTrackerLimiter> _compaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
// The memory consumed by querying is tracked in segment iterator.
std::unique_ptr<MemTracker> _segment_meta_mem_tracker;
// Count the memory consumption of all SchemaChange tasks.
std::shared_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
// Count the memory consumption of all EngineCloneTask.
// Note: Memory that does not contain make/release snapshots.
std::shared_ptr<MemTrackerLimiter> _clone_mem_tracker;
// Count the memory consumption of all EngineBatchLoadTask.
std::shared_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
// Count the memory consumption of all EngineChecksumTask.
std::shared_ptr<MemTrackerLimiter> _consistency_mem_tracker;
// StorageEngine oneself
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
std::unique_ptr<MemTracker> _mem_tracker;

CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _unused_rowset_monitor_thread;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/task/engine_alter_tablet_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ namespace doris {
EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
: _alter_tablet_req(request) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::SCHEMA_CHANGE,
config::memory_limitation_per_thread_for_schema_change_bytes,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(_alter_tablet_req.base_tablet_id),
std::to_string(_alter_tablet_req.new_tablet_id)),
StorageEngine::instance()->schema_change_mem_tracker());
std::to_string(_alter_tablet_req.new_tablet_id)));
}

Status EngineAlterTabletTask::execute() {
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/task/engine_batch_load_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ namespace doris {
EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTabletInfo>* tablet_infos)
: _push_req(push_req), _tablet_infos(tablet_infos) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1,
MemTrackerLimiter::Type::BATCHLOAD, -1,
fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}", _push_req.push_type,
std::to_string(_push_req.tablet_id)),
StorageEngine::instance()->batch_load_mem_tracker());
std::to_string(_push_req.tablet_id)));
}

EngineBatchLoadTask::~EngineBatchLoadTask() {}
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h
TVersion version, uint32_t* checksum)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id),
StorageEngine::instance()->consistency_mem_tracker());
MemTrackerLimiter::Type::CONSISTENCY, -1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id));
}

Status EngineChecksumTask::execute() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
_signature(signature),
_master_info(master_info) {
_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, "EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id),
StorageEngine::instance()->clone_mem_tracker());
MemTrackerLimiter::Type::CLONE, -1, "EngineCloneTask#tabletId=" + std::to_string(_clone_req.tablet_id));
}

Status EngineCloneTask::execute() {
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ set(RUNTIME_FILES
memory/chunk_allocator.cpp
memory/mem_tracker_limiter.cpp
memory/mem_tracker.cpp
memory/mem_tracker_task_pool.cpp
memory/thread_mem_tracker_mgr.cpp
fold_constant_executor.cpp
cache/result_node.cpp
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void GetResultBatchCtx::on_failure(const Status& status) {
status.to_protobuf(result->mutable_status());
{
// call by result sink
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
done->Run();
}
delete this;
Expand All @@ -45,7 +45,7 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
result->set_packet_seq(packet_seq);
result->set_eos(true);
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
done->Run();
}
delete this;
Expand Down Expand Up @@ -73,7 +73,7 @@ void GetResultBatchCtx::on_data(const std::unique_ptr<TFetchDataResult>& t_resul
}
st.to_protobuf(result->mutable_status());
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
done->Run();
}
delete this;
Expand Down
19 changes: 5 additions & 14 deletions be/src/runtime/data_stream_recvr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) {

if (!_pending_closures.empty()) {
auto closure_pair = _pending_closures.front();
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
closure_pair.first->Run();
}
closure_pair.first->Run();
_pending_closures.pop_front();

closure_pair.second.stop();
Expand Down Expand Up @@ -339,11 +336,8 @@ void DataStreamRecvr::SenderQueue::cancel() {

{
std::lock_guard<std::mutex> l(_lock);
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
_pending_closures.clear();
}
Expand All @@ -357,11 +351,8 @@ void DataStreamRecvr::SenderQueue::close() {
std::lock_guard<std::mutex> l(_lock);
_is_cancelled = true;

{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
}
_pending_closures.clear();
}
Expand Down
Loading

0 comments on commit 59900bf

Please sign in to comment.