Skip to content

Commit

Permalink
fix query tracker brpc
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 16, 2022
1 parent 4222178 commit 002569d
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 34 deletions.
10 changes: 4 additions & 6 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <fmt/format.h>

#include "runtime/thread_context.h"
#include "util/pretty_printer.h"
#include "util/string_util.h"
#include "util/time.h"

Expand Down Expand Up @@ -103,11 +102,10 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
}

std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format(
"MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
snapshot.peak_consumption);
return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption),
snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
snapshot.peak_consumption);
}

static std::unordered_map<std::string, std::shared_ptr<MemTracker>> global_mem_trackers;
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// and modified by Doris
#pragma once

#include "util/pretty_printer.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -56,6 +57,11 @@ class MemTracker {
static std::shared_ptr<MemTracker> get_global_mem_tracker(const std::string& label);
static void make_global_mem_tracker_snapshot(std::vector<MemTracker::Snapshot>* snapshots);

static std::string print_bytes(int64_t bytes) {
return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
: "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
}

public:
const std::string& label() const { return _label; }
// Returns the memory consumed in bytes.
Expand Down
4 changes: 1 addition & 3 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
_consumption->current_value());
}
if (_reset_zero) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
reset_zero();
_all_ancestors.clear();
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ class MemTrackerLimiter final : public MemTracker {
void enable_print_log_usage() { _print_log_usage = true; }
void enable_reset_zero() { _reset_zero = true; }

void reset_zero() {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
}

// Logs the usage of this tracker limiter and optionally its children (recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
// 'max_recursive_depth' specifies the maximum number of levels of children
Expand Down Expand Up @@ -159,10 +165,6 @@ class MemTrackerLimiter final : public MemTracker {
return msg.str();
}

static std::string print_bytes(int64_t bytes) {
return PrettyPrinter::print(bytes, TUnit::BYTES);
}

private:
// The following func, for automatic memory tracking and limiting based on system memory allocation.
friend class ThreadMemTrackerMgr;
Expand Down
13 changes: 8 additions & 5 deletions be/src/runtime/memory/mem_tracker_task_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_scanner_me
const std::string& query_id) {
return register_task_mem_tracker_impl("Scanner#" + query_id, -1,
fmt::format("Scanner#Query#Id={}", query_id),
ExecEnv::GetInstance()->query_pool_mem_tracker());
get_task_mem_tracker(query_id));
}

std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker(
Expand All @@ -69,7 +69,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_scanner_mem
const std::string& load_id) {
return register_task_mem_tracker_impl("Scanner#" + load_id, -1,
fmt::format("Scanner#Load#Id={}", load_id),
ExecEnv::GetInstance()->load_pool_mem_tracker());
get_task_mem_tracker(load_id));
}

std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker(
Expand Down Expand Up @@ -104,10 +104,13 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
it->first, MemTracker::print_bytes(it->second->limit()),
MemTracker::print_bytes(it->second->consumption()),
MemTracker::print_bytes(it->second->peak_consumption()));
expired_task_ids.emplace_back(it->first);
} else if (config::memory_verbose_track) {
it->second->print_log_usage("query routine");
it->second->enable_print_log_usage();
}
}
for (auto tid : expired_task_ids) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ class ThreadMemTrackerMgr {
// After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker);

void detach_limiter_tracker();
// Usually there are only two layers, the first is the default trackerOrphan;
// the second is the query tracker or bthread tracker.
int64_t get_attach_layers() { return _limiter_tracker_stack.size(); }

// Must be fast enough! Thread update_tracker may be called very frequently.
// So for performance, add tracker as early as possible, and then call update_tracker<Existed>.
Expand Down Expand Up @@ -163,11 +165,11 @@ inline void ThreadMemTrackerMgr::init_impl() {
}

inline void ThreadMemTrackerMgr::clear() {
flush_untracked_mem<false>();
std::vector<std::shared_ptr<MemTrackerLimiter>>().swap(_limiter_tracker_stack);
std::vector<MemTracker*>().swap(_consumer_tracker_stack);
std::vector<std::string>().swap(_task_id_stack);
std::vector<TUniqueId>().swap(_fragment_instance_id_stack);
flush_untracked_mem<false>();
init_impl();
}

Expand Down
19 changes: 14 additions & 5 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,26 +191,35 @@ class ThreadContext {
static void attach_bthread() {
bthread_id = bthread_self();
bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
// First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
if (bthread_context == nullptr) {
// A new bthread starts, two scenarios:
// 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
// 2. There are not enough reusable btls in btls pool.
#ifndef BE_TEST
DCHECK(ExecEnv::GetInstance()->initialized());
#endif
// Create thread-local data on demand.
bthread_context = new ThreadContext;
std::shared_ptr<MemTrackerLimiter> btls_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Bthread:id=" + std::to_string(bthread_id),
ExecEnv::GetInstance()->bthread_mem_tracker());
bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
} else {
bthread_context->_thread_mem_tracker_mgr->clear();
// two scenarios:
// 1. A new bthread starts, but get a reuses btls.
// 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
// So tracker call reset 0 like reuses btls.
DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
}
bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(),
ExecEnv::GetInstance()->bthread_mem_tracker());
}

static ThreadContext* thread_context() {
if (bthread_self() != 0) {
if (bthread_self() != bthread_id) {
// pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls.
// A new bthread starts or pthread switch occurs.
thread_context_ptr.init = false;
attach_bthread();
thread_context_ptr.init = true;
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,10 @@ int main(int argc, char** argv) {
// The process tracker print log usage interval is 1s to avoid a large number of tasks being
// canceled when the process exceeds the mem limit, resulting in too many duplicate logs.
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
if (doris::config::memory_verbose_track) {
doris::ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage("main routine");
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
}
sleep(1);
}

Expand Down
10 changes: 2 additions & 8 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,8 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
transmit_tracker =
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
} else {
query_id = "unkown_transmit_data";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
Expand Down Expand Up @@ -642,10 +640,6 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
transmit_tracker =
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
} else {
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
}

// _cur_batch must be replaced with the returned batch.
_current_block.reset();
{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_current_block.reset();
}
*next_block = nullptr;
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
Expand Down

0 comments on commit 002569d

Please sign in to comment.