diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 3cb0393c44e..98c2255f13f 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -64,6 +64,21 @@ static Poco::Logger * getLogger() return logger; } +static String storageMemoryUsageDetail() +{ + return fmt::format( + "non-query: peak={}, amount={}; " + "shared-column-data: peak={}, amount={}.", + root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak()) + : "0", + root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get()) + : "0", + shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->getPeak()) + : "0", + shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->get()) + : "0"); +} + void MemoryTracker::logPeakMemoryUsage() const { LOG_DEBUG(getLogger(), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak)); @@ -78,7 +93,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed); if (!next.load(std::memory_order_relaxed)) + { CurrentMetrics::add(metric, size); + // Only add shared column data size to root_of_query_mem_trackers. + if (shared_column_data_mem_tracker && root_of_query_mem_trackers.get() == this) + will_be += shared_column_data_mem_tracker->get(); + } if (check_memory_limit) { @@ -100,6 +120,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) (root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"), (root_of_non_query_mem_trackers ? 
formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"), proc_virt_size.load()); + fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } @@ -117,7 +138,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) formatReadableSizeWithBinarySuffix(will_be), size, formatReadableSizeWithBinarySuffix(current_limit)); - + fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); @@ -148,7 +169,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) size, formatReadableSizeWithBinarySuffix(current_limit)); } - + fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } } @@ -222,6 +243,20 @@ thread_local MemoryTracker * current_memory_tracker = nullptr; std::shared_ptr root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot(); std::shared_ptr root_of_query_mem_trackers = MemoryTracker::createGlobalRoot(); +std::shared_ptr shared_column_data_mem_tracker; + +void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit) +{ + LOG_INFO( + getLogger(), + "Storage task memory limit={}, larger_than_limit={}", + formatReadableSizeWithBinarySuffix(limit), + formatReadableSizeWithBinarySuffix(larger_than_limit)); + RUNTIME_CHECK(shared_column_data_mem_tracker == nullptr); + shared_column_data_mem_tracker = MemoryTracker::create(limit); + shared_column_data_mem_tracker->setBytesThatRssLargerThanLimit(larger_than_limit); +} + namespace CurrentMemoryTracker { static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB diff --git a/dbms/src/Common/MemoryTracker.h 
b/dbms/src/Common/MemoryTracker.h index bffe1db272e..1e72d4d8024 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -157,6 +157,9 @@ extern thread_local MemoryTracker * current_memory_tracker; extern std::shared_ptr root_of_non_query_mem_trackers; extern std::shared_ptr root_of_query_mem_trackers; +extern std::shared_ptr shared_column_data_mem_tracker; +void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit); + /// Convenience methods, that use current_memory_tracker if it is available. namespace CurrentMemoryTracker { diff --git a/dbms/src/Common/PODArray.h b/dbms/src/Common/PODArray.h index 23b94bb3e3b..a5b24a974c2 100644 --- a/dbms/src/Common/PODArray.h +++ b/dbms/src/Common/PODArray.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -104,6 +105,14 @@ class PODArrayBase : private boost::noncopyable char * c_end = null; char * c_end_of_storage = null; /// Does not include pad_right. + bool is_shared_memory; + + [[nodiscard]] __attribute__((always_inline)) std::optional swicthMemoryTracker() + { + return is_shared_memory ? std::make_optional(true, shared_column_data_mem_tracker.get()) + : std::nullopt; + } + /// The amount of memory occupied by the num_elements of the elements. static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; } @@ -129,7 +138,10 @@ class PODArrayBase : private boost::noncopyable template void alloc(size_t bytes, TAllocatorParams &&... 
allocator_params) { - c_start = c_end = reinterpret_cast(TAllocator::alloc(bytes, std::forward(allocator_params)...)) + pad_left; + auto guard = swicthMemoryTracker(); + c_start = c_end + = reinterpret_cast(TAllocator::alloc(bytes, std::forward(allocator_params)...)) + + pad_left; c_end_of_storage = c_start + bytes - pad_right - pad_left; if (pad_left) @@ -143,6 +155,7 @@ class PODArrayBase : private boost::noncopyable unprotect(); + auto guard = swicthMemoryTracker(); TAllocator::free(c_start - pad_left, allocated_bytes()); } @@ -157,6 +170,7 @@ class PODArrayBase : private boost::noncopyable unprotect(); + auto guard = swicthMemoryTracker(); ptrdiff_t end_diff = c_end - c_start; c_start = reinterpret_cast( @@ -281,10 +295,11 @@ class PODArrayBase : private boost::noncopyable #endif } - ~PODArrayBase() - { - dealloc(); - } + ~PODArrayBase() { dealloc(); } + + PODArrayBase() + : is_shared_memory(current_memory_tracker == nullptr) + {} }; template , size_t pad_right_ = 0, size_t pad_left_ = 0> diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index afa972f67c5..ce63627c3a2 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1088,6 +1088,9 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size); auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size); auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size); + initStorageMemoryTracker( + settings.max_memory_usage_for_all_queries, + settings.bytes_that_rss_larger_than_limit); /// PageStorage run mode has been determined above global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool()); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 92b574ee365..fe3e77a4779 100644 --- 
a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -594,8 +595,15 @@ void DMFileReader::readColumn(ColumnDefine & column_define, size_t skip_packs, bool force_seek) { + bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column)) { + // If there are concurrent read requests, this data is likely to be shared. + // So the allocation and deallocation of this data may not be in the same MemoryTracker. + // This can lead to inaccurate memory statistics of MemoryTracker. + // To solve this problem, we use an independent global memory tracker to track the shared column data in ColumnSharingCacheMap. + auto mem_tracker_guard + = has_concurrent_reader ? std::make_optional(true, nullptr) : std::nullopt; auto data_type = dmfile->getColumnStat(column_define.id).type; auto col = data_type->createColumn(); readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, force_seek || last_read_from_cache[column_define.id]); @@ -607,7 +615,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define, last_read_from_cache[column_define.id] = true; } - if (col_data_cache != nullptr) + if (has_concurrent_reader && col_data_cache != nullptr) { DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp index 3300dbbafb8..f9caa0ccea8 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp @@ -61,6 +61,14 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st } } +// Check whether any other DMFileReader is reading concurrently with 
`from_reader`. +bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader) +{ + std::lock_guard lock(mtx); + auto itr = readers.find(from_reader.path()); + return itr != readers.end() && itr->second.size() >= 2; +} + DMFileReader * DMFileReaderPool::get(const std::string & name) { std::lock_guard lock(mtx); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h index c21d6bc7786..f6c825b2db5 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -224,6 +224,7 @@ class DMFileReaderPool void add(DMFileReader & reader); void del(DMFileReader & reader); void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col); + bool hasConcurrentReader(DMFileReader & from_reader); // `get` is just for test. DMFileReader * get(const std::string & name);