From 257ec4ccb2212be902b360ea824cb052759b63b5 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 16 Mar 2022 00:19:03 +0800 Subject: [PATCH 01/10] Add tracing_id to snapshot --- dbms/src/Storages/DeltaMerge/DMContext.h | 10 ++++--- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 15 ++++++----- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 3 ++- dbms/src/Storages/DeltaMerge/Segment.cpp | 4 +-- .../tests/gtest_dm_minmax_index.cpp | 3 ++- .../DeltaMerge/tools/workload/DTWorkload.cpp | 3 ++- dbms/src/Storages/SelectQueryInfo.cpp | 27 ++++++++++--------- dbms/src/Storages/SelectQueryInfo.h | 12 ++++++--- dbms/src/Storages/StorageDeltaMerge.cpp | 8 ++++-- 9 files changed, 52 insertions(+), 33 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index 30ae4c723d5..cb80e9c40e1 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -23,6 +24,9 @@ namespace DB { class StoragePathPool; +class LogWithPrefix; +using LogWithPrefixPtr = std::shared_ptr; + namespace DM { @@ -81,7 +85,7 @@ struct DMContext : private boost::noncopyable const bool enable_relevant_place; const bool enable_skippable_place; - const String query_id; + const LogWithPrefixPtr tracing_logger; public: DMContext(const Context & db_context_, @@ -93,7 +97,7 @@ struct DMContext : private boost::noncopyable bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, - const String & query_id_ = "") + const LogWithPrefixPtr & tracing_logger_ = nullptr) : db_context(db_context_) , path_pool(path_pool_) , storage_pool(storage_pool_) @@ -117,7 +121,7 @@ struct DMContext : private boost::noncopyable , read_stable_only(settings.dt_read_stable_only) , enable_relevant_place(settings.dt_enable_relevant_place) , enable_skippable_place(settings.dt_enable_skippable_place) - , query_id(query_id_) + , tracing_logger(tracing_logger_ ? tracing_logger_ : getLogWithPrefix(nullptr)) { } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 25abdbbb07d..eb1b2984723 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -442,7 +442,7 @@ void DeltaMergeStore::shutdown() LOG_FMT_TRACE(log, "Shutdown DeltaMerge end [{}.{}]", db_name, table_name); } -DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id) +DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const LogWithPrefixPtr & tracing_logger) { std::shared_lock lock(read_write_mutex); @@ -458,7 +458,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: is_common_handle, rowkey_column_size, db_settings, - query_id); + tracing_logger); return DMContextPtr(ctx); } @@ -1018,7 +1018,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, { SegmentReadTasks tasks; - auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); + auto dm_context = newDMContext(db_context, db_settings, getLogWithPrefix(nullptr, db_context.getCurrentQueryId())); { std::shared_lock lock(read_write_mutex); @@ -1073,15 +1073,16 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, size_t num_streams, UInt64 max_version, const RSOperatorPtr & filter, + const LogWithPrefixPtr & tracing_logger, size_t expected_block_size, const SegmentIdSet & read_segments, size_t extra_table_id_index) { - auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); + auto dm_context = newDMContext(db_context, db_settings, tracing_logger); SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments); - LOG_FMT_DEBUG(log, "Read create segment snapshot done"); + LOG_FMT_DEBUG(dm_context->tracing_logger, "Read create segment snapshot done"); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); @@ -1113,7 +1114,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, res.push_back(stream); } - LOG_FMT_DEBUG(log, "Read create stream done"); + LOG_FMT_DEBUG(dm_context->tracing_logger, "Read create stream done"); return res; } @@ -2502,7 +2503,7 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( } LOG_FMT_DEBUG( - log, + dm_context.tracing_logger, "[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]", sorted_ranges.size(), tasks.size(), diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 51e50a255dc..4b53d47431a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -358,6 +358,7 @@ class DeltaMergeStore : private boost::noncopyable size_t num_streams, UInt64 max_version, const RSOperatorPtr & filter, + const LogWithPrefixPtr & tracing_logger, size_t expected_block_size = DEFAULT_BLOCK_SIZE, const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); @@ -414,7 +415,7 @@ class DeltaMergeStore : private boost::noncopyable private: #endif - DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = ""); + DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const LogWithPrefixPtr & tracing_logger = nullptr); static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index a38aa202954..85219d15370 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -430,10 +430,10 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, columns_to_read, max_version, is_common_handle, - dm_context.query_id); + ""); LOG_FMT_TRACE( - log, + dm_context.tracing_logger, "Segment [{}] is read by max_version: {}, {} ranges: {}", segment_id, max_version, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index bbb8aaf8206..6bc0133fc2a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -124,7 +124,8 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; - auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter); + auto log = Logger::get(__PRETTY_FUNCTION__); + auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter, log); streams[0]->readPrefix(); auto rows = streams[0]->read().rows(); streams[0]->readSuffix(); diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp index 5f99cbcd33f..cccc26f8dcd 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp @@ -191,7 +191,8 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) auto filter = EMPTY_FILTER; int excepted_block_size = 1024; uint64_t read_ts = ts_gen->get(); - auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, excepted_block_size); + auto log = getLogWithPrefix(nullptr, "DTWorkload"); + auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, log, excepted_block_size); std::vector threads; threads.reserve(streams.size()); for (auto & stream : streams) diff --git a/dbms/src/Storages/SelectQueryInfo.cpp b/dbms/src/Storages/SelectQueryInfo.cpp index 631f707e21d..516a5f67087 100644 --- a/dbms/src/Storages/SelectQueryInfo.cpp +++ b/dbms/src/Storages/SelectQueryInfo.cpp @@ -12,28 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include +#include namespace DB { SelectQueryInfo::SelectQueryInfo() = default; -SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & query_info_) - : query(query_info_.query), - sets(query_info_.sets), - mvcc_query_info(query_info_.mvcc_query_info != nullptr ? std::make_unique(*query_info_.mvcc_query_info) : nullptr), - dag_query(query_info_.dag_query != nullptr ? std::make_unique(*query_info_.dag_query) : nullptr) -{} +SelectQueryInfo::~SelectQueryInfo() = default; -SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && query_info_) - : query(query_info_.query), sets(query_info_.sets), mvcc_query_info(std::move(query_info_.mvcc_query_info)), - dag_query(std::move(query_info_.dag_query)) +SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & rhs) + : query(rhs.query) + , sets(rhs.sets) + , mvcc_query_info(rhs.mvcc_query_info != nullptr ? std::make_unique(*rhs.mvcc_query_info) : nullptr) + , dag_query(rhs.dag_query != nullptr ? std::make_unique(*rhs.dag_query) : nullptr) + , logger(rhs.logger) {} -SelectQueryInfo::~SelectQueryInfo() = default; +SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept + : query(std::move(rhs.query)) + , sets(std::move(rhs.sets)) + , mvcc_query_info(std::move(rhs.mvcc_query_info)) + , dag_query(std::move(rhs.dag_query)) + , logger(std::move(rhs.logger)) +{} } // namespace DB diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index bed607ce0fe..f343720718c 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -31,6 +31,8 @@ using PreparedSets = std::unordered_map; struct MvccQueryInfo; struct DAGQueryInfo; +class LogWithPrefix; +using LogWithPrefixPtr = std::shared_ptr; /** Query along with some additional data, * that can be used during query processing @@ -48,14 +50,16 @@ struct SelectQueryInfo std::unique_ptr dag_query; - SelectQueryInfo(); - - SelectQueryInfo(const SelectQueryInfo & query_info_); + LogWithPrefixPtr logger; - SelectQueryInfo(SelectQueryInfo && query_info_); + SelectQueryInfo(); ~SelectQueryInfo(); + // support copying and moving + SelectQueryInfo(const SelectQueryInfo & rhs); + SelectQueryInfo(SelectQueryInfo && rhs) noexcept; + bool fromAST() const { return dag_query == nullptr; }; }; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 224500cd69d..f8fe35764e6 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -727,6 +727,7 @@ BlockInputStreams StorageDeltaMerge::read( else LOG_FMT_DEBUG(log, "Rough set filter is disabled."); + auto dummy_logger = getLogWithPrefix(nullptr); auto streams = store->read( context, context.getSettingsRef(), @@ -735,6 +736,7 @@ BlockInputStreams StorageDeltaMerge::read( num_streams, /*max_version=*/mvcc_query_info.read_tso, rs_operator, + dummy_logger, max_block_size, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); @@ -809,7 +811,8 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM {range}, 1, std::numeric_limits::max(), - EMPTY_FILTER)[0]; + EMPTY_FILTER, + Logger::get("StorageDeltaMerge", "getRows"))[0]; stream->readPrefix(); Block block; while ((block = stream->read())) @@ -832,7 +835,8 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context {DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, 1, std::numeric_limits::max(), - EMPTY_FILTER)[0]; + EMPTY_FILTER, + Logger::get("StorageDeltaMerge", "getRange"))[0]; stream->readPrefix(); Block block; size_t index = 0; From 23f47a4d0ece00cb8a9791362d1a781107938f42 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 15 Mar 2022 17:15:16 +0800 Subject: [PATCH 02/10] Add more tracing log on the storage layer by prefix logger --- .../Coprocessor/DAGStorageInterpreter.cpp | 1 + dbms/src/Server/tests/gtest_dttool.cpp | 3 +- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 2 +- .../DMVersionFilterBlockInputStream.h | 15 +++-- .../DeltaMerge/File/DMFilePackFilter.h | 5 +- .../Storages/DeltaMerge/File/DMFileReader.h | 1 + .../Storages/DeltaMerge/RowKeyRangeUtils.cpp | 7 ++- .../Storages/DeltaMerge/RowKeyRangeUtils.h | 8 ++- dbms/src/Storages/DeltaMerge/Segment.cpp | 2 +- .../Storages/DeltaMerge/SegmentReadTaskPool.h | 2 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 26 ++++---- .../tests/gtest_dm_delta_merge_store.cpp | 60 +++++++++++++++++-- .../DeltaMerge/tools/workload/DTWorkload.cpp | 2 + dbms/src/Storages/SelectQueryInfo.h | 3 + dbms/src/Storages/StorageDeltaMerge.cpp | 23 +++---- dbms/src/Storages/StorageDeltaMergeHelpers.h | 9 ++- 16 files changed, 122 insertions(+), 47 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index a7a8ca48ef5..144e0233d3c 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -270,6 +270,7 @@ std::unordered_map DAGStorageInterpreter::generateSele analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); + query_info.logger = log; return query_info; }; if (table_scan.isPartitionTableScan()) diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 8499f2f863c..f240c6884a4 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -287,4 +288,4 @@ TEST_F(DTToolTest, BlockwiseInvariant) EXPECT_EQ(new_prop_iter, refreshed_file->getPackProperties().property().end()); stream->readSuffix(); } -} \ No newline at end of file +} diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 8fe9e3ff825..de5c3146418 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -43,7 +43,7 @@ void ColumnFileBig::calculateStat(const DMContext & context) {}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ nullptr); + /*tracing_logger*/ context.tracing_logger); std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes(); } diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index 7cf83f08ab3..927b6252fe3 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -15,6 +15,8 @@ #pragma once #include +#include +#include #include #include #include @@ -41,12 +43,11 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream const ColumnDefines & read_columns, UInt64 version_limit_, bool is_common_handle_, - const String & query_id_ = "") + const LogWithPrefixPtr & log_ = nullptr) : version_limit(version_limit_) , is_common_handle(is_common_handle_) , header(toEmptyBlock(read_columns)) - , query_id(query_id_) - , log(&Poco::Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">")) + , log(getLogWithPrefix(log_, fmt::format("DMVersionFilterBlockInputStream<{}>", (MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT")))) { children.push_back(input); @@ -60,15 +61,14 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream ~DMVersionFilterBlockInputStream() { LOG_FMT_DEBUG(log, - "Total rows: {}, pass: {:.2f}%, complete pass: {:.2f}%, complete not pass: {:.2f}%, not clean: {:.2f}%, effective: {:.2f}%, read tso: {}, query id: {}", + "Total rows: {}, pass: {:.2f}%, complete pass: {:.2f}%, complete not pass: {:.2f}%, not clean: {:.2f}%, effective: {:.2f}%, read tso: {}", total_rows, passed_rows * 100.0 / total_rows, complete_passed * 100.0 / total_blocks, complete_not_passed * 100.0 / total_blocks, not_clean_rows * 100.0 / passed_rows, effective_num_rows * 100.0 / passed_rows, - version_limit, - (query_id.empty() ? "" : query_id)); + version_limit); } void readPrefix() override; @@ -192,7 +192,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream const UInt64 version_limit; const bool is_common_handle; const Block header; - const String query_id; size_t handle_col_pos; size_t version_col_pos; @@ -230,7 +229,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream size_t not_clean_rows = 0; size_t effective_num_rows = 0; - Poco::Logger * const log; + const LogWithPrefixPtr log; }; } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index ee59fb7bc1c..eee26944413 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -119,7 +120,7 @@ class DMFilePackFilter , file_provider(file_provider_) , handle_res(dmfile->getPacks(), RSResult::All) , use_packs(dmfile->getPacks()) - , log(tracing_logger ? tracing_logger : DB::Logger::get("DMFilePackFilter")) + , log(tracing_logger ? tracing_logger : Logger::get("DMFilePackFilter")) , read_limiter(read_limiter_) { } @@ -299,7 +300,7 @@ class DMFilePackFilter std::vector handle_res; std::vector use_packs; - DB::LoggerPtr log; + LoggerPtr log; ReadLimiterPtr read_limiter; }; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index e844df64cfd..dd754c86518 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -67,6 +67,7 @@ class DMFileReader using StreamPtr = std::unique_ptr; using ColumnStreams = std::map; + // TODO: Use a builder to replace these params DMFileReader( const DMFilePtr & dmfile_, const ColumnDefines & read_columns_, diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp index dc1fae350f9..12c13926503 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp +++ b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include namespace DB { @@ -44,7 +46,7 @@ class MergeRangeHelper public: explicit MergeRangeHelper(RowKeyRanges && sorted_ranges_) - : sorted_ranges(std::move(sorted_ranges_)) // + : sorted_ranges(std::move(sorted_ranges_)) { genMergeStats(); } @@ -156,7 +158,7 @@ void sortRangesByStartEdge(RowKeyRanges & ranges) }); } -RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_ranges_count, Poco::Logger * log) +RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_ranges_count, const LogWithPrefixPtr & log) { if (sorted_ranges.size() <= 1) return std::move(sorted_ranges); @@ -170,7 +172,6 @@ RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_range /// Try to make the number of merged_ranges result larger or equal to expected_ranges_count. do_merge_ranges.trySplit(expected_ranges_count); - if (log) LOG_FMT_TRACE(log, "[original ranges: {}] [expected ranges: {}] [after merged ranges: {}] [final ranges: {}]", ori_size, expected_ranges_count, after_merge_count, do_merge_ranges.currentRangesCount()); diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h index 8331c11993b..67c0a54ec33 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h @@ -15,15 +15,17 @@ #pragma once #include -#include namespace DB { +class LogWithPrefix; +using LogWithPrefixPtr = std::shared_ptr; + namespace DM { void sortRangesByStartEdge(RowKeyRanges & ranges); -RowKeyRanges tryMergeRanges(RowKeyRanges && ranges, size_t expected_ranges_count, Poco::Logger * log = nullptr); +RowKeyRanges tryMergeRanges(RowKeyRanges && ranges, size_t expected_ranges_count, const LogWithPrefixPtr & log = nullptr); } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 85219d15370..c62b2391c15 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -430,7 +430,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, columns_to_read, max_version, is_common_handle, - ""); + dm_context.tracing_logger); LOG_FMT_TRACE( dm_context.tracing_logger, diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index fa0a1b93b22..dc14716af80 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -60,7 +60,7 @@ struct SegmentReadTask class SegmentReadTaskPool : private boost::noncopyable { public: - SegmentReadTaskPool(SegmentReadTasks && tasks_) + explicit SegmentReadTaskPool(SegmentReadTasks && tasks_) : tasks(std::move(tasks_)) {} diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 01cfdccfc38..05ffa4abb7c 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -60,7 +60,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang {}, dm_context->db_context.getFileProvider(), dm_context->getReadLimiter(), - /*tracing_logger*/ nullptr); + dm_context->tracing_logger); auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes(); rows += file_valid_rows; bytes += file_valid_bytes; @@ -184,7 +184,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const const auto & pack_properties = file->getPackProperties(); if (pack_stats.empty()) continue; - // if PackPropertys of this DMFile is empty, this must be an old format file generated by previous version. + // if PackProperties of this DMFile is empty, this must be an old format file generated by previous version. // so we need to create file property for this file. // but to keep dmfile immutable, we just cache the result in memory. // @@ -243,7 +243,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const {}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ nullptr); + context.tracing_logger); const auto & use_packs = pack_filter.getUsePacks(); size_t new_pack_properties_index = 0; bool use_new_pack_properties = pack_properties.property_size() == 0; @@ -257,7 +257,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const } if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) + throw Exception("new_pack_properties size " + std::to_string(new_pack_properties.property_size()) + " doesn't match use packs size " + std::to_string(use_packs_count), ErrorCodes::LOGICAL_ERROR); } @@ -319,13 +319,15 @@ void StableValueSpace::drop(const FileProviderPtr & file_provider) } } -SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream(const DMContext & context, // - const ColumnDefines & read_columns, - const RowKeyRanges & rowkey_ranges, - const RSOperatorPtr & filter, - UInt64 max_data_version, - size_t expected_block_size, - bool enable_clean_read) +SkippableBlockInputStreamPtr +StableValueSpace::Snapshot::getInputStream( + const DMContext & context, + const ColumnDefines & read_columns, + const RowKeyRanges & rowkey_ranges, + const RSOperatorPtr & filter, + UInt64 max_data_version, + size_t expected_block_size, + bool enable_clean_read) { LOG_FMT_DEBUG(log, "max_data_version: {}, enable_clean_read: {}", max_data_version, enable_clean_read); SkippableBlockInputStreams streams; @@ -366,7 +368,7 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & IdSetPtr{}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ nullptr); + context.tracing_logger); const auto & pack_stats = f->getPackStats(); const auto & use_packs = filter.getUsePacks(); for (size_t i = 0; i < pack_stats.size(); ++i) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 9ebe1cc7bb7..53ca6bd36fb 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include #include #include @@ -149,6 +151,8 @@ class DeltaMergeStoreRWTest TiFlashStorageTestBasic::SetUp(); store = reload(); + + log = getLogWithPrefix(nullptr, "DeltaMergeStoreRWTest"); } DeltaMergeStorePtr @@ -206,6 +210,8 @@ class DeltaMergeStoreRWTest protected: TestMode mode; DeltaMergeStorePtr store; + + LogWithPrefixPtr log; }; TEST_F(DeltaMergeStoreTest, Create) @@ -414,6 +420,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -519,6 +526,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -555,6 +563,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -639,6 +648,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -723,6 +733,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -753,6 +764,7 @@ try /* num_streams= */ 1, /* max_version= */ UInt64(1), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -807,6 +819,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -845,6 +858,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; // block_num represents index of current segment @@ -903,6 +917,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -925,6 +940,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -947,6 +963,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -969,6 +986,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1 - 1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1029,6 +1047,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1065,6 +1084,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2 - 1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1102,6 +1122,7 @@ try /* num_streams= */ 1, /* max_version= */ tso3 - 1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1126,6 +1147,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1150,6 +1172,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1210,6 +1233,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1246,6 +1270,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2 - 1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1283,6 +1308,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1338,6 +1364,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1374,6 +1401,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1459,6 +1487,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1566,6 +1595,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1672,6 +1702,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1759,6 +1790,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1861,6 +1893,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -1936,6 +1969,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2011,6 +2045,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2086,6 +2121,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2161,6 +2197,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2234,6 +2271,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -2306,6 +2344,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2395,6 +2434,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2528,6 +2568,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2588,6 +2629,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2686,6 +2728,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2738,6 +2781,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2853,6 +2897,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -2981,6 +3026,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3054,6 +3100,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3084,6 +3131,7 @@ try /* num_streams= */ 1, /* max_version= */ UInt64(1), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3141,6 +3189,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3190,6 +3239,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3262,6 +3312,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + log, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -3305,10 +3356,11 @@ try } CATCH -INSTANTIATE_TEST_CASE_P(TestMode, // - DeltaMergeStoreRWTest, - testing::Values(TestMode::V1_BlockOnly, TestMode::V2_BlockOnly, TestMode::V2_FileOnly, TestMode::V2_Mix), - testModeToString); +INSTANTIATE_TEST_CASE_P( + TestMode, + DeltaMergeStoreRWTest, + testing::Values(TestMode::V1_BlockOnly, TestMode::V2_BlockOnly, TestMode::V2_FileOnly, TestMode::V2_Mix), + testModeToString); } // namespace tests } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp index cccc26f8dcd..43a9f95cde8 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp @@ -32,6 +32,8 @@ #include #include +#include "Common/LogWithPrefix.h" + namespace DB::DM::tests { DB::Settings createSettings(const WorkloadOptions & opts) diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index f343720718c..d43c3f8b6d0 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -34,6 +34,9 @@ struct DAGQueryInfo; class LogWithPrefix; using LogWithPrefixPtr = std::shared_ptr; +class LogWithPrefix; +using LogWithPrefixPtr = std::shared_ptr; + /** Query along with some additional data, * that can be used during query processing * inside storage engines. diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index f8fe35764e6..f18f11be244 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -626,8 +627,9 @@ BlockInputStreams StorageDeltaMerge::read( throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR); const auto & mvcc_query_info = *query_info.mvcc_query_info; + auto tracing_logger = query_info.logger; - LOG_FMT_DEBUG(log, "Read with tso: {}", mvcc_query_info.read_tso); + LOG_FMT_DEBUG(tracing_logger, "Read with tso: {}", mvcc_query_info.read_tso); // Check whether tso is smaller than TiDB GcSafePoint const auto check_read_tso = [&tmt, &context, this](UInt64 read_tso) { @@ -652,7 +654,7 @@ BlockInputStreams StorageDeltaMerge::read( check_read_tso(mvcc_query_info.read_tso); FmtBuffer fmt_buf; - if (unlikely(log->trace())) + if (unlikely(tracing_logger->trace())) { fmt_buf.append("orig, "); fmt_buf.joinStr( @@ -685,9 +687,9 @@ BlockInputStreams StorageDeltaMerge::read( is_common_handle, rowkey_column_size, /*expected_ranges_count*/ num_streams, - log); + tracing_logger); - if (unlikely(log->trace())) + if (unlikely(tracing_logger->trace())) { fmt_buf.append(" merged, "); fmt_buf.joinStr( @@ -697,7 +699,7 @@ BlockInputStreams StorageDeltaMerge::read( fb.append(range.toDebugString()); }, ","); - LOG_FMT_TRACE(log, "reading ranges: {}", fmt_buf.toString()); + LOG_FMT_TRACE(tracing_logger, "reading ranges: {}", fmt_buf.toString()); } /// Get Rough set filter from query @@ -722,12 +724,11 @@ BlockInputStreams StorageDeltaMerge::read( rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, columns_to_read, std::move(create_attr_by_column_id), log); } if (likely(rs_operator != DM::EMPTY_FILTER)) - LOG_FMT_DEBUG(log, "Rough set filter: {}", rs_operator->toDebugString()); + LOG_FMT_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); } else - LOG_FMT_DEBUG(log, "Rough set filter is disabled."); + LOG_FMT_DEBUG(tracing_logger, "Rough set filter is disabled."); - auto dummy_logger = getLogWithPrefix(nullptr); auto streams = store->read( context, context.getSettingsRef(), @@ -736,7 +737,7 @@ BlockInputStreams StorageDeltaMerge::read( num_streams, /*max_version=*/mvcc_query_info.read_tso, rs_operator, - dummy_logger, + tracing_logger, max_block_size, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); @@ -744,7 +745,7 @@ BlockInputStreams StorageDeltaMerge::read( /// Ensure read_tso info after read. check_read_tso(mvcc_query_info.read_tso); - LOG_FMT_TRACE(log, "[ranges: {}] [streams: {}]", ranges.size(), streams.size()); + LOG_FMT_TRACE(tracing_logger, "[ranges: {}] [streams: {}]", ranges.size(), streams.size()); return streams; } @@ -799,6 +800,7 @@ UInt64 StorageDeltaMerge::onSyncGc(Int64 limit) return 0; } +// just for testing size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM::RowKeyRange & range) { size_t rows = 0; @@ -822,6 +824,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM return rows; } +// just for testing DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows) { auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp) diff --git a/dbms/src/Storages/StorageDeltaMergeHelpers.h b/dbms/src/Storages/StorageDeltaMergeHelpers.h index 96b3424b836..14ab39dc502 100644 --- a/dbms/src/Storages/StorageDeltaMergeHelpers.h +++ b/dbms/src/Storages/StorageDeltaMergeHelpers.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -30,7 +31,13 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -inline DM::RowKeyRanges getQueryRanges(const DB::MvccQueryInfo::RegionsQueryInfo & regions, TableID table_id, bool is_common_handle, size_t rowkey_column_size, size_t expected_ranges_count = 1, Poco::Logger * log = nullptr) +inline DM::RowKeyRanges getQueryRanges( + const DB::MvccQueryInfo::RegionsQueryInfo & regions, + TableID table_id, + bool is_common_handle, + size_t rowkey_column_size, + size_t expected_ranges_count = 1, + const LogWithPrefixPtr & log = nullptr) { // todo check table id in DecodedTiKVKey??? DM::RowKeyRanges ranges; From cd5677b7351926cacc428fc3c99dd4246109924b Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 15 Mar 2022 20:53:13 +0800 Subject: [PATCH 03/10] Add to system table --- .../DeltaMerge/tools/workload/DTWorkload.cpp | 2 -- .../Storages/System/StorageSystemDTTables.cpp | 18 +++++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp index 43a9f95cde8..cccc26f8dcd 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp @@ -32,8 +32,6 @@ #include #include -#include "Common/LogWithPrefix.h" - namespace DB::DM::tests { DB::Settings createSettings(const WorkloadOptions & opts) diff --git a/dbms/src/Storages/System/StorageSystemDTTables.cpp b/dbms/src/Storages/System/StorageSystemDTTables.cpp index 9523aff1f44..b3f9cf5b29e 100644 --- a/dbms/src/Storages/System/StorageSystemDTTables.cpp +++ b/dbms/src/Storages/System/StorageSystemDTTables.cpp @@ -28,7 +28,8 @@ namespace DB { -StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(name_) +StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) + : name(name_) { setColumns(ColumnsDescription({ {"database", std::make_shared()}, @@ -85,6 +86,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n {"storage_stable_num_snapshots", std::make_shared()}, {"storage_stable_oldest_snapshot_lifetime", std::make_shared()}, {"storage_stable_oldest_snapshot_thread_id", std::make_shared()}, + {"storage_stable_oldest_snapshot_tracing_id", std::make_shared()}, {"storage_stable_num_pages", std::make_shared()}, {"storage_stable_num_normal_pages", std::make_shared()}, {"storage_stable_max_page_id", std::make_shared()}, @@ -92,6 +94,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n {"storage_delta_num_snapshots", std::make_shared()}, {"storage_delta_oldest_snapshot_lifetime", std::make_shared()}, {"storage_delta_oldest_snapshot_thread_id", std::make_shared()}, + {"storage_delta_oldest_snapshot_tracing_id", std::make_shared()}, {"storage_delta_num_pages", std::make_shared()}, {"storage_delta_num_normal_pages", std::make_shared()}, {"storage_delta_max_page_id", std::make_shared()}, @@ -99,6 +102,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n {"storage_meta_num_snapshots", std::make_shared()}, {"storage_meta_oldest_snapshot_lifetime", std::make_shared()}, {"storage_meta_oldest_snapshot_thread_id", std::make_shared()}, + {"storage_meta_oldest_snapshot_tracing_id", std::make_shared()}, {"storage_meta_num_pages", std::make_shared()}, {"storage_meta_num_normal_pages", std::make_shared()}, {"storage_meta_max_page_id", std::make_shared()}, @@ -108,7 +112,8 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n } -BlockInputStreams StorageSystemDTTables::read(const Names & column_names, +BlockInputStreams StorageSystemDTTables::read( + const Names & column_names, const SelectQueryInfo &, const Context & context, QueryProcessingStage::Enum & processed_stage, @@ -126,19 +131,19 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, for (const auto & d : databases) { String database_name = d.first; - auto & database = d.second; + const auto & database = d.second; const DatabaseTiFlash * db_tiflash = typeid_cast(database.get()); auto it = database->getIterator(context); for (; it->isValid(); it->next()) { - auto & table_name = it->name(); + const auto & table_name = it->name(); auto & storage = it->table(); if (storage->getName() != MutableSupport::delta_tree_storage_name) continue; auto dm_storage = std::dynamic_pointer_cast(storage); - auto & table_info = dm_storage->getTableInfo(); + const auto & table_info = dm_storage->getTableInfo(); auto table_id = table_info.id; auto stat = dm_storage->getStore()->getStat(); @@ -201,6 +206,7 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, res_columns[j++]->insert(stat.storage_stable_num_snapshots); res_columns[j++]->insert(stat.storage_stable_oldest_snapshot_lifetime); res_columns[j++]->insert(stat.storage_stable_oldest_snapshot_thread_id); + res_columns[j++]->insert(stat.storage_stable_oldest_snapshot_tracing_id); res_columns[j++]->insert(stat.storage_stable_num_pages); res_columns[j++]->insert(stat.storage_stable_num_normal_pages); res_columns[j++]->insert(stat.storage_stable_max_page_id); @@ -208,6 +214,7 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, res_columns[j++]->insert(stat.storage_delta_num_snapshots); res_columns[j++]->insert(stat.storage_delta_oldest_snapshot_lifetime); res_columns[j++]->insert(stat.storage_delta_oldest_snapshot_thread_id); + res_columns[j++]->insert(stat.storage_delta_oldest_snapshot_tracing_id); res_columns[j++]->insert(stat.storage_delta_num_pages); res_columns[j++]->insert(stat.storage_delta_num_normal_pages); res_columns[j++]->insert(stat.storage_delta_max_page_id); @@ -215,6 +222,7 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, res_columns[j++]->insert(stat.storage_meta_num_snapshots); res_columns[j++]->insert(stat.storage_meta_oldest_snapshot_lifetime); res_columns[j++]->insert(stat.storage_meta_oldest_snapshot_thread_id); + res_columns[j++]->insert(stat.storage_meta_oldest_snapshot_tracing_id); res_columns[j++]->insert(stat.storage_meta_num_pages); res_columns[j++]->insert(stat.storage_meta_num_normal_pages); res_columns[j++]->insert(stat.storage_meta_max_page_id); From 422232b7cbddaab996eb312b6b16130de7e5958f Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 29 Mar 2022 13:09:09 +0800 Subject: [PATCH 04/10] Fix compile error --- .../Coprocessor/DAGStorageInterpreter.cpp | 2 +- dbms/src/Server/tests/gtest_dttool.cpp | 2 +- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 2 +- dbms/src/Storages/DeltaMerge/DMContext.h | 10 +- .../DMVersionFilterBlockInputStream.h | 10 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 20 ++-- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 4 +- .../File/DMFileBlockInputStream.cpp | 4 +- .../DeltaMerge/File/DMFileBlockInputStream.h | 6 +- .../DeltaMerge/File/DMFilePackFilter.h | 8 +- .../Storages/DeltaMerge/File/DMFileReader.cpp | 4 +- .../Storages/DeltaMerge/File/DMFileReader.h | 5 +- .../DeltaMerge/FilterParser/FilterParser.cpp | 8 +- .../DeltaMerge/FilterParser/FilterParser.h | 2 +- .../Storages/DeltaMerge/RowKeyRangeUtils.cpp | 4 +- .../Storages/DeltaMerge/RowKeyRangeUtils.h | 6 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 6 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 7 +- .../tests/gtest_dm_delta_merge_store.cpp | 96 +++++++++---------- .../tests/gtest_dm_minmax_index.cpp | 3 +- .../DeltaMerge/tools/workload/DTWorkload.cpp | 3 +- dbms/src/Storages/SelectQueryInfo.cpp | 4 +- dbms/src/Storages/SelectQueryInfo.h | 7 +- dbms/src/Storages/StorageDeltaMerge.cpp | 12 +-- dbms/src/Storages/StorageDeltaMerge.h | 2 +- dbms/src/Storages/StorageDeltaMergeHelpers.h | 4 +- .../Storages/tests/gtest_filter_parser.cpp | 6 +- 27 files changed, 123 insertions(+), 124 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 144e0233d3c..31f36ef19a4 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -270,7 +270,7 @@ std::unordered_map DAGStorageInterpreter::generateSele analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); - query_info.logger = log; + query_info.req_id = log->identifier(); return query_info; }; if (table_scan.isPartitionTableScan()) diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index f240c6884a4..4ea163f0507 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -13,7 +13,7 @@ // limitations under the License. #include -#include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index de5c3146418..c6a37bbb7be 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -43,7 +43,7 @@ void ColumnFileBig::calculateStat(const DMContext & context) {}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ context.tracing_logger); + /*tracing_id*/ context.tracing_id); std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes(); } diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index cb80e9c40e1..cc8775703dc 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include @@ -24,8 +24,6 @@ namespace DB { class StoragePathPool; -class LogWithPrefix; -using LogWithPrefixPtr = std::shared_ptr; namespace DM @@ -85,7 +83,7 @@ struct DMContext : private boost::noncopyable const bool enable_relevant_place; const bool enable_skippable_place; - const LogWithPrefixPtr tracing_logger; + const String tracing_id; public: DMContext(const Context & db_context_, @@ -97,7 +95,7 @@ struct DMContext : private boost::noncopyable bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, - const LogWithPrefixPtr & tracing_logger_ = nullptr) + const String & tracing_id_ = "") : db_context(db_context_) , path_pool(path_pool_) , storage_pool(storage_pool_) @@ -121,7 +119,7 @@ struct DMContext : private boost::noncopyable , read_stable_only(settings.dt_read_stable_only) , enable_relevant_place(settings.dt_enable_relevant_place) , enable_skippable_place(settings.dt_enable_skippable_place) - , tracing_logger(tracing_logger_ ? tracing_logger_ : getLogWithPrefix(nullptr)) + , tracing_id(tracing_id_) { } diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index 927b6252fe3..e192aa53648 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -38,16 +38,20 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream { static_assert(MODE == DM_VERSION_FILTER_MODE_MVCC || MODE == DM_VERSION_FILTER_MODE_COMPACT); + constexpr static const char * MVCC_FILTER_NAME = "DMVersionFilterBlockInputStream"; + constexpr static const char * COMPACT_FILTER_NAME = "DMVersionFilterBlockInputStream"; + public: DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input, const ColumnDefines & read_columns, UInt64 version_limit_, bool is_common_handle_, - const LogWithPrefixPtr & log_ = nullptr) + const String & tracing_id = "") : version_limit(version_limit_) , is_common_handle(is_common_handle_) , header(toEmptyBlock(read_columns)) - , log(getLogWithPrefix(log_, fmt::format("DMVersionFilterBlockInputStream<{}>", (MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT")))) + , log(Logger::get((MODE == DM_VERSION_FILTER_MODE_MVCC ? MVCC_FILTER_NAME : COMPACT_FILTER_NAME), + tracing_id)) { children.push_back(input); @@ -229,7 +233,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream size_t not_clean_rows = 0; size_t effective_num_rows = 0; - const LogWithPrefixPtr log; + const LoggerPtr log; }; } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index eb1b2984723..d6c4955e243 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -442,7 +442,7 @@ void DeltaMergeStore::shutdown() LOG_FMT_TRACE(log, "Shutdown DeltaMerge end [{}.{}]", db_name, table_name); } -DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const LogWithPrefixPtr & tracing_logger) +DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & tracing_id) { std::shared_lock lock(read_write_mutex); @@ -458,7 +458,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: is_common_handle, rowkey_column_size, db_settings, - tracing_logger); + tracing_id); return DMContextPtr(ctx); } @@ -1009,6 +1009,8 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra } } +// Read data without mvcc filtering && delete-range filtering. +// just for debug BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, @@ -1018,7 +1020,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, { SegmentReadTasks tasks; - auto dm_context = newDMContext(db_context, db_settings, getLogWithPrefix(nullptr, db_context.getCurrentQueryId())); + auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); { std::shared_lock lock(read_write_mutex); @@ -1073,16 +1075,17 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, size_t num_streams, UInt64 max_version, const RSOperatorPtr & filter, - const LogWithPrefixPtr & tracing_logger, + const String & tracing_id, size_t expected_block_size, const SegmentIdSet & read_segments, size_t extra_table_id_index) { - auto dm_context = newDMContext(db_context, db_settings, tracing_logger); + auto dm_context = newDMContext(db_context, db_settings, tracing_id); SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments); - LOG_FMT_DEBUG(dm_context->tracing_logger, "Read create segment snapshot done"); + auto tracing_logger = Logger::get(log->name(), dm_context->tracing_id); + LOG_FMT_DEBUG(tracing_logger, "Read create segment snapshot done"); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); @@ -1114,7 +1117,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, res.push_back(stream); } - LOG_FMT_DEBUG(dm_context->tracing_logger, "Read create stream done"); + LOG_FMT_DEBUG(tracing_logger, "Read create stream done"); return res; } @@ -2502,8 +2505,9 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( total_ranges += task->ranges.size(); } + auto tracing_logger = Logger::get(log->name(), dm_context.tracing_id); LOG_FMT_DEBUG( - dm_context.tracing_logger, + tracing_logger, "[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]", sorted_ranges.size(), tasks.size(), diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 4b53d47431a..599097adeb0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -358,7 +358,7 @@ class DeltaMergeStore : private boost::noncopyable size_t num_streams, UInt64 max_version, const RSOperatorPtr & filter, - const LogWithPrefixPtr & tracing_logger, + const String & tracing_id, size_t expected_block_size = DEFAULT_BLOCK_SIZE, const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); @@ -415,7 +415,7 @@ class DeltaMergeStore : private boost::noncopyable private: #endif - DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const LogWithPrefixPtr & tracing_logger = nullptr); + DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & tracing_id = ""); static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 749423b0bfa..c9212c4b81e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -54,7 +54,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_packs, file_provider, read_limiter, - tracing_logger); + tracing_id); DMFileReader reader( dmfile, @@ -72,7 +72,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_limiter, rows_threshold_per_read, read_one_pack_every_time, - tracing_logger); + tracing_id); return std::make_shared(std::move(reader)); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index 9f166acd5e1..a36bf50a937 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -111,9 +111,9 @@ class DMFileBlockInputStreamBuilder return *this; } - DMFileBlockInputStreamBuilder & setTracingLogger(const DB::LoggerPtr & logger) + DMFileBlockInputStreamBuilder & setTracingID(const String & tracing_id_) { - tracing_logger = logger; + tracing_id = tracing_id_; return *this; } @@ -155,7 +155,7 @@ class DMFileBlockInputStreamBuilder size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD; bool read_one_pack_every_time = false; - DB::LoggerPtr tracing_logger; + String tracing_id; }; /** diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index eee26944413..ea0c3265757 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -50,9 +50,9 @@ class DMFilePackFilter const IdSetPtr & read_packs, const FileProviderPtr & file_provider, const ReadLimiterPtr & read_limiter, - const DB::LoggerPtr & tracing_logger) + const String & tracing_id) { - auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger); + auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_id); pack_filter.init(); return pack_filter; } @@ -110,7 +110,7 @@ class DMFilePackFilter const IdSetPtr & read_packs_, // filter by pack index const FileProviderPtr & file_provider_, const ReadLimiterPtr & read_limiter_, - const DB::LoggerPtr & tracing_logger) + const String & tracing_id) : dmfile(dmfile_) , index_cache(index_cache_) , set_cache_if_miss(set_cache_if_miss_) @@ -120,7 +120,7 @@ class DMFilePackFilter , file_provider(file_provider_) , handle_res(dmfile->getPacks(), RSResult::All) , use_packs(dmfile->getPacks()) - , log(tracing_logger ? tracing_logger : Logger::get("DMFilePackFilter")) + , log(Logger::get("DMFilePackFilter", tracing_id)) , read_limiter(read_limiter_) { } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 2ffc75c938f..0a2934bad07 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -223,7 +223,7 @@ DMFileReader::DMFileReader( const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, bool read_one_pack_every_time_, - const DB::LoggerPtr & tracing_logger) + const String & tracing_id_) : dmfile(dmfile_) , read_columns(read_columns_) , is_common_handle(is_common_handle_) @@ -238,7 +238,7 @@ DMFileReader::DMFileReader( , column_cache(column_cache_) , rows_threshold_per_read(rows_threshold_per_read_) , file_provider(file_provider_) - , log(tracing_logger ? tracing_logger : DB::Logger::get("DMFileReader")) + , log(Logger::get("DMFileReader", tracing_id_)) { for (const auto & cd : read_columns) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index dd754c86518..0ebfb96bf51 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -67,7 +67,6 @@ class DMFileReader using StreamPtr = std::unique_ptr; using ColumnStreams = std::map; - // TODO: Use a builder to replace these params DMFileReader( const DMFilePtr & dmfile_, const ColumnDefines & read_columns_, @@ -91,7 +90,7 @@ class DMFileReader const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, bool read_one_pack_every_time_, - const DB::LoggerPtr & tracing_logger); + const String & tracing_id_); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -143,7 +142,7 @@ class DMFileReader FileProviderPtr file_provider; - DB::LoggerPtr log; + LoggerPtr log; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index 3db5fb8610c..e90eab156a4 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -106,7 +106,7 @@ inline RSOperatorPtr parseTiCompareExpr( // const ColumnDefines & columns_to_read, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, - Poco::Logger * /* log */) + const LoggerPtr & /*log*/) { if (unlikely(expr.children_size() != 2)) return createUnsupported(expr.ShortDebugString(), @@ -247,7 +247,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, - Poco::Logger * log) + const LoggerPtr & log) { assert(isFunctionExpr(expr)); @@ -331,7 +331,7 @@ inline RSOperatorPtr tryParse(const tipb::Expr & filter, const ColumnDefines & columns_to_read, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, - Poco::Logger * log) + const LoggerPtr & log) { if (isFunctionExpr(filter)) return cop::parseTiExpr(filter, columns_to_read, creator, timezone_info, log); @@ -345,7 +345,7 @@ inline RSOperatorPtr tryParse(const tipb::Expr & filter, RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info, const ColumnDefines & columns_to_read, FilterParser::AttrCreatorByColumnID && creator, - Poco::Logger * log) + const LoggerPtr & log) { RSOperatorPtr op = EMPTY_FILTER; if (dag_info.filters.empty()) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index e9428a3b0ba..79d11f82d4f 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -47,7 +47,7 @@ class FilterParser const DAGQueryInfo & dag_info, const ColumnDefines & columns_to_read, AttrCreatorByColumnID && creator, - Poco::Logger * log); + const LoggerPtr & log); /// Some helper structure diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp index 12c13926503..a2d992e0e8b 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp +++ b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include #include #include #include @@ -158,7 +158,7 @@ void sortRangesByStartEdge(RowKeyRanges & ranges) }); } -RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_ranges_count, const LogWithPrefixPtr & log) +RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_ranges_count, const LoggerPtr & log) { if (sorted_ranges.size() <= 1) return std::move(sorted_ranges); diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h index 67c0a54ec33..59b2c46d45b 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h @@ -18,14 +18,14 @@ namespace DB { -class LogWithPrefix; -using LogWithPrefixPtr = std::shared_ptr; +class Logger; +using LoggerPtr = std::shared_ptr; namespace DM { void sortRangesByStartEdge(RowKeyRanges & ranges); -RowKeyRanges tryMergeRanges(RowKeyRanges && ranges, size_t expected_ranges_count, const LogWithPrefixPtr & log = nullptr); +RowKeyRanges tryMergeRanges(RowKeyRanges && ranges, size_t expected_ranges_count, const LoggerPtr & log = nullptr); } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index c62b2391c15..01cada61ec2 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -382,7 +382,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, BlockInputStreamPtr stream; if (dm_context.read_delta_only) { - throw Exception("Unsupported"); + throw Exception("Unsupported for read_delta_only"); } else if (dm_context.read_stable_only) { @@ -430,10 +430,10 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, columns_to_read, max_version, is_common_handle, - dm_context.tracing_logger); + dm_context.tracing_id); LOG_FMT_TRACE( - dm_context.tracing_logger, + Logger::get(log->name(), dm_context.tracing_id), "Segment [{}] is read by max_version: {}, {} ranges: {}", segment_id, max_version, diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 05ffa4abb7c..1fdfd21aa4a 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -60,7 +60,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang {}, dm_context->db_context.getFileProvider(), dm_context->getReadLimiter(), - dm_context->tracing_logger); + dm_context->tracing_id); auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes(); rows += file_valid_rows; bytes += file_valid_bytes; @@ -243,7 +243,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const {}, context.db_context.getFileProvider(), context.getReadLimiter(), - context.tracing_logger); + context.tracing_id); const auto & use_packs = pack_filter.getUsePacks(); size_t new_pack_properties_index = 0; bool use_new_pack_properties = pack_properties.property_size() == 0; @@ -339,6 +339,7 @@ StableValueSpace::Snapshot::getInputStream( .enableCleanRead(enable_clean_read, max_data_version) .setRSOperator(filter) .setColumnCache(column_caches[i]) + .setTracingID(context.tracing_id) .setRowsThreshold(expected_block_size); streams.push_back(builder.build(stable->files[i], read_columns, rowkey_ranges)); } @@ -368,7 +369,7 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & IdSetPtr{}, context.db_context.getFileProvider(), context.getReadLimiter(), - context.tracing_logger); + context.tracing_id); const auto & pack_stats = f->getPackStats(); const auto & use_packs = filter.getUsePacks(); for (size_t i = 0; i < pack_stats.size(); ++i) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 53ca6bd36fb..bb0e47bddbd 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include #include @@ -151,8 +151,6 @@ class DeltaMergeStoreRWTest TiFlashStorageTestBasic::SetUp(); store = reload(); - - log = getLogWithPrefix(nullptr, "DeltaMergeStoreRWTest"); } DeltaMergeStorePtr @@ -211,7 +209,7 @@ class DeltaMergeStoreRWTest TestMode mode; DeltaMergeStorePtr store; - LogWithPrefixPtr log; + constexpr static const char * TRACING_NAME = "DeltaMergeStoreRWTest"; }; TEST_F(DeltaMergeStoreTest, Create) @@ -420,7 +418,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -526,7 +524,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -563,7 +561,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -648,7 +646,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -733,7 +731,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -764,7 +762,7 @@ try /* num_streams= */ 1, /* max_version= */ UInt64(1), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -819,7 +817,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -858,7 +856,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; // block_num represents index of current segment @@ -917,7 +915,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -940,7 +938,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -963,7 +961,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -986,7 +984,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1 - 1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1047,7 +1045,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1084,7 +1082,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2 - 1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1122,7 +1120,7 @@ try /* num_streams= */ 1, /* max_version= */ tso3 - 1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1147,7 +1145,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1172,7 +1170,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1233,7 +1231,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1270,7 +1268,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2 - 1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1308,7 +1306,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1364,7 +1362,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1401,7 +1399,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1487,7 +1485,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1595,7 +1593,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1702,7 +1700,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1790,7 +1788,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1893,7 +1891,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -1969,7 +1967,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2045,7 +2043,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2121,7 +2119,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2197,7 +2195,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2271,7 +2269,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -2344,7 +2342,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2434,7 +2432,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2568,7 +2566,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2629,7 +2627,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2728,7 +2726,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2781,7 +2779,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2897,7 +2895,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -3026,7 +3024,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3100,7 +3098,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3131,7 +3129,7 @@ try /* num_streams= */ 1, /* max_version= */ UInt64(1), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3189,7 +3187,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3239,7 +3237,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3312,7 +3310,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, - log, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index 6bc0133fc2a..31fd99faf01 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -124,8 +124,7 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; - auto log = Logger::get(__PRETTY_FUNCTION__); - auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter, log); + auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter, name); streams[0]->readPrefix(); auto rows = streams[0]->read().rows(); streams[0]->readSuffix(); diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp index cccc26f8dcd..e5c7fd30f40 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp @@ -191,8 +191,7 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) auto filter = EMPTY_FILTER; int excepted_block_size = 1024; uint64_t read_ts = ts_gen->get(); - auto log = getLogWithPrefix(nullptr, "DTWorkload"); - auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, log, excepted_block_size); + auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, "DTWorkload", excepted_block_size); std::vector threads; threads.reserve(streams.size()); for (auto & stream : streams) diff --git a/dbms/src/Storages/SelectQueryInfo.cpp b/dbms/src/Storages/SelectQueryInfo.cpp index 516a5f67087..075c0ad0631 100644 --- a/dbms/src/Storages/SelectQueryInfo.cpp +++ b/dbms/src/Storages/SelectQueryInfo.cpp @@ -28,7 +28,7 @@ SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & rhs) , sets(rhs.sets) , mvcc_query_info(rhs.mvcc_query_info != nullptr ? std::make_unique(*rhs.mvcc_query_info) : nullptr) , dag_query(rhs.dag_query != nullptr ? std::make_unique(*rhs.dag_query) : nullptr) - , logger(rhs.logger) + , req_id(rhs.req_id) {} SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept @@ -36,7 +36,7 @@ SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept , sets(std::move(rhs.sets)) , mvcc_query_info(std::move(rhs.mvcc_query_info)) , dag_query(std::move(rhs.dag_query)) - , logger(std::move(rhs.logger)) + , req_id(std::move(rhs.req_id)) {} } // namespace DB diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index d43c3f8b6d0..d49d4c831d1 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include namespace DB @@ -31,11 +32,7 @@ using PreparedSets = std::unordered_map; struct MvccQueryInfo; struct DAGQueryInfo; -class LogWithPrefix; -using LogWithPrefixPtr = std::shared_ptr; -class LogWithPrefix; -using LogWithPrefixPtr = std::shared_ptr; /** Query along with some additional data, * that can be used during query processing @@ -53,7 +50,7 @@ struct SelectQueryInfo std::unique_ptr dag_query; - LogWithPrefixPtr logger; + std::string req_id; SelectQueryInfo(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index f18f11be244..beef2acc9b0 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include #include @@ -81,7 +81,7 @@ StorageDeltaMerge::StorageDeltaMerge( , store_inited(false) , max_column_id_used(0) , global_context(global_context_.getGlobalContext()) - , log(&Poco::Logger::get("StorageDeltaMerge")) + , log(DB::Logger::get("StorageDeltaMerge", fmt::format("{}.{}", db_name_, table_name_))) { if (primary_expr_ast_->children.empty()) throw Exception("No primary key"); @@ -627,7 +627,7 @@ BlockInputStreams StorageDeltaMerge::read( throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR); const auto & mvcc_query_info = *query_info.mvcc_query_info; - auto tracing_logger = query_info.logger; + auto tracing_logger = Logger::get("StorageDeltaMerge", log->identifier(), query_info.req_id); LOG_FMT_DEBUG(tracing_logger, "Read with tso: {}", mvcc_query_info.read_tso); @@ -737,7 +737,7 @@ BlockInputStreams StorageDeltaMerge::read( num_streams, /*max_version=*/mvcc_query_info.read_tso, rs_operator, - tracing_logger, + query_info.req_id, max_block_size, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); @@ -814,7 +814,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM 1, std::numeric_limits::max(), EMPTY_FILTER, - Logger::get("StorageDeltaMerge", "getRows"))[0]; + /*tracing_id*/ "getRows")[0]; stream->readPrefix(); Block block; while ((block = stream->read())) @@ -839,7 +839,7 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context 1, std::numeric_limits::max(), EMPTY_FILTER, - Logger::get("StorageDeltaMerge", "getRange"))[0]; + /*tracing_id*/ "getRange")[0]; stream->readPrefix(); Block block; size_t index = 0; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index f5513063d3e..4235ad49071 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -235,7 +235,7 @@ class StorageDeltaMerge Context & global_context; - Poco::Logger * log; + DB::LoggerPtr log; }; diff --git a/dbms/src/Storages/StorageDeltaMergeHelpers.h b/dbms/src/Storages/StorageDeltaMergeHelpers.h index 14ab39dc502..18337f879de 100644 --- a/dbms/src/Storages/StorageDeltaMergeHelpers.h +++ b/dbms/src/Storages/StorageDeltaMergeHelpers.h @@ -14,7 +14,7 @@ #pragma once -#include +#include #include #include #include @@ -37,7 +37,7 @@ inline DM::RowKeyRanges getQueryRanges( bool is_common_handle, size_t rowkey_column_size, size_t expected_ranges_count = 1, - const LogWithPrefixPtr & log = nullptr) + const LoggerPtr & log = nullptr) { // todo check table id in DecodedTiKVKey??? DM::RowKeyRanges ranges; diff --git a/dbms/src/Storages/tests/gtest_filter_parser.cpp b/dbms/src/Storages/tests/gtest_filter_parser.cpp index 52f9efe0f1e..a027ea71cfc 100644 --- a/dbms/src/Storages/tests/gtest_filter_parser.cpp +++ b/dbms/src/Storages/tests/gtest_filter_parser.cpp @@ -54,14 +54,14 @@ class FilterParserTest : public ::testing::Test } FilterParserTest() - : log(&Poco::Logger::get("FilterParserTest")) + : log(Logger::get("FilterParserTest")) , ctx(TiFlashTestEnv::getContext()) { default_timezone_info = ctx.getTimezoneInfo(); } protected: - Poco::Logger * log; + LoggerPtr log; Context ctx; static TimezoneInfo default_timezone_info; DM::RSOperatorPtr generateRsOperator(String table_info_json, const String & query, TimezoneInfo & timezone_info); @@ -98,7 +98,7 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator(const String table_info_j DM::ColumnDefines columns_to_read; { NamesAndTypes source_columns; - std::tie(source_columns, std::ignore) = parseColumnsFromTableInfo(table_info, log); + std::tie(source_columns, std::ignore) = parseColumnsFromTableInfo(table_info, log->getLog()); dag_query = std::make_unique( conditions, DAGPreparedSets(), From 3b7d30a0b6cbd5d4d315106acac75e03f9f6eeb5 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 29 Mar 2022 14:02:51 +0800 Subject: [PATCH 05/10] add more tracing_id other than query --- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 4 +++- dbms/src/Storages/DeltaMerge/DMContext.h | 2 +- .../DMVersionFilterBlockInputStream.h | 5 ++++- .../Storages/DeltaMerge/Delta/Snapshot.cpp | 3 +-- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 21 ++++++++++++------- .../Storages/DeltaMerge/File/DMFileReader.cpp | 2 +- .../Storages/DeltaMerge/File/DMFileReader.h | 2 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 1 + .../Storages/DeltaMerge/StableValueSpace.cpp | 1 + 9 files changed, 26 insertions(+), 15 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index c6a37bbb7be..9fe72839628 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -87,7 +87,9 @@ void ColumnFileBigReader::initStream() return; DMFileBlockInputStreamBuilder builder(context.db_context); - file_stream = builder.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}); + file_stream = builder + .setTracingID(context.tracing_id) + .build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}); // If we only need to read pk and version columns, then cache columns data in memory. if (pk_ver_only) diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index cc8775703dc..1caeb5a87a8 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -83,7 +83,7 @@ struct DMContext : private boost::noncopyable const bool enable_relevant_place; const bool enable_skippable_place; - const String tracing_id; + String tracing_id; public: DMContext(const Context & db_context_, diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index e192aa53648..d512f140109 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -65,7 +65,10 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream ~DMVersionFilterBlockInputStream() { LOG_FMT_DEBUG(log, - "Total rows: {}, pass: {:.2f}%, complete pass: {:.2f}%, complete not pass: {:.2f}%, not clean: {:.2f}%, effective: {:.2f}%, read tso: {}", + "Total rows: {}, pass: {:.2f}%" + ", complete pass: {:.2f}%, complete not pass: {:.2f}%" + ", not clean: {:.2f}%, effective: {:.2f}%" + ", read tso: {}", total_rows, passed_rows * 100.0 / total_rows, complete_passed * 100.0 / total_blocks, diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index af1fdfec94f..cc161d0f4d9 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -38,8 +38,7 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool snap->is_update = for_update; snap->_delta = this->shared_from_this(); - // TODO: Add tracing_id from mpp task or background tasks - auto storage_snap = std::make_shared(context.storage_pool, context.getReadLimiter(), /*tracing_id*/ "", true); + auto storage_snap = std::make_shared(context.storage_pool, context.getReadLimiter(), context.tracing_id, /*snapshot_read*/ true); snap->persisted_files_snap = persisted_file_set->createSnapshot(storage_snap); snap->shared_delta_index = delta_index; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index d6c4955e243..156ca04176d 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -529,7 +529,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ if (rows == 0) return; - auto dm_context = newDMContext(db_context, db_settings); + auto dm_context = newDMContext(db_context, db_settings, "write"); const auto bytes = block.bytes(); @@ -588,8 +588,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ auto alloc_bytes = block.bytes(offset, limit); bool is_small = limit < dm_context->delta_cache_limit_rows / 4 && alloc_bytes < dm_context->delta_cache_limit_bytes / 4; - // Small column fies are appended to Delta Cache, then flushed later. - // While large column fies are directly written to PageStorage. + // Small column files are appended to Delta Cache, then flushed later. + // While large column files are directly written to PageStorage. if (is_small) { if (segment->writeToCache(*dm_context, block, offset, limit)) @@ -641,6 +641,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ throw Exception("Fail point random_exception_after_dt_write_done is triggered.", ErrorCodes::FAIL_POINT_ERROR); }); + // TODO: Update the tracing_id before checkSegmentUpdate for (auto & segment : updated_segments) checkSegmentUpdate(dm_context, segment, ThreadType::Write); } @@ -854,6 +855,7 @@ void DeltaMergeStore::ingestFiles( flushCache(dm_context, range); + // TODO: Update the tracing_id before checkSegmentUpdate? for (auto & segment : updated_segments) checkSegmentUpdate(dm_context, segment, ThreadType::Write); } @@ -867,7 +869,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings if (delete_range.none()) return; - auto dm_context = newDMContext(db_context, db_settings); + auto dm_context = newDMContext(db_context, db_settings, "delete_range"); Segments updated_segments; @@ -912,6 +914,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings cur_range.setEnd(delete_range.end); } + // TODO: Update the tracing_id before checkSegmentUpdate? for (auto & segment : updated_segments) checkSegmentUpdate(dm_context, segment, ThreadType::Write); } @@ -954,7 +957,7 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa void DeltaMergeStore::mergeDeltaAll(const Context & context) { - auto dm_context = newDMContext(context, context.getSettingsRef()); + auto dm_context = newDMContext(context, context.getSettingsRef(), /*tracing_id*/ "mergeDeltaAll"); std::vector all_segments; { @@ -974,7 +977,7 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & range) { - auto dm_context = newDMContext(db_context, db_context.getSettingsRef()); + auto dm_context = newDMContext(db_context, db_context.getSettingsRef(), /*tracing_id*/ "compact"); RowKeyRange cur_range = range; while (!cur_range.none()) @@ -1020,7 +1023,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, { SegmentReadTasks tasks; - auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); + auto dm_context = newDMContext(db_context, db_settings, fmt::format("read_raw_{}", db_context.getCurrentQueryId())); { std::shared_lock lock(read_write_mutex); @@ -1080,6 +1083,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, const SegmentIdSet & read_segments, size_t extra_table_id_index) { + // Use the id from MPP/Coprocessor level as tracing_id auto dm_context = newDMContext(db_context, db_settings, tracing_id); SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments); @@ -1088,6 +1092,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, LOG_FMT_DEBUG(tracing_logger, "Read create segment snapshot done"); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { + // TODO: Update the tracing_id before checkSegmentUpdate? this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); }; @@ -1614,7 +1619,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (shutdown_called.load(std::memory_order_relaxed)) break; - auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); + auto dm_context = newDMContext(global_context, global_context.getSettingsRef(), "onSyncGc"); SegmentPtr segment; SegmentSnapshotPtr segment_snap; { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 0a2934bad07..423d8d4d031 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -44,7 +44,7 @@ DMFileReader::Stream::Stream( const String & file_name_base, size_t aio_threshold, size_t max_read_buffer_size, - const DB::LoggerPtr & log, + const LoggerPtr & log, const ReadLimiterPtr & read_limiter) : single_file_mode(reader.single_file_mode) , avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 0ebfb96bf51..9211918c2d0 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -44,7 +44,7 @@ class DMFileReader const String & file_name_base, size_t aio_threshold, size_t max_read_buffer_size, - const DB::LoggerPtr & log, + const LoggerPtr & log, const ReadLimiterPtr & read_limiter); const bool single_file_mode; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 01cada61ec2..cb2cc7dbfae 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -727,6 +727,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co auto stream = builder .setColumnCache(stable_snap->getColumnCaches()[file_index]) .setReadPacks(read_pack) + .setTracingID(fmt::format("{}-getSplitPointFast", dm_context.tracing_id)) .build( read_file, /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 1fdfd21aa4a..2ea137ccec1 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -209,6 +209,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const BlockInputStreamPtr data_stream = builder .setRowsThreshold(std::numeric_limits::max()) // because we just read one pack at a time .onlyReadOnePackEveryTime() + .setTracingID(fmt::format("{}-calculateStableProperty", context.tracing_id)) .build(file, read_columns, RowKeyRanges{rowkey_range}); auto mvcc_stream = std::make_shared>( data_stream, From c960ca39ecd5d9b4964c96a9c20bd407d5a6a155 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 31 Mar 2022 14:07:07 +0800 Subject: [PATCH 06/10] Address comment --- dbms/src/Storages/DeltaMerge/StableValueSpace.cpp | 8 ++++---- dbms/src/Storages/StorageDeltaMerge.cpp | 2 +- dbms/src/Storages/StorageDeltaMerge.h | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 2ea137ccec1..648ffd4f084 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -130,7 +130,7 @@ size_t StableValueSpace::getBytes() const size_t StableValueSpace::getBytesOnDisk() const { - // If this stable value space is logical splited, some file may not used, + // If this stable value space is logical splitted, some file may not used, // and this will return more bytes than actual used. size_t bytes = 0; for (const auto & file : files) @@ -258,9 +258,9 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const } if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) { - throw Exception("new_pack_properties size " + std::to_string(new_pack_properties.property_size()) - + " doesn't match use packs size " + std::to_string(use_packs_count), - ErrorCodes::LOGICAL_ERROR); + throw Exception( + fmt::format("size doesn't match [new_pack_properties_size={}] [use_packs_size={}]", new_pack_properties.property_size(), use_packs_count), + ErrorCodes::LOGICAL_ERROR); } } for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++) diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index beef2acc9b0..1f496ade671 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -81,7 +81,7 @@ StorageDeltaMerge::StorageDeltaMerge( , store_inited(false) , max_column_id_used(0) , global_context(global_context_.getGlobalContext()) - , log(DB::Logger::get("StorageDeltaMerge", fmt::format("{}.{}", db_name_, table_name_))) + , log(Logger::get("StorageDeltaMerge", fmt::format("{}.{}", db_name_, table_name_))) { if (primary_expr_ast_->children.empty()) throw Exception("No primary key"); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4235ad49071..84ae387ecee 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -235,7 +235,7 @@ class StorageDeltaMerge Context & global_context; - DB::LoggerPtr log; + LoggerPtr log; }; From d435fb8c9f3567b2b3d1772bb6f20997b9531fc2 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 31 Mar 2022 19:30:11 +0800 Subject: [PATCH 07/10] Refine some loggers --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 10 +++++----- dbms/src/Storages/DeltaMerge/DeltaMergeStore.h | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 156ca04176d..11ec13f25dd 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -102,7 +102,7 @@ namespace DM // MergeDeltaTaskPool // ================================================ -std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const BackgroundTask & task, const ThreadType & whom, const size_t max_task_num, Poco::Logger * log_) +std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const BackgroundTask & task, const ThreadType & whom, const size_t max_task_num, const LoggerPtr & log_) { std::scoped_lock lock(mutex); if (light_tasks.size() + heavy_tasks.size() >= max_task_num) @@ -142,7 +142,7 @@ std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back return std::make_pair(true, is_heavy); } -DeltaMergeStore::BackgroundTask DeltaMergeStore::MergeDeltaTaskPool::nextTask(bool is_heavy, Poco::Logger * log_) +DeltaMergeStore::BackgroundTask DeltaMergeStore::MergeDeltaTaskPool::nextTask(bool is_heavy, const LoggerPtr & log_) { std::scoped_lock lock(mutex); @@ -207,7 +207,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, , blockable_background_pool(db_context.getBlockableBackgroundPool()) , next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY) , hash_salt(++DELTA_MERGE_STORE_HASH_SALT) - , log(&Poco::Logger::get("DeltaMergeStore[" + db_name + "." + table_name + "]")) + , log(Logger::get("DeltaMergeStore", fmt::format("{}.{}", db_name, table_name))) { LOG_FMT_INFO(log, "Restore DeltaMerge Store start [{}.{}]", db_name, table_name); @@ -1539,7 +1539,7 @@ namespace GC { // Returns true if it needs gc. // This is for optimization purpose, does not mean to be accurate. -bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Poco::Logger * log) +bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log) { // Always GC. if (ratio_threshold < 1.0) @@ -1559,7 +1559,7 @@ bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, dou return false; } -bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, Poco::Logger * log) +bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, const LoggerPtr & log) { auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); if (actual_delete_range.none()) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 599097adeb0..4f831ddfe0e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -287,9 +287,9 @@ class DeltaMergeStore : private boost::noncopyable // first element of return value means whether task is added or not // second element of return value means whether task is heavy or not - std::pair tryAddTask(const BackgroundTask & task, const ThreadType & whom, size_t max_task_num, Poco::Logger * log_); + std::pair tryAddTask(const BackgroundTask & task, const ThreadType & whom, size_t max_task_num, const LoggerPtr & log_); - BackgroundTask nextTask(bool is_heavy, Poco::Logger * log_); + BackgroundTask nextTask(bool is_heavy, const LoggerPtr & log_); }; DeltaMergeStore(Context & db_context, // @@ -497,7 +497,7 @@ class DeltaMergeStore : private boost::noncopyable UInt64 hash_salt; - Poco::Logger * log; + LoggerPtr log; }; // namespace DM using DeltaMergeStorePtr = std::shared_ptr; From 0cb882f7c3736889e7a31e6d586b03e0a015b8d6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Sat, 2 Apr 2022 15:30:04 +0800 Subject: [PATCH 08/10] Add prefix for segment logging --- dbms/src/Common/Logger.h | 28 +++++++++++++++++++++++- dbms/src/Storages/DeltaMerge/Segment.cpp | 7 +++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/dbms/src/Common/Logger.h b/dbms/src/Common/Logger.h index 677723628ac..02aaa4d8cbe 100644 --- a/dbms/src/Common/Logger.h +++ b/dbms/src/Common/Logger.h @@ -50,8 +50,20 @@ class Logger : private boost::noncopyable return getInternal(source, buf, std::forward(first_identifier), std::forward(rest)...); } + template + static LoggerPtr get(Poco::Logger * source_log, T && first_identifier, Args &&... rest) + { + FmtBuffer buf; + return getInternal(source_log, buf, std::forward(first_identifier), std::forward(rest)...); + } + Logger(const std::string & source, const std::string & identifier) - : logger(&Poco::Logger::get(source)) + : Logger(&Poco::Logger::get(source), identifier) + { + } + + Logger(Poco::Logger * source_log, const std::string & identifier) + : logger(source_log) , id(identifier) { } @@ -114,6 +126,20 @@ class Logger : private boost::noncopyable return std::make_shared(source, buf.toString()); } + template + static LoggerPtr getInternal(Poco::Logger * source_log, FmtBuffer & buf, T && first, Args &&... args) + { + buf.fmtAppend("{} ", std::forward(first)); + return getInternal(source_log, buf, std::forward(args)...); + } + + template + static LoggerPtr getInternal(Poco::Logger * source_log, FmtBuffer & buf, T && identifier) + { + buf.fmtAppend("{}", std::forward(identifier)); + return std::make_shared(source_log, buf.toString()); + } + std::string wrapMsg(const std::string & msg) const { return fmt::format("{} {}", id, msg); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index cb2cc7dbfae..498b64d2af6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -433,7 +433,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, dm_context.tracing_id); LOG_FMT_TRACE( - Logger::get(log->name(), dm_context.tracing_id), + Logger::get(log, dm_context.tracing_id), "Segment [{}] is read by max_version: {}, {} ranges: {}", segment_id, max_version, @@ -1348,7 +1348,8 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, const RowKeyRanges & read_ranges, UInt64 max_version) const { - LOG_FMT_DEBUG(log, "Segment[{}] getReadInfo start", segment_id); + auto tracing_logger = Logger::get(log, dm_context.tracing_id); + LOG_FMT_DEBUG(tracing_logger, "Segment[{}] getReadInfo start", segment_id); auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns); auto pk_ver_col_defs @@ -1363,7 +1364,7 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, // Hold compacted_index reference, to prevent it from deallocated. delta_reader->setDeltaIndex(compacted_index); - LOG_FMT_DEBUG(log, "Segment[{}] getReadInfo end", segment_id); + LOG_FMT_DEBUG(tracing_logger, "Segment[{}] getReadInfo end", segment_id); if (fully_indexed) { From ed9de1a1c5f63a38c3a75464c110600599e3e073 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 7 Apr 2022 16:25:07 +0800 Subject: [PATCH 09/10] Add table id for tracing_id --- dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 31f36ef19a4..0a3e6396ece 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -261,7 +261,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() std::unordered_map DAGStorageInterpreter::generateSelectQueryInfos() { std::unordered_map ret; - auto create_query_info = [&]() -> SelectQueryInfo { + auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo { SelectQueryInfo query_info; /// to avoid null point exception query_info.query = makeDummyQuery(); @@ -270,14 +270,14 @@ std::unordered_map DAGStorageInterpreter::generateSele analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); - query_info.req_id = log->identifier(); + query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id); return query_info; }; if (table_scan.isPartitionTableScan()) { for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) { - SelectQueryInfo query_info = create_query_info(); + SelectQueryInfo query_info = create_query_info(physical_table_id); query_info.mvcc_query_info = std::make_unique(mvcc_query_info->resolve_locks, mvcc_query_info->read_tso); ret.emplace(physical_table_id, std::move(query_info)); } @@ -293,8 +293,8 @@ std::unordered_map DAGStorageInterpreter::generateSele } else { - TableID table_id = logical_table_id; - SelectQueryInfo query_info = create_query_info(); + const TableID table_id = logical_table_id; + SelectQueryInfo query_info = create_query_info(table_id); query_info.mvcc_query_info = std::move(mvcc_query_info); ret.emplace(table_id, std::move(query_info)); } From eefc6584f5279275d6dce9360ace506bb9e77362 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 7 Apr 2022 17:18:26 +0800 Subject: [PATCH 10/10] Add epoch for segment --- dbms/src/Storages/DeltaMerge/Segment.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 498b64d2af6..ac192ff6082 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1349,7 +1349,7 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, UInt64 max_version) const { auto tracing_logger = Logger::get(log, dm_context.tracing_id); - LOG_FMT_DEBUG(tracing_logger, "Segment[{}] getReadInfo start", segment_id); + LOG_FMT_DEBUG(tracing_logger, "Segment[{}] [epoch={}] getReadInfo start", segment_id, epoch); auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns); auto pk_ver_col_defs @@ -1364,14 +1364,14 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, // Hold compacted_index reference, to prevent it from deallocated. delta_reader->setDeltaIndex(compacted_index); - LOG_FMT_DEBUG(tracing_logger, "Segment[{}] getReadInfo end", segment_id); + LOG_FMT_DEBUG(tracing_logger, "Segment[{}] [epoch={}] getReadInfo end", segment_id, epoch); if (fully_indexed) { // Try update shared index, if my_delta_index is more advanced. bool ok = segment_snap->delta->getSharedDeltaIndex()->updateIfAdvanced(*my_delta_index); if (ok) - LOG_FMT_DEBUG(log, "{} Updated delta index", simpleInfo()); + LOG_FMT_DEBUG(tracing_logger, "{} Updated delta index", simpleInfo()); } // Refresh the reference in DeltaIndexManager, so that the index can be properly managed.