diff --git a/dbms/src/Common/Logger.h b/dbms/src/Common/Logger.h index 677723628ac..02aaa4d8cbe 100644 --- a/dbms/src/Common/Logger.h +++ b/dbms/src/Common/Logger.h @@ -50,8 +50,20 @@ class Logger : private boost::noncopyable return getInternal(source, buf, std::forward(first_identifier), std::forward(rest)...); } + template + static LoggerPtr get(Poco::Logger * source_log, T && first_identifier, Args &&... rest) + { + FmtBuffer buf; + return getInternal(source_log, buf, std::forward(first_identifier), std::forward(rest)...); + } + Logger(const std::string & source, const std::string & identifier) - : logger(&Poco::Logger::get(source)) + : Logger(&Poco::Logger::get(source), identifier) + { + } + + Logger(Poco::Logger * source_log, const std::string & identifier) + : logger(source_log) , id(identifier) { } @@ -114,6 +126,20 @@ class Logger : private boost::noncopyable return std::make_shared(source, buf.toString()); } + template + static LoggerPtr getInternal(Poco::Logger * source_log, FmtBuffer & buf, T && first, Args &&... args) + { + buf.fmtAppend("{} ", std::forward(first)); + return getInternal(source_log, buf, std::forward(args)...); + } + + template + static LoggerPtr getInternal(Poco::Logger * source_log, FmtBuffer & buf, T && identifier) + { + buf.fmtAppend("{}", std::forward(identifier)); + return std::make_shared(source_log, buf.toString()); + } + std::string wrapMsg(const std::string & msg) const { return fmt::format("{} {}", id, msg); diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index a7a8ca48ef5..0a3e6396ece 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -261,7 +261,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() std::unordered_map DAGStorageInterpreter::generateSelectQueryInfos() { std::unordered_map ret; - auto create_query_info = [&]() -> SelectQueryInfo { + auto create_query_info = [&](Int64 table_id) -> SelectQueryInfo { SelectQueryInfo query_info; /// to avoid null point exception query_info.query = makeDummyQuery(); @@ -270,13 +270,14 @@ std::unordered_map DAGStorageInterpreter::generateSele analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); + query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id); return query_info; }; if (table_scan.isPartitionTableScan()) { for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) { - SelectQueryInfo query_info = create_query_info(); + SelectQueryInfo query_info = create_query_info(physical_table_id); query_info.mvcc_query_info = std::make_unique(mvcc_query_info->resolve_locks, mvcc_query_info->read_tso); ret.emplace(physical_table_id, std::move(query_info)); } @@ -292,8 +293,8 @@ std::unordered_map DAGStorageInterpreter::generateSele } else { - TableID table_id = logical_table_id; - SelectQueryInfo query_info = create_query_info(); + const TableID table_id = logical_table_id; + SelectQueryInfo query_info = create_query_info(table_id); query_info.mvcc_query_info = std::move(mvcc_query_info); ret.emplace(table_id, std::move(query_info)); } diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 8499f2f863c..4ea163f0507 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -287,4 +288,4 @@ TEST_F(DTToolTest, BlockwiseInvariant) EXPECT_EQ(new_prop_iter, refreshed_file->getPackProperties().property().end()); stream->readSuffix(); } -} \ No newline at end of file +} diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 8fe9e3ff825..9fe72839628 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -43,7 +43,7 @@ void ColumnFileBig::calculateStat(const DMContext & context) {}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ nullptr); + /*tracing_id*/ context.tracing_id); std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes(); } @@ -87,7 +87,9 @@ void ColumnFileBigReader::initStream() return; DMFileBlockInputStreamBuilder builder(context.db_context); - file_stream = builder.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}); + file_stream = builder + .setTracingID(context.tracing_id) + .build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}); // If we only need to read pk and version columns, then cache columns data in memory. if (pk_ver_only) diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index 30ae4c723d5..1caeb5a87a8 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -24,6 +25,7 @@ namespace DB { class StoragePathPool; + namespace DM { class StoragePool; @@ -81,7 +83,7 @@ struct DMContext : private boost::noncopyable const bool enable_relevant_place; const bool enable_skippable_place; - const String query_id; + String tracing_id; public: DMContext(const Context & db_context_, @@ -93,7 +95,7 @@ struct DMContext : private boost::noncopyable bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, - const String & query_id_ = "") + const String & tracing_id_ = "") : db_context(db_context_) , path_pool(path_pool_) , storage_pool(storage_pool_) @@ -117,7 +119,7 @@ struct DMContext : private boost::noncopyable , read_stable_only(settings.dt_read_stable_only) , enable_relevant_place(settings.dt_enable_relevant_place) , enable_skippable_place(settings.dt_enable_skippable_place) - , query_id(query_id_) + , tracing_id(tracing_id_) { } diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index 7cf83f08ab3..d512f140109 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -15,6 +15,8 @@ #pragma once #include +#include +#include #include #include #include @@ -36,17 +38,20 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream { static_assert(MODE == DM_VERSION_FILTER_MODE_MVCC || MODE == DM_VERSION_FILTER_MODE_COMPACT); + constexpr static const char * MVCC_FILTER_NAME = "DMVersionFilterBlockInputStream"; + constexpr static const char * COMPACT_FILTER_NAME = "DMVersionFilterBlockInputStream"; + public: DMVersionFilterBlockInputStream(const BlockInputStreamPtr & input, const ColumnDefines & read_columns, UInt64 version_limit_, bool is_common_handle_, - const String & query_id_ = "") + const String & tracing_id = "") : version_limit(version_limit_) , is_common_handle(is_common_handle_) , header(toEmptyBlock(read_columns)) - , query_id(query_id_) - , log(&Poco::Logger::get("DMVersionFilterBlockInputStream<" + String(MODE == DM_VERSION_FILTER_MODE_MVCC ? "MVCC" : "COMPACT") + ">")) + , log(Logger::get((MODE == DM_VERSION_FILTER_MODE_MVCC ? MVCC_FILTER_NAME : COMPACT_FILTER_NAME), + tracing_id)) { children.push_back(input); @@ -60,15 +65,17 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream ~DMVersionFilterBlockInputStream() { LOG_FMT_DEBUG(log, - "Total rows: {}, pass: {:.2f}%, complete pass: {:.2f}%, complete not pass: {:.2f}%, not clean: {:.2f}%, effective: {:.2f}%, read tso: {}, query id: {}", + "Total rows: {}, pass: {:.2f}%" + ", complete pass: {:.2f}%, complete not pass: {:.2f}%" + ", not clean: {:.2f}%, effective: {:.2f}%" + ", read tso: {}", total_rows, passed_rows * 100.0 / total_rows, complete_passed * 100.0 / total_blocks, complete_not_passed * 100.0 / total_blocks, not_clean_rows * 100.0 / passed_rows, effective_num_rows * 100.0 / passed_rows, - version_limit, - (query_id.empty() ? "" : query_id)); + version_limit); } void readPrefix() override; @@ -192,7 +199,6 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream const UInt64 version_limit; const bool is_common_handle; const Block header; - const String query_id; size_t handle_col_pos; size_t version_col_pos; @@ -230,7 +236,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream size_t not_clean_rows = 0; size_t effective_num_rows = 0; - Poco::Logger * const log; + const LoggerPtr log; }; } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index af1fdfec94f..cc161d0f4d9 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -38,8 +38,7 @@ DeltaSnapshotPtr DeltaValueSpace::createSnapshot(const DMContext & context, bool snap->is_update = for_update; snap->_delta = this->shared_from_this(); - // TODO: Add tracing_id from mpp task or background tasks - auto storage_snap = std::make_shared(context.storage_pool, context.getReadLimiter(), /*tracing_id*/ "", true); + auto storage_snap = std::make_shared(context.storage_pool, context.getReadLimiter(), context.tracing_id, /*snapshot_read*/ true); snap->persisted_files_snap = persisted_file_set->createSnapshot(storage_snap); snap->shared_delta_index = delta_index; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 25abdbbb07d..11ec13f25dd 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -102,7 +102,7 @@ namespace DM // MergeDeltaTaskPool // ================================================ -std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const BackgroundTask & task, const ThreadType & whom, const size_t max_task_num, Poco::Logger * log_) +std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const BackgroundTask & task, const ThreadType & whom, const size_t max_task_num, const LoggerPtr & log_) { std::scoped_lock lock(mutex); if (light_tasks.size() + heavy_tasks.size() >= max_task_num) @@ -142,7 +142,7 @@ std::pair DeltaMergeStore::MergeDeltaTaskPool::tryAddTask(const Back return std::make_pair(true, is_heavy); } -DeltaMergeStore::BackgroundTask DeltaMergeStore::MergeDeltaTaskPool::nextTask(bool is_heavy, Poco::Logger * log_) +DeltaMergeStore::BackgroundTask DeltaMergeStore::MergeDeltaTaskPool::nextTask(bool is_heavy, const LoggerPtr & log_) { std::scoped_lock lock(mutex); @@ -207,7 +207,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, , blockable_background_pool(db_context.getBlockableBackgroundPool()) , next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY) , hash_salt(++DELTA_MERGE_STORE_HASH_SALT) - , log(&Poco::Logger::get("DeltaMergeStore[" + db_name + "." + table_name + "]")) + , log(Logger::get("DeltaMergeStore", fmt::format("{}.{}", db_name, table_name))) { LOG_FMT_INFO(log, "Restore DeltaMerge Store start [{}.{}]", db_name, table_name); @@ -442,7 +442,7 @@ void DeltaMergeStore::shutdown() LOG_FMT_TRACE(log, "Shutdown DeltaMerge end [{}.{}]", db_name, table_name); } -DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id) +DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & tracing_id) { std::shared_lock lock(read_write_mutex); @@ -458,7 +458,7 @@ DMContextPtr DeltaMergeStore::newDMContext(const Context & db_context, const DB: is_common_handle, rowkey_column_size, db_settings, - query_id); + tracing_id); return DMContextPtr(ctx); } @@ -529,7 +529,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ if (rows == 0) return; - auto dm_context = newDMContext(db_context, db_settings); + auto dm_context = newDMContext(db_context, db_settings, "write"); const auto bytes = block.bytes(); @@ -588,8 +588,8 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ auto alloc_bytes = block.bytes(offset, limit); bool is_small = limit < dm_context->delta_cache_limit_rows / 4 && alloc_bytes < dm_context->delta_cache_limit_bytes / 4; - // Small column fies are appended to Delta Cache, then flushed later. - // While large column fies are directly written to PageStorage. + // Small column files are appended to Delta Cache, then flushed later. + // While large column files are directly written to PageStorage. if (is_small) { if (segment->writeToCache(*dm_context, block, offset, limit)) @@ -641,6 +641,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ throw Exception("Fail point random_exception_after_dt_write_done is triggered.", ErrorCodes::FAIL_POINT_ERROR); }); + // TODO: Update the tracing_id before checkSegmentUpdate for (auto & segment : updated_segments) checkSegmentUpdate(dm_context, segment, ThreadType::Write); } @@ -854,6 +855,7 @@ void DeltaMergeStore::ingestFiles( flushCache(dm_context, range); + // TODO: Update the tracing_id before checkSegmentUpdate? for (auto & segment : updated_segments) checkSegmentUpdate(dm_context, segment, ThreadType::Write); } @@ -867,7 +869,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings if (delete_range.none()) return; - auto dm_context = newDMContext(db_context, db_settings); + auto dm_context = newDMContext(db_context, db_settings, "delete_range"); Segments updated_segments; @@ -912,6 +914,7 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings cur_range.setEnd(delete_range.end); } + // TODO: Update the tracing_id before checkSegmentUpdate? for (auto & segment : updated_segments) checkSegmentUpdate(dm_context, segment, ThreadType::Write); } @@ -954,7 +957,7 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const RowKeyRa void DeltaMergeStore::mergeDeltaAll(const Context & context) { - auto dm_context = newDMContext(context, context.getSettingsRef()); + auto dm_context = newDMContext(context, context.getSettingsRef(), /*tracing_id*/ "mergeDeltaAll"); std::vector all_segments; { @@ -974,7 +977,7 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & range) { - auto dm_context = newDMContext(db_context, db_context.getSettingsRef()); + auto dm_context = newDMContext(db_context, db_context.getSettingsRef(), /*tracing_id*/ "compact"); RowKeyRange cur_range = range; while (!cur_range.none()) @@ -1009,6 +1012,8 @@ void DeltaMergeStore::compact(const Context & db_context, const RowKeyRange & ra } } +// Read data without mvcc filtering && delete-range filtering. +// just for debug BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, @@ -1018,7 +1023,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, { SegmentReadTasks tasks; - auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); + auto dm_context = newDMContext(db_context, db_settings, fmt::format("read_raw_{}", db_context.getCurrentQueryId())); { std::shared_lock lock(read_write_mutex); @@ -1073,17 +1078,21 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, size_t num_streams, UInt64 max_version, const RSOperatorPtr & filter, + const String & tracing_id, size_t expected_block_size, const SegmentIdSet & read_segments, size_t extra_table_id_index) { - auto dm_context = newDMContext(db_context, db_settings, db_context.getCurrentQueryId()); + // Use the id from MPP/Coprocessor level as tracing_id + auto dm_context = newDMContext(db_context, db_settings, tracing_id); SegmentReadTasks tasks = getReadTasksByRanges(*dm_context, sorted_ranges, num_streams, read_segments); - LOG_FMT_DEBUG(log, "Read create segment snapshot done"); + auto tracing_logger = Logger::get(log->name(), dm_context->tracing_id); + LOG_FMT_DEBUG(tracing_logger, "Read create segment snapshot done"); auto after_segment_read = [&](const DMContextPtr & dm_context_, const SegmentPtr & segment_) { + // TODO: Update the tracing_id before checkSegmentUpdate? this->checkSegmentUpdate(dm_context_, segment_, ThreadType::Read); }; @@ -1113,7 +1122,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, res.push_back(stream); } - LOG_FMT_DEBUG(log, "Read create stream done"); + LOG_FMT_DEBUG(tracing_logger, "Read create stream done"); return res; } @@ -1530,7 +1539,7 @@ namespace GC { // Returns true if it needs gc. // This is for optimization purpose, does not mean to be accurate. -bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Poco::Logger * log) +bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, const LoggerPtr & log) { // Always GC. if (ratio_threshold < 1.0) @@ -1550,7 +1559,7 @@ bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, dou return false; } -bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, Poco::Logger * log) +bool shouldCompactDeltaWithStable(const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, const LoggerPtr & log) { auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); if (actual_delete_range.none()) @@ -1610,7 +1619,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (shutdown_called.load(std::memory_order_relaxed)) break; - auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); + auto dm_context = newDMContext(global_context, global_context.getSettingsRef(), "onSyncGc"); SegmentPtr segment; SegmentSnapshotPtr segment_snap; { @@ -2501,8 +2510,9 @@ SegmentReadTasks DeltaMergeStore::getReadTasksByRanges( total_ranges += task->ranges.size(); } + auto tracing_logger = Logger::get(log->name(), dm_context.tracing_id); LOG_FMT_DEBUG( - log, + tracing_logger, "[sorted_ranges: {}] [tasks before split: {}] [tasks final: {}] [ranges final: {}]", sorted_ranges.size(), tasks.size(), diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 51e50a255dc..4f831ddfe0e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -287,9 +287,9 @@ class DeltaMergeStore : private boost::noncopyable // first element of return value means whether task is added or not // second element of return value means whether task is heavy or not - std::pair tryAddTask(const BackgroundTask & task, const ThreadType & whom, size_t max_task_num, Poco::Logger * log_); + std::pair tryAddTask(const BackgroundTask & task, const ThreadType & whom, size_t max_task_num, const LoggerPtr & log_); - BackgroundTask nextTask(bool is_heavy, Poco::Logger * log_); + BackgroundTask nextTask(bool is_heavy, const LoggerPtr & log_); }; DeltaMergeStore(Context & db_context, // @@ -358,6 +358,7 @@ class DeltaMergeStore : private boost::noncopyable size_t num_streams, UInt64 max_version, const RSOperatorPtr & filter, + const String & tracing_id, size_t expected_block_size = DEFAULT_BLOCK_SIZE, const SegmentIdSet & read_segments = {}, size_t extra_table_id_index = InvalidColumnID); @@ -414,7 +415,7 @@ class DeltaMergeStore : private boost::noncopyable private: #endif - DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = ""); + DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & tracing_id = ""); static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } @@ -496,7 +497,7 @@ class DeltaMergeStore : private boost::noncopyable UInt64 hash_salt; - Poco::Logger * log; + LoggerPtr log; }; // namespace DM using DeltaMergeStorePtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 749423b0bfa..c9212c4b81e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -54,7 +54,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_packs, file_provider, read_limiter, - tracing_logger); + tracing_id); DMFileReader reader( dmfile, @@ -72,7 +72,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & read_limiter, rows_threshold_per_read, read_one_pack_every_time, - tracing_logger); + tracing_id); return std::make_shared(std::move(reader)); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index 9f166acd5e1..a36bf50a937 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -111,9 +111,9 @@ class DMFileBlockInputStreamBuilder return *this; } - DMFileBlockInputStreamBuilder & setTracingLogger(const DB::LoggerPtr & logger) + DMFileBlockInputStreamBuilder & setTracingID(const String & tracing_id_) { - tracing_logger = logger; + tracing_id = tracing_id_; return *this; } @@ -155,7 +155,7 @@ class DMFileBlockInputStreamBuilder size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD; bool read_one_pack_every_time = false; - DB::LoggerPtr tracing_logger; + String tracing_id; }; /** diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index ee59fb7bc1c..ea0c3265757 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -49,9 +50,9 @@ class DMFilePackFilter const IdSetPtr & read_packs, const FileProviderPtr & file_provider, const ReadLimiterPtr & read_limiter, - const DB::LoggerPtr & tracing_logger) + const String & tracing_id) { - auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger); + auto pack_filter = DMFilePackFilter(dmfile, index_cache, set_cache_if_miss, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_id); pack_filter.init(); return pack_filter; } @@ -109,7 +110,7 @@ class DMFilePackFilter const IdSetPtr & read_packs_, // filter by pack index const FileProviderPtr & file_provider_, const ReadLimiterPtr & read_limiter_, - const DB::LoggerPtr & tracing_logger) + const String & tracing_id) : dmfile(dmfile_) , index_cache(index_cache_) , set_cache_if_miss(set_cache_if_miss_) @@ -119,7 +120,7 @@ class DMFilePackFilter , file_provider(file_provider_) , handle_res(dmfile->getPacks(), RSResult::All) , use_packs(dmfile->getPacks()) - , log(tracing_logger ? tracing_logger : DB::Logger::get("DMFilePackFilter")) + , log(Logger::get("DMFilePackFilter", tracing_id)) , read_limiter(read_limiter_) { } @@ -299,7 +300,7 @@ class DMFilePackFilter std::vector handle_res; std::vector use_packs; - DB::LoggerPtr log; + LoggerPtr log; ReadLimiterPtr read_limiter; }; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 2ffc75c938f..423d8d4d031 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -44,7 +44,7 @@ DMFileReader::Stream::Stream( const String & file_name_base, size_t aio_threshold, size_t max_read_buffer_size, - const DB::LoggerPtr & log, + const LoggerPtr & log, const ReadLimiterPtr & read_limiter) : single_file_mode(reader.single_file_mode) , avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size) @@ -223,7 +223,7 @@ DMFileReader::DMFileReader( const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, bool read_one_pack_every_time_, - const DB::LoggerPtr & tracing_logger) + const String & tracing_id_) : dmfile(dmfile_) , read_columns(read_columns_) , is_common_handle(is_common_handle_) @@ -238,7 +238,7 @@ DMFileReader::DMFileReader( , column_cache(column_cache_) , rows_threshold_per_read(rows_threshold_per_read_) , file_provider(file_provider_) - , log(tracing_logger ? tracing_logger : DB::Logger::get("DMFileReader")) + , log(Logger::get("DMFileReader", tracing_id_)) { for (const auto & cd : read_columns) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index e844df64cfd..9211918c2d0 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -44,7 +44,7 @@ class DMFileReader const String & file_name_base, size_t aio_threshold, size_t max_read_buffer_size, - const DB::LoggerPtr & log, + const LoggerPtr & log, const ReadLimiterPtr & read_limiter); const bool single_file_mode; @@ -90,7 +90,7 @@ class DMFileReader const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, bool read_one_pack_every_time_, - const DB::LoggerPtr & tracing_logger); + const String & tracing_id_); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -142,7 +142,7 @@ class DMFileReader FileProviderPtr file_provider; - DB::LoggerPtr log; + LoggerPtr log; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp index 3db5fb8610c..e90eab156a4 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp @@ -106,7 +106,7 @@ inline RSOperatorPtr parseTiCompareExpr( // const ColumnDefines & columns_to_read, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, - Poco::Logger * /* log */) + const LoggerPtr & /*log*/) { if (unlikely(expr.children_size() != 2)) return createUnsupported(expr.ShortDebugString(), @@ -247,7 +247,7 @@ RSOperatorPtr parseTiExpr(const tipb::Expr & expr, const ColumnDefines & columns_to_read, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, - Poco::Logger * log) + const LoggerPtr & log) { assert(isFunctionExpr(expr)); @@ -331,7 +331,7 @@ inline RSOperatorPtr tryParse(const tipb::Expr & filter, const ColumnDefines & columns_to_read, const FilterParser::AttrCreatorByColumnID & creator, const TimezoneInfo & timezone_info, - Poco::Logger * log) + const LoggerPtr & log) { if (isFunctionExpr(filter)) return cop::parseTiExpr(filter, columns_to_read, creator, timezone_info, log); @@ -345,7 +345,7 @@ inline RSOperatorPtr tryParse(const tipb::Expr & filter, RSOperatorPtr FilterParser::parseDAGQuery(const DAGQueryInfo & dag_info, const ColumnDefines & columns_to_read, FilterParser::AttrCreatorByColumnID && creator, - Poco::Logger * log) + const LoggerPtr & log) { RSOperatorPtr op = EMPTY_FILTER; if (dag_info.filters.empty()) diff --git a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h index e9428a3b0ba..79d11f82d4f 100644 --- a/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h +++ b/dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.h @@ -47,7 +47,7 @@ class FilterParser const DAGQueryInfo & dag_info, const ColumnDefines & columns_to_read, AttrCreatorByColumnID && creator, - Poco::Logger * log); + const LoggerPtr & log); /// Some helper structure diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp index dc1fae350f9..a2d992e0e8b 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp +++ b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.cpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include +#include namespace DB { @@ -44,7 +46,7 @@ class MergeRangeHelper public: explicit MergeRangeHelper(RowKeyRanges && sorted_ranges_) - : sorted_ranges(std::move(sorted_ranges_)) // + : sorted_ranges(std::move(sorted_ranges_)) { genMergeStats(); } @@ -156,7 +158,7 @@ void sortRangesByStartEdge(RowKeyRanges & ranges) }); } -RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_ranges_count, Poco::Logger * log) +RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_ranges_count, const LoggerPtr & log) { if (sorted_ranges.size() <= 1) return std::move(sorted_ranges); @@ -170,7 +172,6 @@ RowKeyRanges tryMergeRanges(RowKeyRanges && sorted_ranges, size_t expected_range /// Try to make the number of merged_ranges result larger or equal to expected_ranges_count. do_merge_ranges.trySplit(expected_ranges_count); - if (log) LOG_FMT_TRACE(log, "[original ranges: {}] [expected ranges: {}] [after merged ranges: {}] [final ranges: {}]", ori_size, expected_ranges_count, after_merge_count, do_merge_ranges.currentRangesCount()); diff --git a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h index 8331c11993b..59b2c46d45b 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyRangeUtils.h @@ -15,15 +15,17 @@ #pragma once #include -#include namespace DB { +class Logger; +using LoggerPtr = std::shared_ptr; + namespace DM { void sortRangesByStartEdge(RowKeyRanges & ranges); -RowKeyRanges tryMergeRanges(RowKeyRanges && ranges, size_t expected_ranges_count, Poco::Logger * log = nullptr); +RowKeyRanges tryMergeRanges(RowKeyRanges && ranges, size_t expected_ranges_count, const LoggerPtr & log = nullptr); } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index a38aa202954..ac192ff6082 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -382,7 +382,7 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, BlockInputStreamPtr stream; if (dm_context.read_delta_only) { - throw Exception("Unsupported"); + throw Exception("Unsupported for read_delta_only"); } else if (dm_context.read_stable_only) { @@ -430,10 +430,10 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, columns_to_read, max_version, is_common_handle, - dm_context.query_id); + dm_context.tracing_id); LOG_FMT_TRACE( - log, + Logger::get(log, dm_context.tracing_id), "Segment [{}] is read by max_version: {}, {} ranges: {}", segment_id, max_version, @@ -727,6 +727,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co auto stream = builder .setColumnCache(stable_snap->getColumnCaches()[file_index]) .setReadPacks(read_pack) + .setTracingID(fmt::format("{}-getSplitPointFast", dm_context.tracing_id)) .build( read_file, /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, @@ -1347,7 +1348,8 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, const RowKeyRanges & read_ranges, UInt64 max_version) const { - LOG_FMT_DEBUG(log, "Segment[{}] getReadInfo start", segment_id); + auto tracing_logger = Logger::get(log, dm_context.tracing_id); + LOG_FMT_DEBUG(tracing_logger, "Segment[{}] [epoch={}] getReadInfo start", segment_id, epoch); auto new_read_columns = arrangeReadColumns(getExtraHandleColumnDefine(is_common_handle), read_columns); auto pk_ver_col_defs @@ -1362,14 +1364,14 @@ Segment::ReadInfo Segment::getReadInfo(const DMContext & dm_context, // Hold compacted_index reference, to prevent it from deallocated. delta_reader->setDeltaIndex(compacted_index); - LOG_FMT_DEBUG(log, "Segment[{}] getReadInfo end", segment_id); + LOG_FMT_DEBUG(tracing_logger, "Segment[{}] [epoch={}] getReadInfo end", segment_id, epoch); if (fully_indexed) { // Try update shared index, if my_delta_index is more advanced. bool ok = segment_snap->delta->getSharedDeltaIndex()->updateIfAdvanced(*my_delta_index); if (ok) - LOG_FMT_DEBUG(log, "{} Updated delta index", simpleInfo()); + LOG_FMT_DEBUG(tracing_logger, "{} Updated delta index", simpleInfo()); } // Refresh the reference in DeltaIndexManager, so that the index can be properly managed. diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index fa0a1b93b22..dc14716af80 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -60,7 +60,7 @@ struct SegmentReadTask class SegmentReadTaskPool : private boost::noncopyable { public: - SegmentReadTaskPool(SegmentReadTasks && tasks_) + explicit SegmentReadTaskPool(SegmentReadTasks && tasks_) : tasks(std::move(tasks_)) {} diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 01cfdccfc38..648ffd4f084 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -60,7 +60,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang {}, dm_context->db_context.getFileProvider(), dm_context->getReadLimiter(), - /*tracing_logger*/ nullptr); + dm_context->tracing_id); auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes(); rows += file_valid_rows; bytes += file_valid_bytes; @@ -130,7 +130,7 @@ size_t StableValueSpace::getBytes() const size_t StableValueSpace::getBytesOnDisk() const { - // If this stable value space is logical splited, some file may not used, + // If this stable value space is logical splitted, some file may not used, // and this will return more bytes than actual used. size_t bytes = 0; for (const auto & file : files) @@ -184,7 +184,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const const auto & pack_properties = file->getPackProperties(); if (pack_stats.empty()) continue; - // if PackPropertys of this DMFile is empty, this must be an old format file generated by previous version. + // if PackProperties of this DMFile is empty, this must be an old format file generated by previous version. // so we need to create file property for this file. // but to keep dmfile immutable, we just cache the result in memory. // @@ -209,6 +209,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const BlockInputStreamPtr data_stream = builder .setRowsThreshold(std::numeric_limits::max()) // because we just read one pack at a time .onlyReadOnePackEveryTime() + .setTracingID(fmt::format("{}-calculateStableProperty", context.tracing_id)) .build(file, read_columns, RowKeyRanges{rowkey_range}); auto mvcc_stream = std::make_shared>( data_stream, @@ -243,7 +244,7 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const {}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ nullptr); + context.tracing_id); const auto & use_packs = pack_filter.getUsePacks(); size_t new_pack_properties_index = 0; bool use_new_pack_properties = pack_properties.property_size() == 0; @@ -257,9 +258,9 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const } if (unlikely((size_t)new_pack_properties.property_size() != use_packs_count)) { - throw Exception("new_pack_propertys size " + std::to_string(new_pack_properties.property_size()) - + " doesn't match use packs size " + std::to_string(use_packs_count), - ErrorCodes::LOGICAL_ERROR); + throw Exception( + fmt::format("size doesn't match [new_pack_properties_size={}] [use_packs_size={}]", new_pack_properties.property_size(), use_packs_count), + ErrorCodes::LOGICAL_ERROR); } } for (size_t pack_id = 0; pack_id < use_packs.size(); pack_id++) @@ -319,13 +320,15 @@ void StableValueSpace::drop(const FileProviderPtr & file_provider) } } -SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream(const DMContext & context, // - const ColumnDefines & read_columns, - const RowKeyRanges & rowkey_ranges, - const RSOperatorPtr & filter, - UInt64 max_data_version, - size_t expected_block_size, - bool enable_clean_read) +SkippableBlockInputStreamPtr +StableValueSpace::Snapshot::getInputStream( + const DMContext & context, + const ColumnDefines & read_columns, + const RowKeyRanges & rowkey_ranges, + const RSOperatorPtr & filter, + UInt64 max_data_version, + size_t expected_block_size, + bool enable_clean_read) { LOG_FMT_DEBUG(log, "max_data_version: {}, enable_clean_read: {}", max_data_version, enable_clean_read); SkippableBlockInputStreams streams; @@ -337,6 +340,7 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream(const DM .enableCleanRead(enable_clean_read, max_data_version) .setRSOperator(filter) .setColumnCache(column_caches[i]) + .setTracingID(context.tracing_id) .setRowsThreshold(expected_block_size); streams.push_back(builder.build(stable->files[i], read_columns, rowkey_ranges)); } @@ -366,7 +370,7 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & IdSetPtr{}, context.db_context.getFileProvider(), context.getReadLimiter(), - /*tracing_logger*/ nullptr); + context.tracing_id); const auto & pack_stats = f->getPackStats(); const auto & use_packs = filter.getUsePacks(); for (size_t i = 0; i < pack_stats.size(); ++i) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 9ebe1cc7bb7..bb0e47bddbd 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include #include #include #include @@ -206,6 +208,8 @@ class DeltaMergeStoreRWTest protected: TestMode mode; DeltaMergeStorePtr store; + + constexpr static const char * TRACING_NAME = "DeltaMergeStoreRWTest"; }; TEST_F(DeltaMergeStoreTest, Create) @@ -414,6 +418,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -519,6 +524,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -555,6 +561,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -639,6 +646,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -723,6 +731,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -753,6 +762,7 @@ try /* num_streams= */ 1, /* max_version= */ UInt64(1), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -807,6 +817,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -845,6 +856,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; // block_num represents index of current segment @@ -903,6 +915,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -925,6 +938,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -947,6 +961,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -969,6 +984,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1 - 1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1029,6 +1045,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1065,6 +1082,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2 - 1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1102,6 +1120,7 @@ try /* num_streams= */ 1, /* max_version= */ tso3 - 1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1126,6 +1145,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1150,6 +1170,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1210,6 +1231,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1246,6 +1268,7 @@ try /* num_streams= */ 1, /* max_version= */ tso2 - 1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1283,6 +1306,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1338,6 +1362,7 @@ try /* num_streams= */ 1, /* max_version= */ tso1, EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1374,6 +1399,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1); BlockInputStreamPtr in = ins[0]; @@ -1459,6 +1485,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -1566,6 +1593,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1672,6 +1700,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1759,6 +1788,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -1861,6 +1891,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -1936,6 +1967,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2011,6 +2043,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2086,6 +2119,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2161,6 +2195,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2234,6 +2269,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -2306,6 +2342,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2395,6 +2432,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2528,6 +2566,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2588,6 +2627,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr & in = ins[0]; @@ -2686,6 +2726,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2738,6 +2779,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; in->readPrefix(); @@ -2853,6 +2895,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; @@ -2981,6 +3024,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3054,6 +3098,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3084,6 +3129,7 @@ try /* num_streams= */ 1, /* max_version= */ UInt64(1), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3141,6 +3187,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3190,6 +3237,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024)[0]; size_t num_rows_read = 0; while (Block block = in->read()) @@ -3262,6 +3310,7 @@ try /* num_streams= */ 1, /* max_version= */ std::numeric_limits::max(), EMPTY_FILTER, + TRACING_NAME, /* expected_block_size= */ 1024); ASSERT_EQ(ins.size(), 1UL); BlockInputStreamPtr in = ins[0]; @@ -3305,10 +3354,11 @@ try } CATCH -INSTANTIATE_TEST_CASE_P(TestMode, // - DeltaMergeStoreRWTest, - testing::Values(TestMode::V1_BlockOnly, TestMode::V2_BlockOnly, TestMode::V2_FileOnly, TestMode::V2_Mix), - testModeToString); +INSTANTIATE_TEST_CASE_P( + TestMode, + DeltaMergeStoreRWTest, + testing::Values(TestMode::V1_BlockOnly, TestMode::V2_BlockOnly, TestMode::V2_FileOnly, TestMode::V2_Mix), + testModeToString); } // namespace tests } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index bbb8aaf8206..31fd99faf01 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -124,7 +124,7 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; - auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter); + auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter, name); streams[0]->readPrefix(); auto rows = streams[0]->read().rows(); streams[0]->readSuffix(); diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp index 5f99cbcd33f..e5c7fd30f40 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/tools/workload/DTWorkload.cpp @@ -191,7 +191,7 @@ void DTWorkload::read(const ColumnDefines & columns, int stream_count, T func) auto filter = EMPTY_FILTER; int excepted_block_size = 1024; uint64_t read_ts = ts_gen->get(); - auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, excepted_block_size); + auto streams = store->read(*context, context->getSettingsRef(), columns, ranges, stream_count, read_ts, filter, "DTWorkload", excepted_block_size); std::vector threads; threads.reserve(streams.size()); for (auto & stream : streams) diff --git a/dbms/src/Storages/SelectQueryInfo.cpp b/dbms/src/Storages/SelectQueryInfo.cpp index 631f707e21d..075c0ad0631 100644 --- a/dbms/src/Storages/SelectQueryInfo.cpp +++ b/dbms/src/Storages/SelectQueryInfo.cpp @@ -12,28 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include +#include namespace DB { SelectQueryInfo::SelectQueryInfo() = default; -SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & query_info_) - : query(query_info_.query), - sets(query_info_.sets), - mvcc_query_info(query_info_.mvcc_query_info != nullptr ? std::make_unique(*query_info_.mvcc_query_info) : nullptr), - dag_query(query_info_.dag_query != nullptr ? std::make_unique(*query_info_.dag_query) : nullptr) -{} +SelectQueryInfo::~SelectQueryInfo() = default; -SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && query_info_) - : query(query_info_.query), sets(query_info_.sets), mvcc_query_info(std::move(query_info_.mvcc_query_info)), - dag_query(std::move(query_info_.dag_query)) +SelectQueryInfo::SelectQueryInfo(const SelectQueryInfo & rhs) + : query(rhs.query) + , sets(rhs.sets) + , mvcc_query_info(rhs.mvcc_query_info != nullptr ? std::make_unique(*rhs.mvcc_query_info) : nullptr) + , dag_query(rhs.dag_query != nullptr ? std::make_unique(*rhs.dag_query) : nullptr) + , req_id(rhs.req_id) {} -SelectQueryInfo::~SelectQueryInfo() = default; +SelectQueryInfo::SelectQueryInfo(SelectQueryInfo && rhs) noexcept + : query(std::move(rhs.query)) + , sets(std::move(rhs.sets)) + , mvcc_query_info(std::move(rhs.mvcc_query_info)) + , dag_query(std::move(rhs.dag_query)) + , req_id(std::move(rhs.req_id)) +{} } // namespace DB diff --git a/dbms/src/Storages/SelectQueryInfo.h b/dbms/src/Storages/SelectQueryInfo.h index bed607ce0fe..d49d4c831d1 100644 --- a/dbms/src/Storages/SelectQueryInfo.h +++ b/dbms/src/Storages/SelectQueryInfo.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include namespace DB @@ -32,6 +33,7 @@ using PreparedSets = std::unordered_map; struct MvccQueryInfo; struct DAGQueryInfo; + /** Query along with some additional data, * that can be used during query processing * inside storage engines. @@ -48,14 +50,16 @@ struct SelectQueryInfo std::unique_ptr dag_query; - SelectQueryInfo(); - - SelectQueryInfo(const SelectQueryInfo & query_info_); + std::string req_id; - SelectQueryInfo(SelectQueryInfo && query_info_); + SelectQueryInfo(); ~SelectQueryInfo(); + // support copying and moving + SelectQueryInfo(const SelectQueryInfo & rhs); + SelectQueryInfo(SelectQueryInfo && rhs) noexcept; + bool fromAST() const { return dag_query == nullptr; }; }; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 224500cd69d..1f496ade671 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -80,7 +81,7 @@ StorageDeltaMerge::StorageDeltaMerge( , store_inited(false) , max_column_id_used(0) , global_context(global_context_.getGlobalContext()) - , log(&Poco::Logger::get("StorageDeltaMerge")) + , log(Logger::get("StorageDeltaMerge", fmt::format("{}.{}", db_name_, table_name_))) { if (primary_expr_ast_->children.empty()) throw Exception("No primary key"); @@ -626,8 +627,9 @@ BlockInputStreams StorageDeltaMerge::read( throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR); const auto & mvcc_query_info = *query_info.mvcc_query_info; + auto tracing_logger = Logger::get("StorageDeltaMerge", log->identifier(), query_info.req_id); - LOG_FMT_DEBUG(log, "Read with tso: {}", mvcc_query_info.read_tso); + LOG_FMT_DEBUG(tracing_logger, "Read with tso: {}", mvcc_query_info.read_tso); // Check whether tso is smaller than TiDB GcSafePoint const auto check_read_tso = [&tmt, &context, this](UInt64 read_tso) { @@ -652,7 +654,7 @@ BlockInputStreams StorageDeltaMerge::read( check_read_tso(mvcc_query_info.read_tso); FmtBuffer fmt_buf; - if (unlikely(log->trace())) + if (unlikely(tracing_logger->trace())) { fmt_buf.append("orig, "); fmt_buf.joinStr( @@ -685,9 +687,9 @@ BlockInputStreams StorageDeltaMerge::read( is_common_handle, rowkey_column_size, /*expected_ranges_count*/ num_streams, - log); + tracing_logger); - if (unlikely(log->trace())) + if (unlikely(tracing_logger->trace())) { fmt_buf.append(" merged, "); fmt_buf.joinStr( @@ -697,7 +699,7 @@ BlockInputStreams StorageDeltaMerge::read( fb.append(range.toDebugString()); }, ","); - LOG_FMT_TRACE(log, "reading ranges: {}", fmt_buf.toString()); + LOG_FMT_TRACE(tracing_logger, "reading ranges: {}", fmt_buf.toString()); } /// Get Rough set filter from query @@ -722,10 +724,10 @@ BlockInputStreams StorageDeltaMerge::read( rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, columns_to_read, std::move(create_attr_by_column_id), log); } if (likely(rs_operator != DM::EMPTY_FILTER)) - LOG_FMT_DEBUG(log, "Rough set filter: {}", rs_operator->toDebugString()); + LOG_FMT_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString()); } else - LOG_FMT_DEBUG(log, "Rough set filter is disabled."); + LOG_FMT_DEBUG(tracing_logger, "Rough set filter is disabled."); auto streams = store->read( context, @@ -735,6 +737,7 @@ BlockInputStreams StorageDeltaMerge::read( num_streams, /*max_version=*/mvcc_query_info.read_tso, rs_operator, + query_info.req_id, max_block_size, parseSegmentSet(select_query.segment_expression_list), extra_table_id_index); @@ -742,7 +745,7 @@ BlockInputStreams StorageDeltaMerge::read( /// Ensure read_tso info after read. check_read_tso(mvcc_query_info.read_tso); - LOG_FMT_TRACE(log, "[ranges: {}] [streams: {}]", ranges.size(), streams.size()); + LOG_FMT_TRACE(tracing_logger, "[ranges: {}] [streams: {}]", ranges.size(), streams.size()); return streams; } @@ -797,6 +800,7 @@ UInt64 StorageDeltaMerge::onSyncGc(Int64 limit) return 0; } +// just for testing size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM::RowKeyRange & range) { size_t rows = 0; @@ -809,7 +813,8 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM {range}, 1, std::numeric_limits::max(), - EMPTY_FILTER)[0]; + EMPTY_FILTER, + /*tracing_id*/ "getRows")[0]; stream->readPrefix(); Block block; while ((block = stream->read())) @@ -819,6 +824,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM return rows; } +// just for testing DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context, size_t total_rows, size_t delete_rows) { auto start_index = rand() % (total_rows - delete_rows + 1); // NOLINT(cert-msc50-cpp) @@ -832,7 +838,8 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context {DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, 1, std::numeric_limits::max(), - EMPTY_FILTER)[0]; + EMPTY_FILTER, + /*tracing_id*/ "getRange")[0]; stream->readPrefix(); Block block; size_t index = 0; diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index f5513063d3e..84ae387ecee 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -235,7 +235,7 @@ class StorageDeltaMerge Context & global_context; - Poco::Logger * log; + LoggerPtr log; }; diff --git a/dbms/src/Storages/StorageDeltaMergeHelpers.h b/dbms/src/Storages/StorageDeltaMergeHelpers.h index 96b3424b836..18337f879de 100644 --- a/dbms/src/Storages/StorageDeltaMergeHelpers.h +++ b/dbms/src/Storages/StorageDeltaMergeHelpers.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -30,7 +31,13 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -inline DM::RowKeyRanges getQueryRanges(const DB::MvccQueryInfo::RegionsQueryInfo & regions, TableID table_id, bool is_common_handle, size_t rowkey_column_size, size_t expected_ranges_count = 1, Poco::Logger * log = nullptr) +inline DM::RowKeyRanges getQueryRanges( + const DB::MvccQueryInfo::RegionsQueryInfo & regions, + TableID table_id, + bool is_common_handle, + size_t rowkey_column_size, + size_t expected_ranges_count = 1, + const LoggerPtr & log = nullptr) { // todo check table id in DecodedTiKVKey??? DM::RowKeyRanges ranges; diff --git a/dbms/src/Storages/System/StorageSystemDTTables.cpp b/dbms/src/Storages/System/StorageSystemDTTables.cpp index 9523aff1f44..b3f9cf5b29e 100644 --- a/dbms/src/Storages/System/StorageSystemDTTables.cpp +++ b/dbms/src/Storages/System/StorageSystemDTTables.cpp @@ -28,7 +28,8 @@ namespace DB { -StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(name_) +StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) + : name(name_) { setColumns(ColumnsDescription({ {"database", std::make_shared()}, @@ -85,6 +86,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n {"storage_stable_num_snapshots", std::make_shared()}, {"storage_stable_oldest_snapshot_lifetime", std::make_shared()}, {"storage_stable_oldest_snapshot_thread_id", std::make_shared()}, + {"storage_stable_oldest_snapshot_tracing_id", std::make_shared()}, {"storage_stable_num_pages", std::make_shared()}, {"storage_stable_num_normal_pages", std::make_shared()}, {"storage_stable_max_page_id", std::make_shared()}, @@ -92,6 +94,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n {"storage_delta_num_snapshots", std::make_shared()}, {"storage_delta_oldest_snapshot_lifetime", std::make_shared()}, {"storage_delta_oldest_snapshot_thread_id", std::make_shared()}, + {"storage_delta_oldest_snapshot_tracing_id", std::make_shared()}, {"storage_delta_num_pages", std::make_shared()}, {"storage_delta_num_normal_pages", std::make_shared()}, {"storage_delta_max_page_id", std::make_shared()}, @@ -99,6 +102,7 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n {"storage_meta_num_snapshots", std::make_shared()}, {"storage_meta_oldest_snapshot_lifetime", std::make_shared()}, {"storage_meta_oldest_snapshot_thread_id", std::make_shared()}, + {"storage_meta_oldest_snapshot_tracing_id", std::make_shared()}, {"storage_meta_num_pages", std::make_shared()}, {"storage_meta_num_normal_pages", std::make_shared()}, {"storage_meta_max_page_id", std::make_shared()}, @@ -108,7 +112,8 @@ StorageSystemDTTables::StorageSystemDTTables(const std::string & name_) : name(n } -BlockInputStreams StorageSystemDTTables::read(const Names & column_names, +BlockInputStreams StorageSystemDTTables::read( + const Names & column_names, const SelectQueryInfo &, const Context & context, QueryProcessingStage::Enum & processed_stage, @@ -126,19 +131,19 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, for (const auto & d : databases) { String database_name = d.first; - auto & database = d.second; + const auto & database = d.second; const DatabaseTiFlash * db_tiflash = typeid_cast(database.get()); auto it = database->getIterator(context); for (; it->isValid(); it->next()) { - auto & table_name = it->name(); + const auto & table_name = it->name(); auto & storage = it->table(); if (storage->getName() != MutableSupport::delta_tree_storage_name) continue; auto dm_storage = std::dynamic_pointer_cast(storage); - auto & table_info = dm_storage->getTableInfo(); + const auto & table_info = dm_storage->getTableInfo(); auto table_id = table_info.id; auto stat = dm_storage->getStore()->getStat(); @@ -201,6 +206,7 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, res_columns[j++]->insert(stat.storage_stable_num_snapshots); res_columns[j++]->insert(stat.storage_stable_oldest_snapshot_lifetime); res_columns[j++]->insert(stat.storage_stable_oldest_snapshot_thread_id); + res_columns[j++]->insert(stat.storage_stable_oldest_snapshot_tracing_id); res_columns[j++]->insert(stat.storage_stable_num_pages); res_columns[j++]->insert(stat.storage_stable_num_normal_pages); res_columns[j++]->insert(stat.storage_stable_max_page_id); @@ -208,6 +214,7 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, res_columns[j++]->insert(stat.storage_delta_num_snapshots); res_columns[j++]->insert(stat.storage_delta_oldest_snapshot_lifetime); res_columns[j++]->insert(stat.storage_delta_oldest_snapshot_thread_id); + res_columns[j++]->insert(stat.storage_delta_oldest_snapshot_tracing_id); res_columns[j++]->insert(stat.storage_delta_num_pages); res_columns[j++]->insert(stat.storage_delta_num_normal_pages); res_columns[j++]->insert(stat.storage_delta_max_page_id); @@ -215,6 +222,7 @@ BlockInputStreams StorageSystemDTTables::read(const Names & column_names, res_columns[j++]->insert(stat.storage_meta_num_snapshots); res_columns[j++]->insert(stat.storage_meta_oldest_snapshot_lifetime); res_columns[j++]->insert(stat.storage_meta_oldest_snapshot_thread_id); + res_columns[j++]->insert(stat.storage_meta_oldest_snapshot_tracing_id); res_columns[j++]->insert(stat.storage_meta_num_pages); res_columns[j++]->insert(stat.storage_meta_num_normal_pages); res_columns[j++]->insert(stat.storage_meta_max_page_id); diff --git a/dbms/src/Storages/tests/gtest_filter_parser.cpp b/dbms/src/Storages/tests/gtest_filter_parser.cpp index 52f9efe0f1e..a027ea71cfc 100644 --- a/dbms/src/Storages/tests/gtest_filter_parser.cpp +++ b/dbms/src/Storages/tests/gtest_filter_parser.cpp @@ -54,14 +54,14 @@ class FilterParserTest : public ::testing::Test } FilterParserTest() - : log(&Poco::Logger::get("FilterParserTest")) + : log(Logger::get("FilterParserTest")) , ctx(TiFlashTestEnv::getContext()) { default_timezone_info = ctx.getTimezoneInfo(); } protected: - Poco::Logger * log; + LoggerPtr log; Context ctx; static TimezoneInfo default_timezone_info; DM::RSOperatorPtr generateRsOperator(String table_info_json, const String & query, TimezoneInfo & timezone_info); @@ -98,7 +98,7 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator(const String table_info_j DM::ColumnDefines columns_to_read; { NamesAndTypes source_columns; - std::tie(source_columns, std::ignore) = parseColumnsFromTableInfo(table_info, log); + std::tie(source_columns, std::ignore) = parseColumnsFromTableInfo(table_info, log->getLog()); dag_query = std::make_unique( conditions, DAGPreparedSets(),