Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No tracing memory usage of shared column data in MPPTask's memory tracker (#8131) #8136

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ static Poco::Logger * getLogger()
return logger;
}

static String storageMemoryUsageDetail()
{
return fmt::format(
"non-query: peak={}, amount={}; "
"shared-column-data: peak={}, amount={}.",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak())
: "0",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get())
: "0",
shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->getPeak())
: "0",
shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->get())
: "0");
}

void MemoryTracker::logPeakMemoryUsage() const
{
LOG_DEBUG(getLogger(), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak));
Expand All @@ -79,7 +94,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

if (!next.load(std::memory_order_relaxed))
{
CurrentMetrics::add(metric, size);
// Only add shared column data size to root_of_query_mem_trackers.
if (shared_column_data_mem_tracker && root_of_query_mem_trackers.get() == this)
will_be += shared_column_data_mem_tracker->get();
}

if (check_memory_limit)
{
Expand All @@ -101,6 +121,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"),
proc_virt_size.load());
fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

Expand All @@ -118,7 +139,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -150,7 +171,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
size,
formatReadableSizeWithBinarySuffix(current_limit));
}

fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
}
Expand Down Expand Up @@ -224,6 +245,20 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;
std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();

std::shared_ptr<MemoryTracker> shared_column_data_mem_tracker;

void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
{
LOG_INFO(
getLogger(),
"Storage task memory limit={}, larger_than_limit={}",
formatReadableSizeWithBinarySuffix(limit),
formatReadableSizeWithBinarySuffix(larger_than_limit));
RUNTIME_CHECK(shared_column_data_mem_tracker == nullptr);
shared_column_data_mem_tracker = MemoryTracker::create(limit);
shared_column_data_mem_tracker->setBytesThatRssLargerThanLimit(larger_than_limit);
}

namespace CurrentMemoryTracker
{
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ extern thread_local MemoryTracker * current_memory_tracker;
extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;

extern std::shared_ptr<MemoryTracker> shared_column_data_mem_tracker;
void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit);

/// Convenience methods, that use current_memory_tracker if it is available.
namespace CurrentMemoryTracker
{
Expand Down
25 changes: 20 additions & 5 deletions dbms/src/Common/PODArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/Allocator.h>
#include <Common/BitHelpers.h>
#include <Common/Exception.h>
#include <Common/MemoryTrackerSetter.h>
#include <Common/memcpySmall.h>
#include <common/likely.h>
#include <common/strong_typedef.h>
Expand Down Expand Up @@ -104,6 +105,14 @@ class PODArrayBase : private boost::noncopyable
char * c_end = null;
char * c_end_of_storage = null; /// Does not include pad_right.

bool is_shared_memory;

[[nodiscard]] __attribute__((always_inline)) std::optional<MemoryTrackerSetter> swicthMemoryTracker()
{
return is_shared_memory ? std::make_optional<MemoryTrackerSetter>(true, shared_column_data_mem_tracker.get())
: std::nullopt;
}

/// The amount of memory occupied by the num_elements of the elements.
static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; }

Expand All @@ -129,7 +138,10 @@ class PODArrayBase : private boost::noncopyable
template <typename... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams &&... allocator_params)
{
c_start = c_end = reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...)) + pad_left;
auto guard = swicthMemoryTracker();
c_start = c_end
= reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...))
+ pad_left;
c_end_of_storage = c_start + bytes - pad_right - pad_left;

if (pad_left)
Expand All @@ -143,6 +155,7 @@ class PODArrayBase : private boost::noncopyable

unprotect();

auto guard = swicthMemoryTracker();
TAllocator::free(c_start - pad_left, allocated_bytes());
}

Expand All @@ -157,6 +170,7 @@ class PODArrayBase : private boost::noncopyable

unprotect();

auto guard = swicthMemoryTracker();
ptrdiff_t end_diff = c_end - c_start;

c_start = reinterpret_cast<char *>(
Expand Down Expand Up @@ -281,10 +295,11 @@ class PODArrayBase : private boost::noncopyable
#endif
}

~PODArrayBase()
{
dealloc();
}
~PODArrayBase() { dealloc(); }

PODArrayBase()
: is_shared_memory(current_memory_tracker == nullptr)
{}
};

template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0, size_t pad_left_ = 0>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
// adjust the thread pool size according to settings and logical cores num
adjustThreadPoolSize(settings, server_info.cpu_info.logical_cores);
initStorageMemoryTracker(
settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity),
settings.bytes_that_rss_larger_than_limit);

/// PageStorage run mode has been determined above
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Columns/ColumnsCommon.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/IDataType.h>
Expand Down Expand Up @@ -698,8 +699,15 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
size_t read_rows,
size_t skip_packs)
{
bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column))
{
// If there are concurrent read requests, this data is likely to be shared.
// So the allocation and deallocation of this data may not be in the same MemoryTracker.
// This can lead to inaccurate memory statistics of MemoryTracker.
// To solve this problem, we use a independent global memory tracker to trace the shared column data in ColumnSharingCacheMap.
auto mem_tracker_guard
= has_concurrent_reader ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
auto data_type = dmfile->getColumnStat(column_define.id).type;
auto col = data_type->createColumn();
readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]);
Expand All @@ -711,7 +719,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
last_read_from_cache[column_define.id] = true;
}

if (col_data_cache != nullptr)
if (has_concurrent_reader && col_data_cache != nullptr)
{
DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st
}
}

// Check is there any concurrent DMFileReader with `from_reader`.
bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader)
{
std::lock_guard lock(mtx);
auto itr = readers.find(from_reader.path());
return itr != readers.end() && itr->second.size() >= 2;
}

DMFileReader * DMFileReaderPool::get(const std::string & name)
{
std::lock_guard lock(mtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class DMFileReaderPool
void add(DMFileReader & reader);
void del(DMFileReader & reader);
void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);
bool hasConcurrentReader(DMFileReader & from_reader);
// `get` is just for test.
DMFileReader * get(const std::string & name);

Expand Down
1 change: 0 additions & 1 deletion dbms/src/TestUtils/gtests_dbms_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ int main(int argc, char ** argv)
DB::tests::TiFlashTestEnv::setupLogger();
auto run_mode = DB::PageStorageRunMode::ONLY_V3;
DB::tests::TiFlashTestEnv::initializeGlobalContext(/*testdata_path*/ {}, run_mode);

DB::ServerInfo server_info;
// `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`.
DB::DM::DMFileReaderPool::instance();
Expand Down