fix load channel tracker
xinyiZzz committed Nov 1, 2022
1 parent 3c95106 commit aae4dbf
Showing 40 changed files with 181 additions and 153 deletions.
be/src/common/config.h: 12 changes (2 additions & 10 deletions)
@@ -647,16 +647,8 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
// Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker in Hook.
CONF_Bool(enable_tcmalloc_hook, "true");

// If true, switch TLS MemTracker to count more detailed memory,
// including caches such as ExecNode operators and TabletManager.
//
// At present, there is a performance problem in the frequent switch thread mem tracker.
// This is because the mem tracker exists as a shared_ptr in the thread local. Each time it is switched,
// the atomic variable use_count in the shared_ptr of the current tracker will be -1, and the tracker to be
// replaced use_count +1, multi-threading Frequent changes to the same tracker shared_ptr are slow.
// TODO: 1. Reduce unnecessary thread mem tracker switches,
// 2. Consider using raw pointers for mem tracker in thread local
CONF_Bool(memory_verbose_track, "false");
// Print more detailed logs, more detailed records, etc.
CONF_Bool(memory_debug, "false");

// The minimum length when TCMalloc Hook consumes/releases MemTracker, consume size
// smaller than this value will continue to accumulate. specified as number of bytes.
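
The comment removed above argues that frequently switching a thread-local, shared_ptr-held mem tracker is slow because each switch updates atomic use counts on both the attached and the replaced tracker. A minimal sketch of that switch pattern, with illustrative names rather than the actual Doris thread-context API:

#include <memory>

struct MemTracker {};  // stand-in for the real tracker type

// Illustrative only, not part of this commit: the thread-local tracker is held
// as a shared_ptr, so every switch copies a shared_ptr. The copy does an atomic
// use_count +1 on the tracker being attached and a -1 on the tracker being
// replaced; many threads switching to the same tracker contend on its control block.
thread_local std::shared_ptr<MemTracker> tls_mem_tracker;

void switch_thread_mem_tracker(const std::shared_ptr<MemTracker>& tracker) {
    tls_mem_tracker = tracker;  // atomic +1 on tracker, atomic -1 on the old value
}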
be/src/exec/blocking_join_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -93,7 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) {

void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(construct_build_side(state));
}

be/src/exec/broker_scan_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -376,7 +376,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,

void BrokerScanNode::scanner_worker(int start_idx, int length) {
SCOPED_ATTACH_TASK(_runtime_state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs);
be/src/exec/es_http_scan_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -424,7 +424,7 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) {

void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
SCOPED_ATTACH_TASK(_runtime_state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
// Clone expr context
std::vector<ExprContext*> scanner_expr_ctxs;
DCHECK(start_idx < length);
be/src/exec/except_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -39,8 +39,8 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
}

Status ExceptNode::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(SetOperationNode::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
be/src/exec/exec_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
_mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
_mem_tracker = std::make_shared<MemTracker>("ExecNode:" + _runtime_profile->name(),
_runtime_profile.get());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

be/src/exec/exec_node.h: 3 changes (2 additions & 1 deletion)
@@ -194,6 +194,7 @@ class ExecNode {
RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; }

MemTracker* mem_tracker() const { return _mem_tracker.get(); }
std::shared_ptr<MemTracker> mem_tracker_shared() const { return _mem_tracker; }

OpentelemetrySpan get_next_span() { return _get_next_span; }

@@ -298,7 +299,7 @@ class ExecNode {
std::unique_ptr<RuntimeProfile> _runtime_profile;

/// Account for peak memory used by this node
std::unique_ptr<MemTracker> _mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;

RuntimeProfile::Counter* _rows_returned_counter;
RuntimeProfile::Counter* _rows_returned_rate;
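
Several call sites in this commit (build_side_thread, scanner_worker, transfer_thread) change their scoped consumer from mem_tracker() to the new mem_tracker_shared(). A plausible reading is that these helpers run on worker threads, so holding shared ownership keeps the tracker alive even if the scope outlives the node; the sketch below illustrates that idea with simplified names and is not the real SCOPED_CONSUME_MEM_TRACKER implementation:

#include <memory>
#include <utility>

struct MemTracker {};  // stand-in for the real tracker type

// Simplified scope guard: by holding the shared_ptr it co-owns the tracker for
// the lifetime of the scope, so a worker thread cannot end up consuming against
// a tracker that was destroyed together with its owning node.
class ScopedConsumeShared {
public:
    explicit ScopedConsumeShared(std::shared_ptr<MemTracker> tracker)
            : _tracker(std::move(tracker)) {}

private:
    std::shared_ptr<MemTracker> _tracker;
};

// Hypothetical usage on a worker thread, assuming a node exposing both accessors:
//   ScopedConsumeShared scope(node->mem_tracker_shared());  // shared ownership across threads
//   MemTracker* raw = node->mem_tracker();                  // raw pointer, same-thread use only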
be/src/exec/hash_join_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -184,7 +184,7 @@ Status HashJoinNode::close(RuntimeState* state) {

void HashJoinNode::probe_side_open_thread(RuntimeState* state, std::promise<Status>* status) {
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
status->set_value(child(0)->open(state));
}

be/src/exec/intersect_node.cpp: 2 changes (1 addition & 1 deletion)
@@ -43,8 +43,8 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
// 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_ERROR(SetOperationNode::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
_hash_tbl_iterator = _hash_tbl->begin();
be/src/exec/olap_scan_node.cpp: 9 changes (4 additions & 5 deletions)
@@ -191,7 +191,7 @@ Status OlapScanNode::prepare(RuntimeState* state) {
_init_counter(state);
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);

_scanner_mem_tracker = std::make_unique<MemTracker>("Scanners");
_scanner_mem_tracker = std::make_shared<MemTracker>("Scanners");

if (_tuple_desc == nullptr) {
// TODO: make sure we print all available diagnostic output to our error log
@@ -941,7 +941,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
}
OlapScanner* scanner =
new OlapScanner(state, this, _olap_scan_node.is_preaggregation,
_need_agg_finalize, *scan_range, _scanner_mem_tracker.get());
_need_agg_finalize, *scan_range, _scanner_mem_tracker);
scanner->set_batch_size(_batch_size);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
@@ -1483,7 +1483,7 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) {
void OlapScanNode::transfer_thread(RuntimeState* state) {
// scanner open pushdown to scanThread
SCOPED_ATTACH_TASK(state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
Status status = Status::OK();
for (auto scanner : _olap_scanners) {
status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs());
@@ -1660,8 +1660,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}

void OlapScanNode::scanner_thread(OlapScanner* scanner) {
// SCOPED_ATTACH_TASK(_runtime_state);
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_shared());
Thread::set_self_name("olap_scanner");
if (UNLIKELY(_transfer_done)) {
_scanner_done = true;
be/src/exec/olap_scan_node.h: 2 changes (1 addition & 1 deletion)
@@ -264,7 +264,7 @@ class OlapScanNode : public ScanNode {

int64_t _buffered_bytes;
// Count the memory consumption of Rowset Reader and Tablet Reader in OlapScanner.
std::unique_ptr<MemTracker> _scanner_mem_tracker;
std::shared_ptr<MemTracker> _scanner_mem_tracker;
EvalConjunctsFn _eval_conjuncts_fn;

// the max num of scan keys of this scan request.
be/src/exec/olap_scanner.cpp: 8 changes (4 additions & 4 deletions)
@@ -43,16 +43,16 @@ namespace doris {

OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool aggregation,
bool need_agg_finalize, const TPaloScanRange& scan_range,
MemTracker* tracker)
const std::shared_ptr<MemTracker>& tracker)
: _runtime_state(runtime_state),
_parent(parent),
_tuple_desc(parent->_tuple_desc),
_id(-1),
_is_open(false),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
_version(-1) {
_mem_tracker = tracker;
_version(-1),
_mem_tracker(tracker) {
_tablet_schema = std::make_shared<TabletSchema>();
}

@@ -326,7 +326,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
bzero(tuple_buf, _batch_size * _tuple_desc->byte_size());
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);

std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker));
std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
{
be/src/exec/olap_scanner.h: 5 changes (3 additions & 2 deletions)
@@ -42,7 +42,8 @@ class OlapScanNode;
class OlapScanner {
public:
OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool aggregation,
bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker);
bool need_agg_finalize, const TPaloScanRange& scan_range,
const std::shared_ptr<MemTracker>& tracker);

virtual ~OlapScanner() = default;

@@ -151,7 +152,7 @@ class OlapScanner {

MonotonicStopWatch _watcher;

MemTracker* _mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;

TabletSchemaSPtr _tablet_schema;
};
be/src/olap/delta_writer.cpp: 44 changes (23 additions & 21 deletions)
@@ -33,22 +33,22 @@

namespace doris {

Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec) {
*writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec);
Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, const UniqueId& load_id,
bool is_vec) {
*writer = new DeltaWriter(req, StorageEngine::instance(), load_id, is_vec);
return Status::OK();
}

DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec)
DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id,
bool is_vec)
: _req(*req),
_tablet(nullptr),
_cur_rowset(nullptr),
_rowset_writer(nullptr),
_tablet_schema(new TabletSchema),
_delta_written_success(false),
_storage_engine(storage_engine),
_parent_tracker(parent_tracker),
_load_id(load_id),
_is_vec(is_vec) {}

DeltaWriter::~DeltaWriter() {
@@ -109,8 +109,6 @@ Status DeltaWriter::init() {
_rowset_ids = _tablet->all_rs_id(_cur_max_version);
}

_mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker);
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
//trigger quick compaction
@@ -164,8 +162,6 @@ Status DeltaWriter::write(Tuple* tuple) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}

SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);

_mem_table->insert(tuple);

// if memtable is full, push it to the flush executor,
@@ -281,9 +277,18 @@ void DeltaWriter::_reset_mem_table() {
if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) {
_delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id()));
}
auto mem_table_insert_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()));
auto mem_table_flush_tracker = std::make_shared<MemTracker>(
fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()));
_mem_table_tracker.push_back(mem_table_insert_tracker);
_mem_table_tracker.push_back(mem_table_flush_tracker);
_mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
_req.tuple_desc, _rowset_writer.get(), _delete_bitmap,
_rowset_ids, _cur_max_version, _mem_tracker, _is_vec));
_rowset_ids, _cur_max_version, mem_table_insert_tracker,
mem_table_flush_tracker, _is_vec));
}

Status DeltaWriter::close() {
@@ -385,7 +390,7 @@ Status DeltaWriter::cancel() {

void DeltaWriter::save_mem_consumption_snapshot() {
_mem_consumption_snapshot = mem_consumption();
_memtable_consumption_snapshot = memtable_consumption();
_memtable_consumption_snapshot = _mem_table->memory_usage();
}

int64_t DeltaWriter::get_memtable_consumption_inflush() const {
@@ -398,19 +403,16 @@ int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
}

int64_t DeltaWriter::mem_consumption() const {
if (_mem_tracker == nullptr) {
if (_flush_token == nullptr) {
// This method may be called before this writer is initialized.
// So _mem_tracker may be null.
// So _flush_token may be null.
return 0;
}
return _mem_tracker->consumption();
}

int64_t DeltaWriter::memtable_consumption() const {
if (_mem_table == nullptr) {
return 0;
int64_t mem_usage = 0;
for (auto mem_table_tracker : _mem_table_tracker) {
mem_usage += mem_table_tracker->consumption();
}
return _mem_table->mem_tracker_hook()->consumption();
return mem_usage;
}

int64_t DeltaWriter::partition_id() const {
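
With the per-writer MemTrackerLimiter gone, DeltaWriter now creates a pair of plain MemTrackers for each memtable in _reset_mem_table (one for manual inserts, one for hook-tracked flush memory), keeps them in _mem_table_tracker, and reports their sum from mem_consumption(). A condensed sketch of that accounting, assuming only that MemTracker exposes consumption():

#include <cstdint>
#include <memory>
#include <vector>

struct MemTracker {
    int64_t consumption() const { return _bytes; }
    int64_t _bytes = 0;
};

// Condensed from the new DeltaWriter::mem_consumption(): walk every memtable
// tracker this writer has pushed into its tracker list so far and add up their
// current consumption.
int64_t sum_memtable_consumption(const std::vector<std::shared_ptr<MemTracker>>& trackers) {
    int64_t mem_usage = 0;
    for (const auto& tracker : trackers) {
        mem_usage += tracker->consumption();
    }
    return mem_usage;
}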
be/src/olap/delta_writer.h: 20 changes (6 additions & 14 deletions)
@@ -56,9 +56,7 @@ struct WriteRequest {
class DeltaWriter {
public:
static Status open(WriteRequest* req, DeltaWriter** writer,
const std::shared_ptr<MemTrackerLimiter>& parent_tracker =
std::shared_ptr<MemTrackerLimiter>(),
bool is_vec = false);
const UniqueId& load_id = TUniqueId(), bool is_vec = false);

~DeltaWriter();

@@ -102,8 +100,6 @@ class DeltaWriter {

int32_t schema_hash() { return _tablet->schema_hash(); }

int64_t memtable_consumption() const;

void save_mem_consumption_snapshot();

int64_t get_memtable_consumption_inflush() const;
Expand All @@ -113,8 +109,8 @@ class DeltaWriter {
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);

private:
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool is_vec);
DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, const UniqueId& load_id,
bool is_vec);

// push a full memtable to flush executor
Status _flush_memtable_async();
@@ -146,14 +142,10 @@ class DeltaWriter {
bool _delta_written_success;

StorageEngine* _storage_engine;
UniqueId _load_id;
std::unique_ptr<FlushToken> _flush_token;
// The memory value automatically tracked by the Tcmalloc hook is 20% less than the manually recorded
// value in the memtable, because some freed memory is not allocated in the DeltaWriter.
// The memory value automatically tracked by the Tcmalloc hook, used for load channel mgr to trigger
// flush memtable when the sum of all channel memory exceeds the limit.
// The manually recorded value of memtable is used to flush when it is larger than write_buffer_size.
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
std::shared_ptr<MemTrackerLimiter> _parent_tracker;
std::vector<std::shared_ptr<MemTracker>> _mem_table_tracker;
std::atomic<uint32_t> _mem_table_num = 1;

// The counter of number of segment flushed already.
int64_t _segment_counter = 0;
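
The member comments above distinguish two accounting paths: the value tracked automatically by the TCMalloc hook, which the load channel mgr uses to flush memtables when the total across all channels exceeds its limit, and the manually recorded memtable size, which triggers a flush once it grows past write_buffer_size. A rough sketch of those two checks, with illustrative names and no claim about the actual Doris call sites:

#include <cstdint>

// Per-memtable trigger: flush once the manually recorded insert size of one
// memtable grows larger than write_buffer_size.
bool memtable_needs_flush(int64_t manual_bytes, int64_t write_buffer_size) {
    return manual_bytes > write_buffer_size;
}

// Load-channel-level trigger: flush when the hook-tracked sum across all load
// channels exceeds the load memory limit.
bool load_channels_need_flush(int64_t hook_tracked_total, int64_t load_mem_limit) {
    return hook_tracked_total > load_mem_limit;
}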
(remaining 25 changed files not shown)
