From 6301306a88901d4b6696706ded144c9cd33b3a6a Mon Sep 17 00:00:00 2001 From: JaySon Date: Thu, 24 Mar 2022 12:52:34 +0800 Subject: [PATCH] Logger: Refactor dmfile reader and add tracing_logger (#4379) ref pingcap/tiflash#4287 --- CMakeLists.txt | 3 +- dbms/src/Common/CPUAffinityManager.cpp | 2 +- dbms/src/Common/CPUAffinityManager.h | 7 +- dbms/src/Server/DTTool/DTToolBench.cpp | 17 +- dbms/src/Server/DTTool/DTToolInspect.cpp | 2 +- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 24 +- .../DMVersionFilterBlockInputStream.cpp | 4 +- .../DMVersionFilterBlockInputStream.h | 2 +- .../Storages/DeltaMerge/DeltaMergeDefines.h | 13 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 3 +- dbms/src/Storages/DeltaMerge/File/DMFile.h | 1 + .../File/DMFileBlockInputStream.cpp | 65 ++++ .../DeltaMerge/File/DMFileBlockInputStream.h | 171 ++++++++--- .../DeltaMerge/File/DMFilePackFilter.h | 47 ++- .../Storages/DeltaMerge/File/DMFileReader.cpp | 56 ++-- .../Storages/DeltaMerge/File/DMFileReader.h | 36 +-- dbms/src/Storages/DeltaMerge/Segment.cpp | 31 +- dbms/src/Storages/DeltaMerge/Segment.h | 6 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 92 +++--- .../DeltaMerge/tests/bank/SimpleLockManager.h | 12 +- .../Storages/DeltaMerge/tests/bank/main.cpp | 2 +- .../DeltaMerge/tests/gtest_dm_file.cpp | 287 ++++++------------ .../tests/gtest_dm_minmax_index.cpp | 2 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 12 +- .../DeltaMerge/tests/gtest_version_filter.cpp | 20 +- .../tools/workload/DataGenerator.cpp | 10 +- .../Storages/Page/V3/LogFile/LogReader.cpp | 4 +- dbms/src/Storages/Page/V3/LogFile/LogReader.h | 5 +- dbms/src/Storages/Page/V3/WAL/WALReader.cpp | 3 +- .../Storages/Page/V3/tests/gtest_wal_log.cpp | 3 +- dbms/src/Storages/StorageDeltaMerge.cpp | 4 +- 31 files changed, 457 insertions(+), 489 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e37df3a59c2..39a98aaeffe 100644 --- 
a/CMakeLists.txt +++ b/CMakeLists.txt @@ -107,8 +107,7 @@ endif () if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") # clang: warning: argument unused during compilation: '-stdlib=libc++' # clang: warning: argument unused during compilation: '-specs=/usr/share/dpkg/no-pie-compile.specs' [-Wunused-command-line-argument] - # clang: warning: private field 'hash_salt' is not used [-Wunused-private-field] - set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument -Wno-unused-private-field") + set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument") endif () if (ARCH_LINUX) diff --git a/dbms/src/Common/CPUAffinityManager.cpp b/dbms/src/Common/CPUAffinityManager.cpp index cd6330d420b..eb573e35e65 100644 --- a/dbms/src/Common/CPUAffinityManager.cpp +++ b/dbms/src/Common/CPUAffinityManager.cpp @@ -338,4 +338,4 @@ void CPUAffinityManager::checkThreadCPUAffinity() const } } #endif -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Common/CPUAffinityManager.h b/dbms/src/Common/CPUAffinityManager.h index a27255e2829..8c88c3ab1fd 100644 --- a/dbms/src/Common/CPUAffinityManager.h +++ b/dbms/src/Common/CPUAffinityManager.h @@ -114,8 +114,9 @@ class CPUAffinityManager cpu_set_t other_cpu_set; #endif - int query_cpu_percent; - int cpu_cores; + // unused except Linux + [[maybe_unused]] int query_cpu_percent; + [[maybe_unused]] int cpu_cores; std::vector query_threads; Poco::Logger * log; @@ -127,4 +128,4 @@ class CPUAffinityManager CPUAffinityManager(CPUAffinityManager &&) = delete; CPUAffinityManager & operator=(CPUAffinityManager &&) = delete; }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index b67a3512ba9..491ac710721 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -386,22 +386,13 @@ int benchEntry(const std::vector & opts) auto 
start = high_resolution_clock::now(); { - auto stream = DB::DM::DMFileBlockInputStream( - *db_context, - std::numeric_limits::max(), - false, - dm_context->hash_salt, - dmfile, - *defines, - {DB::DM::RowKeyRange::newAll(false, 1)}, - DB::DM::RSOperatorPtr{}, - std::make_shared(), - DB::DM::IdSetPtr{}); + auto builder = DB::DM::DMFileBlockInputStreamBuilder(*db_context); + auto stream = builder.setColumnCache(std::make_shared()).build(dmfile, *defines, {DB::DM::RowKeyRange::newAll(false, 1)}); for (size_t j = 0; j < blocks.size(); ++j) { - TIFLASH_NO_OPTIMIZE(stream.read()); + TIFLASH_NO_OPTIMIZE(stream->read()); } - stream.readSuffix(); + stream->readSuffix(); } auto end = high_resolution_clock::now(); auto duration = duration_cast(end - start).count(); diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp index fc6c4bbb5f4..6496c41fb7b 100644 --- a/dbms/src/Server/DTTool/DTToolInspect.cpp +++ b/dbms/src/Server/DTTool/DTToolInspect.cpp @@ -45,7 +45,7 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args) // black_hole is used to consume data manually. // we use SCOPE_EXIT to ensure the release of memory area. 
- auto black_hole = reinterpret_cast(::operator new (DBMS_DEFAULT_BUFFER_SIZE, std::align_val_t{64})); + auto * black_hole = reinterpret_cast(::operator new (DBMS_DEFAULT_BUFFER_SIZE, std::align_val_t{64})); SCOPE_EXIT({ ::operator delete (black_hole, std::align_val_t{64}); }); auto consume = [&](DB::ReadBuffer & t) { while (t.readBig(black_hole, DBMS_DEFAULT_BUFFER_SIZE) != 0) {} diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 1e99af00295..f1e98b680ce 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -33,10 +33,16 @@ ColumnFileBig::ColumnFileBig(const DMContext & context, const DMFilePtr & file_, void ColumnFileBig::calculateStat(const DMContext & context) { auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache(); - auto hash_salt = context.hash_salt; - auto pack_filter - = DMFilePackFilter::loadFrom(file, index_cache, hash_salt, {segment_range}, EMPTY_FILTER, {}, context.db_context.getFileProvider(), context.getReadLimiter()); + auto pack_filter = DMFilePackFilter::loadFrom( + file, + index_cache, + {segment_range}, + EMPTY_FILTER, + {}, + context.db_context.getFileProvider(), + context.getReadLimiter(), + /*tracing_logger*/ nullptr); std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes(); } @@ -79,16 +85,8 @@ void ColumnFileBigReader::initStream() if (file_stream) return; - file_stream = std::make_shared(context.db_context, - /*max_version*/ MAX_UINT64, - /*clean_read*/ false, - context.hash_salt, - column_file.getFile(), - *col_defs, - RowKeyRanges{column_file.segment_range}, - RSOperatorPtr{}, - ColumnCachePtr{}, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(context.db_context); + file_stream = builder.build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}); // If we only need to read pk and version columns, then 
cache columns data in memory. if (pk_ver_only) diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp index 827631d39d6..9e6bee47015 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp @@ -260,7 +260,7 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r } // Let's calculate gc_hint_version - gc_hint_version = UINT64_MAX; + gc_hint_version = std::numeric_limits::max(); { UInt8 * filter_pos = filter.data(); size_t handle_pos = 0; @@ -380,7 +380,7 @@ Block DMVersionFilterBlockInputStream::read(FilterPtr & res_filter, bool r else { Block res; - for (auto & c : header) + for (const auto & c : header) { auto & column = cur_raw_block.getByName(c.name); column.column = column.column->filter(filter, passed_count); diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index 083eb24e300..331e4cd6f3e 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -183,7 +183,7 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream } } - return matched ? cur_version : UINT64_MAX; + return matched ? 
cur_version : std::numeric_limits::max(); } private: diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index 404e292fdc8..7423c8b63ad 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -157,17 +157,8 @@ inline const ColumnDefine & getExtraTableIDColumnDefine() return EXTRA_TABLE_ID_COLUMN_DEFINE_; } -static constexpr UInt64 MIN_UINT64 = std::numeric_limits::min(); -static constexpr UInt64 MAX_UINT64 = std::numeric_limits::max(); - -static constexpr Int64 MIN_INT64 = std::numeric_limits::min(); -static constexpr Int64 MAX_INT64 = std::numeric_limits::max(); - -static constexpr Handle N_INF_HANDLE = MIN_INT64; // Used in range, indicating negative infinity. -static constexpr Handle P_INF_HANDLE = MAX_INT64; // Used in range, indicating positive infinity. - -static_assert(static_cast(static_cast(MIN_INT64)) == MIN_INT64, "Unsupported compiler!"); -static_assert(static_cast(static_cast(MAX_INT64)) == MAX_INT64, "Unsupported compiler!"); +static_assert(static_cast(static_cast(std::numeric_limits::min())) == std::numeric_limits::min(), "Unsupported compiler!"); +static_assert(static_cast(static_cast(std::numeric_limits::max())) == std::numeric_limits::max(), "Unsupported compiler!"); static constexpr bool DM_RUN_CHECK = true; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index d2d9c708efa..09bd86918c0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -1021,7 +1020,7 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, after_segment_read, columns_to_read, EMPTY_FILTER, - MAX_UINT64, + std::numeric_limits::max(), DEFAULT_BLOCK_SIZE, true, db_settings.dt_raw_filter_range, diff --git 
a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 0749b4eee6e..ee7cf54b0d8 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -416,6 +416,7 @@ class DMFile : private boost::noncopyable friend class DMFileWriter; friend class DMFileReader; friend class DMFilePackFilter; + friend class DMFileBlockInputStreamBuilder; friend int ::DTTool::Migrate::migrateServiceMain(DB::Context & context, const ::DTTool::Migrate::MigrateArgs & args); friend bool ::DTTool::Migrate::isRecognizable(const DB::DM::DMFile & file, const std::string & target); friend bool ::DTTool::Migrate::needFrameMigration(const DB::DM::DMFile & file, const std::string & target); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp new file mode 100644 index 00000000000..848e1f39ce7 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -0,0 +1,65 @@ +#include +#include + +namespace DB::DM +{ + +DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context) + : file_provider(context.getFileProvider()) + , read_limiter(context.getReadLimiter()) +{ + // init from global context + const auto & global_context = context.getGlobalContext(); + setCaches(global_context.getMarkCache(), global_context.getMinMaxIndexCache()); + // init from settings + setFromSettings(context.getSettingsRef()); +} + +DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr & dmfile, const ColumnDefines & read_columns, const RowKeyRanges & rowkey_ranges) +{ + if (dmfile->getStatus() != DMFile::Status::READABLE) + throw Exception(fmt::format( + "DMFile [{}] is expected to be in READABLE status, but: {}", + dmfile->fileId(), + DMFile::statusString(dmfile->getStatus())), + ErrorCodes::LOGICAL_ERROR); + + // if `rowkey_ranges` is empty, we unconditionally read all packs + // 
`rowkey_ranges` and `is_common_handle` will only be useful in clean read mode. + // It is safe to ignore them here. + if (unlikely(rowkey_ranges.empty() && enable_clean_read)) + throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR); + + bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle; + + DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom( + dmfile, + index_cache, + rowkey_ranges, + rs_filter, + read_packs, + file_provider, + read_limiter, + tracing_logger); + + DMFileReader reader( + dmfile, + read_columns, + is_common_handle, + enable_clean_read, + max_data_version, + std::move(pack_filter), + mark_cache, + enable_column_cache, + column_cache, + aio_threshold, + max_read_buffer_size, + file_provider, + read_limiter, + rows_threshold_per_read, + read_one_pack_every_time, + tracing_logger); + + return std::make_shared(std::move(reader)); +} +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index d83c7de5a1d..9f166acd5e1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -14,55 +14,26 @@ #pragma once -#include #include #include +#include #include namespace DB { +class Context; +class Logger; +using LoggerPtr = std::shared_ptr; namespace DM { class DMFileBlockInputStream : public SkippableBlockInputStream { public: - DMFileBlockInputStream(const Context & context, - UInt64 max_read_version, - bool enable_clean_read, - UInt64 hash_salt, - const DMFilePtr & dmfile, - const ColumnDefines & read_columns, - const RowKeyRanges & rowkey_ranges, - const RSOperatorPtr & filter, - const ColumnCachePtr & column_cache_, - const IdSetPtr & read_packs, - size_t expected_size = DMFILE_READ_ROWS_THRESHOLD, - bool read_one_pack_every_time_ = false) - : reader(dmfile, - read_columns, - // clean read - 
enable_clean_read, - max_read_version, - // filters - rowkey_ranges, - filter, - read_packs, - // caches - hash_salt, - context.getGlobalContext().getMarkCache(), - context.getGlobalContext().getMinMaxIndexCache(), - context.getSettingsRef().dt_enable_stable_column_cache, - column_cache_, - context.getSettingsRef().min_bytes_to_use_direct_io, - context.getSettingsRef().max_read_buffer_size, - context.getFileProvider(), - context.getReadLimiter(), - expected_size, - read_one_pack_every_time_) - { - } + explicit DMFileBlockInputStream(DMFileReader && reader_) + : reader(std::move(reader_)) + {} - ~DMFileBlockInputStream() {} + ~DMFileBlockInputStream() = default; String getName() const override { return "DMFile"; } @@ -78,6 +49,115 @@ class DMFileBlockInputStream : public SkippableBlockInputStream using DMFileBlockInputStreamPtr = std::shared_ptr; +class DMFileBlockInputStreamBuilder +{ +public: + // Construct a builder by `context`. + // It implicitly set the params by + // - mark cache and min-max-index cache from global context + // - current settings from this context + // - current read limiter form this context + // - current file provider from this context + explicit DMFileBlockInputStreamBuilder(const Context & context); + + // Build the final stream ptr. + // Should not use the builder again after `build` is called. + DMFileBlockInputStreamPtr build( + const DMFilePtr & dmfile, + const ColumnDefines & read_columns, + const RowKeyRanges & rowkey_ranges); + + // **** filters **** // + + // Only set this param to true when + // 1. There is no delta. + // 2. You don't need pk, version and delete_tag columns + // If you have no idea what it means, then simply set it to false. + // `max_data_version_` is the MVCC filter version for reading. 
Used by clean read check + DMFileBlockInputStreamBuilder & enableCleanRead(bool enable, UInt64 max_data_version_) + { + enable_clean_read = enable; + max_data_version = max_data_version_; + return *this; + } + + DMFileBlockInputStreamBuilder & setRSOperator(const RSOperatorPtr & filter_) + { + rs_filter = filter_; + return *this; + } + + DMFileBlockInputStreamBuilder & setReadPacks(const IdSetPtr & read_packs_) + { + read_packs = read_packs_; + return *this; + } + + DMFileBlockInputStreamBuilder & setColumnCache(const ColumnCachePtr & column_cache_) + { + // note that `enable_column_cache` is controlled by Settings (see `setFromSettings`) + column_cache = column_cache_; + return *this; + } + + DMFileBlockInputStreamBuilder & onlyReadOnePackEveryTime() + { + read_one_pack_every_time = true; + return *this; + } + DMFileBlockInputStreamBuilder & setRowsThreshold(size_t rows_threshold_per_read_) + { + rows_threshold_per_read = rows_threshold_per_read_; + return *this; + } + + DMFileBlockInputStreamBuilder & setTracingLogger(const DB::LoggerPtr & logger) + { + tracing_logger = logger; + return *this; + } + +private: + // These methods are called by the ctor + + DMFileBlockInputStreamBuilder & setFromSettings(const Settings & settings) + { + enable_column_cache = settings.dt_enable_stable_column_cache; + aio_threshold = settings.min_bytes_to_use_direct_io; + max_read_buffer_size = settings.max_read_buffer_size; + return *this; + } + DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_) + { + mark_cache = mark_cache_; + index_cache = index_cache_; + return *this; + } + +private: + FileProviderPtr file_provider; + + // clean read + bool enable_clean_read = false; + UInt64 max_data_version = std::numeric_limits::max(); + // Rough set filter + RSOperatorPtr rs_filter; + // packs filter (filter by pack index) + IdSetPtr read_packs; + MarkCachePtr mark_cache; + MinMaxIndexCachePtr index_cache; + // column cache + 
bool enable_column_cache = false; + ColumnCachePtr column_cache; + ReadLimiterPtr read_limiter; + size_t aio_threshold; + size_t max_read_buffer_size; + size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD; + bool read_one_pack_every_time = false; + + DB::LoggerPtr tracing_logger; +}; + /** * Create a simple stream that read all blocks on default. * @param context Database context. @@ -88,18 +168,11 @@ inline DMFileBlockInputStreamPtr createSimpleBlockInputStream(const DB::Context { // disable clean read is needed, since we just want to read all data from the file, and we do not know about the column handle // enable read_one_pack_every_time_ is needed to preserve same block structure as the original file - return std::make_shared(context, - DB::DM::MAX_UINT64 /*< max_read_version */, - false /*< enable_clean_read */, - 0 /*< hash_salt */, - file, - file->getColumnDefines(), - DB::DM::RowKeyRanges{}, - DB::DM::RSOperatorPtr{}, - DB::DM::ColumnCachePtr{}, - DB::DM::IdSetPtr{}, - DMFILE_READ_ROWS_THRESHOLD, - true /*< read_one_pack_every_time_ */); + DMFileBlockInputStreamBuilder builder(context); + return builder + .setRowsThreshold(DMFILE_READ_ROWS_THRESHOLD) + .onlyReadOnePackEveryTime() + .build(file, file->getColumnDefines(), DB::DM::RowKeyRanges{}); } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index a58a54b53bd..c81954efbc7 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -39,22 +40,23 @@ using IdSetPtr = std::shared_ptr; class DMFilePackFilter { public: - static DMFilePackFilter loadFrom(const DMFilePtr & dmfile, - const MinMaxIndexCachePtr & index_cache, - UInt64 hash_salt, - const RowKeyRanges & rowkey_ranges, - const RSOperatorPtr & filter, - const IdSetPtr & read_packs, - const FileProviderPtr & file_provider, 
- const ReadLimiterPtr & read_limiter) + static DMFilePackFilter loadFrom( + const DMFilePtr & dmfile, + const MinMaxIndexCachePtr & index_cache, + const RowKeyRanges & rowkey_ranges, + const RSOperatorPtr & filter, + const IdSetPtr & read_packs, + const FileProviderPtr & file_provider, + const ReadLimiterPtr & read_limiter, + const DB::LoggerPtr & tracing_logger) { - auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, rowkey_ranges, filter, read_packs, file_provider, read_limiter); + auto pack_filter = DMFilePackFilter(dmfile, index_cache, rowkey_ranges, filter, read_packs, file_provider, read_limiter, tracing_logger); pack_filter.init(); return pack_filter; } - const std::vector & getHandleRes() { return handle_res; } - const std::vector & getUsePacks() { return use_packs; } + inline const std::vector & getHandleRes() const { return handle_res; } + inline const std::vector & getUsePacks() const { return use_packs; } Handle getMinHandle(size_t pack_id) { @@ -85,7 +87,7 @@ class DMFilePackFilter { size_t rows = 0; size_t bytes = 0; - auto & pack_stats = dmfile->getPackStats(); + const auto & pack_stats = dmfile->getPackStats(); for (size_t i = 0; i < pack_stats.size(); ++i) { if (use_packs[i]) @@ -100,22 +102,21 @@ class DMFilePackFilter private: DMFilePackFilter(const DMFilePtr & dmfile_, const MinMaxIndexCachePtr & index_cache_, - UInt64 hash_salt_, const RowKeyRanges & rowkey_ranges_, // filter by handle range const RSOperatorPtr & filter_, // filter by push down where clause const IdSetPtr & read_packs_, // filter by pack index const FileProviderPtr & file_provider_, - const ReadLimiterPtr & read_limiter_) + const ReadLimiterPtr & read_limiter_, + const DB::LoggerPtr & tracing_logger) : dmfile(dmfile_) , index_cache(index_cache_) - , hash_salt(hash_salt_) , rowkey_ranges(rowkey_ranges_) , filter(filter_) , read_packs(read_packs_) , file_provider(file_provider_) , handle_res(dmfile->getPacks(), RSResult::All) , use_packs(dmfile->getPacks()) - , 
log(&Poco::Logger::get("DMFilePackFilter")) + , log(tracing_logger ? tracing_logger : DB::Logger::get("DMFilePackFilter")) , read_limiter(read_limiter_) { } @@ -165,7 +166,7 @@ class DMFilePackFilter { for (size_t i = 0; i < pack_count; ++i) { - use_packs[i] = ((bool)use_packs[i]) && ((bool)read_packs->count(i)); + use_packs[i] = (static_cast(use_packs[i])) && (static_cast(read_packs->count(i))); } } @@ -186,7 +187,7 @@ class DMFilePackFilter for (size_t i = 0; i < pack_count; ++i) { - use_packs[i] = ((bool)use_packs[i]) && (filter->roughCheck(i, param) != None); + use_packs[i] = (static_cast(use_packs[i])) && (filter->roughCheck(i, param) != None); } } @@ -194,7 +195,7 @@ class DMFilePackFilter after_filter += u; ProfileEvents::increment(ProfileEvents::DMFileFilterAftRoughSet, after_filter); - Float64 filter_rate = (Float64)(after_read_packs - after_filter) * 100 / after_read_packs; + Float64 filter_rate = (after_read_packs - after_filter) * 100.0 / after_read_packs; if (after_read_packs != 0) { GET_METRIC(tiflash_storage_rough_set_filter_rate, type_dtfile_pack).Observe(filter_rate); @@ -207,9 +208,6 @@ class DMFilePackFilter << ", pack_count: " << pack_count); } - friend class DMFileReader; - -private: static void loadIndex(ColumnIndexes & indexes, const DMFilePtr & dmfile, const FileProviderPtr & file_provider, @@ -217,7 +215,7 @@ class DMFilePackFilter ColId col_id, const ReadLimiterPtr & read_limiter) { - auto & type = dmfile->getColumnStat(col_id).type; + const auto & type = dmfile->getColumnStat(col_id).type; const auto file_name_base = DMFile::getFileNameBase(col_id); auto load = [&]() { @@ -275,7 +273,6 @@ class DMFilePackFilter private: DMFilePtr dmfile; MinMaxIndexCachePtr index_cache; - UInt64 hash_salt; RowKeyRanges rowkey_ranges; RSOperatorPtr filter; IdSetPtr read_packs; @@ -286,7 +283,7 @@ class DMFilePackFilter std::vector handle_res; std::vector use_packs; - Poco::Logger * log; + DB::LoggerPtr log; ReadLimiterPtr read_limiter; }; diff --git 
a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 152a440c3b2..f0ccccb1620 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -15,8 +15,11 @@ #include #include #include +#include #include #include +#include +#include #include #include #include @@ -41,7 +44,7 @@ DMFileReader::Stream::Stream( const String & file_name_base, size_t aio_threshold, size_t max_read_buffer_size, - Poco::Logger * log, + const DB::LoggerPtr & log, const ReadLimiterPtr & read_limiter) : single_file_mode(reader.single_file_mode) , avg_size_hint(reader.dmfile->getColumnStat(col_id).avg_size) @@ -111,11 +114,12 @@ DMFileReader::Stream::Stream( size_t buffer_size = 0; size_t estimated_size = 0; + const auto & use_packs = reader.pack_filter.getUsePacks(); if (reader.single_file_mode) { for (size_t i = 0; i < packs; i++) { - if (!reader.use_packs[i]) + if (!use_packs[i]) { continue; } @@ -127,7 +131,7 @@ DMFileReader::Stream::Stream( { for (size_t i = 0; i < packs;) { - if (!reader.use_packs[i]) + if (!use_packs[i]) { ++i; continue; @@ -135,7 +139,7 @@ DMFileReader::Stream::Stream( size_t cur_offset_in_file = getOffsetInFile(i); size_t end = i + 1; // First find the end of current available range. - while (end < packs && reader.use_packs[end]) + while (end < packs && use_packs[end]) ++end; // Second If the end of range is inside the block, we will need to read it too. 
@@ -199,17 +203,14 @@ DMFileReader::Stream::Stream( DMFileReader::DMFileReader( const DMFilePtr & dmfile_, const ColumnDefines & read_columns_, + bool is_common_handle_, // clean read bool enable_clean_read_, UInt64 max_read_version_, // filters - const RowKeyRanges & rowkey_ranges_, - const RSOperatorPtr & filter_, - const IdSetPtr & read_packs_, + DMFilePackFilter && pack_filter_, // caches - UInt64 hash_salt_, const MarkCachePtr & mark_cache_, - const MinMaxIndexCachePtr & index_cache_, bool enable_column_cache_, const ColumnCachePtr & column_cache_, size_t aio_threshold, @@ -217,38 +218,24 @@ DMFileReader::DMFileReader( const FileProviderPtr & file_provider_, const ReadLimiterPtr & read_limiter, size_t rows_threshold_per_read_, - bool read_one_pack_every_time_) + bool read_one_pack_every_time_, + const DB::LoggerPtr & tracing_logger) : dmfile(dmfile_) , read_columns(read_columns_) + , is_common_handle(is_common_handle_) + , read_one_pack_every_time(read_one_pack_every_time_) + , single_file_mode(dmfile_->isSingleFileMode()) , enable_clean_read(enable_clean_read_) , max_read_version(max_read_version_) - , pack_filter(dmfile_, index_cache_, hash_salt_, rowkey_ranges_, filter_, read_packs_, file_provider_, read_limiter) - , handle_res(pack_filter.getHandleRes()) - , use_packs(pack_filter.getUsePacks()) + , pack_filter(std::move(pack_filter_)) , skip_packs_by_column(read_columns.size(), 0) - , hash_salt(hash_salt_) , mark_cache(mark_cache_) , enable_column_cache(enable_column_cache_ && column_cache_) , column_cache(column_cache_) , rows_threshold_per_read(rows_threshold_per_read_) , file_provider(file_provider_) - , read_one_pack_every_time(read_one_pack_every_time_) - , single_file_mode(dmfile_->isSingleFileMode()) - , log(&Poco::Logger::get("DMFileReader")) + , log(tracing_logger ? 
tracing_logger : DB::Logger::get("DMFileReader")) { - if (dmfile->getStatus() != DMFile::Status::READABLE) - throw Exception("DMFile [" + DB::toString(dmfile->fileId()) - + "] is expected to be in READABLE status, but: " + DMFile::statusString(dmfile->getStatus())); - - // if `rowkey_ranges` is empty, we unconditionally read all packs - // `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode. - // It is safe to ignore them here. - if (unlikely(rowkey_ranges_.empty() && enable_clean_read_)) - throw Exception("rowkey ranges shouldn't be empty with clean-read enabled", ErrorCodes::LOGICAL_ERROR); - - pack_filter.init(); - is_common_handle = !rowkey_ranges_.empty() && rowkey_ranges_[0].is_common_handle; - for (const auto & cd : read_columns) { // New inserted column, will be filled with default value later @@ -276,12 +263,13 @@ DMFileReader::DMFileReader( bool DMFileReader::shouldSeek(size_t pack_id) { // If current pack is the first one, or we just finished reading the last pack, then no need to seek. - return pack_id != 0 && !use_packs[pack_id - 1]; + return pack_id != 0 && !pack_filter.getUsePacks()[pack_id - 1]; } bool DMFileReader::getSkippedRows(size_t & skip_rows) { skip_rows = 0; + const auto & use_packs = pack_filter.getUsePacks(); const auto & pack_stats = dmfile->getPackStats(); for (; next_pack_id < use_packs.size() && !use_packs[next_pack_id]; ++next_pack_id) { @@ -306,6 +294,7 @@ Block DMFileReader::read() size_t skip_rows; getSkippedRows(skip_rows); + const auto & use_packs = pack_filter.getUsePacks(); if (next_pack_id >= use_packs.size()) return {}; @@ -315,10 +304,11 @@ Block DMFileReader::read() // 0 means no limit size_t read_pack_limit = (single_file_mode || read_one_pack_every_time) ? 
1 : 0; - auto & pack_stats = dmfile->getPackStats(); + const auto & pack_stats = dmfile->getPackStats(); size_t read_rows = 0; size_t not_clean_rows = 0; + const std::vector & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter RSResult expected_handle_res = handle_res[next_pack_id]; for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; ++next_pack_id) { @@ -382,7 +372,7 @@ Block DMFileReader::read() } else if (cd.id == TAG_COLUMN_ID) { - column = cd.type->createColumnConst(read_rows, Field((UInt64)(pack_stats[start_pack_id].first_tag))); + column = cd.type->createColumnConst(read_rows, Field(static_cast(pack_stats[start_pack_id].first_tag))); } res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id}); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index e6109a99721..e844df64cfd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -43,7 +44,7 @@ class DMFileReader const String & file_name_base, size_t aio_threshold, size_t max_read_buffer_size, - Poco::Logger * log, + const DB::LoggerPtr & log, const ReadLimiterPtr & read_limiter); const bool single_file_mode; @@ -69,29 +70,27 @@ class DMFileReader DMFileReader( const DMFilePtr & dmfile_, const ColumnDefines & read_columns_, + bool is_common_handle_, // Only set this param to true when // 1. There is no delta. // 2. You don't need pk, version and delete_tag columns // If you have no idea what it means, then simply set it to false. bool enable_clean_read_, // The the MVCC filter version. Used by clean read check. 
- UInt64 max_data_version_, + UInt64 max_read_version_, // filters - const RowKeyRanges & rowkey_ranges_, - const RSOperatorPtr & filter_, - const IdSetPtr & read_packs_, // filter by pack index + DMFilePackFilter && pack_filter_, // caches - UInt64 hash_salt_, const MarkCachePtr & mark_cache_, - const MinMaxIndexCachePtr & index_cache_, bool enable_column_cache_, const ColumnCachePtr & column_cache_, size_t aio_threshold, size_t max_read_buffer_size, const FileProviderPtr & file_provider_, const ReadLimiterPtr & read_limiter, - size_t rows_threshold_per_read_ = DMFILE_READ_ROWS_THRESHOLD, - bool read_one_pack_every_time_ = false); + size_t rows_threshold_per_read_, + bool read_one_pack_every_time_, + const DB::LoggerPtr & tracing_logger); Block getHeader() const { return toEmptyBlock(read_columns); } @@ -115,6 +114,13 @@ class DMFileReader ColumnDefines read_columns; ColumnStreams column_streams; + const bool is_common_handle; + + // read_one_pack_every_time is used to create info for every pack + const bool read_one_pack_every_time; + + const bool single_file_mode; + /// Clean read optimize // If there is no delta for some packs in stable, we can try to do clean read. 
const bool enable_clean_read; @@ -122,15 +128,10 @@ class DMFileReader /// Filters DMFilePackFilter pack_filter; - const std::vector & handle_res; // alias of handle_res in pack_filter - const std::vector & use_packs; // alias of use_packs in pack_filter - - bool is_common_handle; std::vector skip_packs_by_column; /// Caches - const UInt64 hash_salt; MarkCachePtr mark_cache; const bool enable_column_cache; ColumnCachePtr column_cache; @@ -141,12 +142,7 @@ class DMFileReader FileProviderPtr file_provider; - // read_one_pack_every_time is used to create info for every pack - const bool read_one_pack_every_time; - - const bool single_file_mode; - - Poco::Logger * log; + DB::LoggerPtr log; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 12524d0081b..18404edef00 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -126,7 +127,7 @@ DMFilePtr writeIntoNewDMFile(DMContext & dm_context, // // When the input_stream is not mvcc, we assume the rows in this input_stream is most valid and make it not tend to be gc. 
size_t cur_effective_num_rows = block.rows(); size_t cur_not_clean_rows = 1; - size_t gc_hint_version = UINT64_MAX; + size_t gc_hint_version = std::numeric_limits::max(); if (mvcc_stream) { cur_effective_num_rows = mvcc_stream->getEffectiveNumRows(); @@ -524,7 +525,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, *new_columns_to_read, rowkey_ranges, EMPTY_FILTER, - MAX_UINT64, + std::numeric_limits::max(), expected_block_size, false); @@ -722,22 +723,20 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co if (unlikely(!read_file)) throw Exception("Logical error: failed to find split point"); - DMFileBlockInputStream stream(dm_context.db_context, - MAX_UINT64, - false, - dm_context.hash_salt, - read_file, - {getExtraHandleColumnDefine(is_common_handle)}, - {RowKeyRange::newAll(is_common_handle, rowkey_column_size)}, - EMPTY_FILTER, - stable_snap->getColumnCaches()[file_index], - read_pack); - - stream.readPrefix(); - auto block = stream.read(); + DMFileBlockInputStreamBuilder builder(dm_context.db_context); + auto stream = builder + .setColumnCache(stable_snap->getColumnCaches()[file_index]) + .setReadPacks(read_pack) + .build( + read_file, + /*read_columns=*/{getExtraHandleColumnDefine(is_common_handle)}, + /*rowkey_ranges=*/{RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); + + stream->readPrefix(); + auto block = stream->read(); if (!block) throw Exception("Unexpected empty block"); - stream.readSuffix(); + stream->readSuffix(); RowKeyColumnContainer rowkey_column(block.getByPosition(0).column, is_common_handle); RowKeyValue split_point(rowkey_column.getRowKeyValue(read_row_in_pack)); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 4fb7e72f7c1..664560dd401 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -158,7 +158,7 @@ class Segment : private boost::noncopyable const ColumnDefines & columns_to_read, 
const RowKeyRanges & read_ranges, const RSOperatorPtr & filter = {}, - UInt64 max_version = MAX_UINT64, + UInt64 max_version = std::numeric_limits::max(), size_t expected_block_size = DEFAULT_BLOCK_SIZE); /// Return a stream which is suitable for exporting data. @@ -293,7 +293,7 @@ class Segment : private boost::noncopyable const ColumnDefines & read_columns, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, - UInt64 max_version = MAX_UINT64) const; + UInt64 max_version = std::numeric_limits::max()) const; static ColumnDefinesPtr arrangeReadColumns( const ColumnDefine & handle, @@ -311,7 +311,7 @@ class Segment : private boost::noncopyable const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, - UInt64 max_version = MAX_UINT64); + UInt64 max_version = std::numeric_limits::max()); /// Merge delta & stable, and then take the middle one. std::optional getSplitPointSlow( diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 5a9303bec34..e9d7d788a89 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -14,9 +14,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -47,17 +49,17 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang else { auto index_cache = dm_context->db_context.getGlobalContext().getMinMaxIndexCache(); - auto hash_salt = dm_context->hash_salt; for (const auto & file : files_) { - auto pack_filter = DMFilePackFilter::loadFrom(file, - index_cache, - hash_salt, - {range}, - EMPTY_FILTER, - {}, - dm_context->db_context.getFileProvider(), - dm_context->getReadLimiter()); + auto pack_filter = DMFilePackFilter::loadFrom( + file, + index_cache, + {range}, + EMPTY_FILTER, + {}, + dm_context->db_context.getFileProvider(), + dm_context->getReadLimiter(), + /*tracing_logger*/ 
nullptr); auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes(); rows += file_valid_rows; bytes += file_valid_bytes; @@ -202,18 +204,11 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const // // If we pass `segment_range` instead, // then the returned stream is a `SkippableBlockInputStream` which will complicate the implementation - BlockInputStreamPtr data_stream = std::make_shared(context.db_context, - std::numeric_limits::max(), - false, - context.hash_salt, - file, - read_columns, - RowKeyRanges{rowkey_range}, - nullptr, - nullptr, - IdSetPtr{}, - UINT64_MAX, // because we just read one pack at a time - true); + DMFileBlockInputStreamBuilder builder(context.db_context); + BlockInputStreamPtr data_stream = builder + .setRowsThreshold(std::numeric_limits::max()) // because we just read one pack at a time + .onlyReadOnePackEveryTime() + .build(file, read_columns, RowKeyRanges{rowkey_range}); auto mvcc_stream = std::make_shared>( data_stream, read_columns, @@ -238,14 +233,15 @@ void StableValueSpace::calculateStableProperty(const DMContext & context, const } mvcc_stream->readSuffix(); } - auto pack_filter = DMFilePackFilter::loadFrom(file, - context.db_context.getGlobalContext().getMinMaxIndexCache(), - context.hash_salt, - {rowkey_range}, - EMPTY_FILTER, - {}, - context.db_context.getFileProvider(), - context.getReadLimiter()); + auto pack_filter = DMFilePackFilter::loadFrom( + file, + context.db_context.getGlobalContext().getMinMaxIndexCache(), + {rowkey_range}, + EMPTY_FILTER, + {}, + context.db_context.getFileProvider(), + context.getReadLimiter(), + /*tracing_logger*/ nullptr); const auto & use_packs = pack_filter.getUsePacks(); size_t new_pack_properties_index = 0; bool use_new_pack_properties = pack_properties.property_size() == 0; @@ -334,25 +330,20 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream(const DM for (size_t i = 0; i < stable->files.size(); i++) { - 
streams.push_back(std::make_shared( // - context.db_context, - max_data_version, - enable_clean_read, - context.hash_salt, - stable->files[i], - read_columns, - rowkey_ranges, - filter, - column_caches[i], - IdSetPtr{}, - expected_block_size)); + DMFileBlockInputStreamBuilder builder(context.db_context); + builder + .enableCleanRead(enable_clean_read, max_data_version) + .setRSOperator(filter) + .setColumnCache(column_caches[i]) + .setRowsThreshold(expected_block_size); + streams.push_back(builder.build(stable->files[i], read_columns, rowkey_ranges)); } return std::make_shared(streams); } RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const { - // Avoid unnessary reading IO + // Avoid unnecessary reading IO if (valid_rows == 0 || range.none()) return {0, 0}; @@ -366,14 +357,15 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & // don't refill the cache if the index does not exist. for (auto & f : stable->files) { - auto filter = DMFilePackFilter::loadFrom(f, // - nullptr, - context.hash_salt, - {range}, - RSOperatorPtr{}, - IdSetPtr{}, - context.db_context.getFileProvider(), - context.getReadLimiter()); + auto filter = DMFilePackFilter::loadFrom( + f, + nullptr, + {range}, + RSOperatorPtr{}, + IdSetPtr{}, + context.db_context.getFileProvider(), + context.getReadLimiter(), + /*tracing_logger*/ nullptr); const auto & pack_stats = f->getPackStats(); const auto & use_packs = filter.getUsePacks(); for (size_t i = 0; i < pack_stats.size(); ++i) diff --git a/dbms/src/Storages/DeltaMerge/tests/bank/SimpleLockManager.h b/dbms/src/Storages/DeltaMerge/tests/bank/SimpleLockManager.h index 452bbf96f40..f8315ac108f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/bank/SimpleLockManager.h +++ b/dbms/src/Storages/DeltaMerge/tests/bank/SimpleLockManager.h @@ -76,7 +76,7 @@ class SimpleLockManager { lock_map.emplace(std::piecewise_construct, std::make_tuple(id), std::make_tuple()); 
} - if (isWriteLocked(id, UINT64_MAX)) + if (isWriteLocked(id, std::numeric_limits::max())) { latch.unlock(); return false; @@ -85,7 +85,7 @@ class SimpleLockManager while (true) { latch.lock(); - if (isWriteLocked(id, UINT64_MAX)) + if (isWriteLocked(id, std::numeric_limits::max())) { latch.unlock(); return false; @@ -107,7 +107,7 @@ class SimpleLockManager { std::lock_guard guard{mutex}; auto & locks = lock_map[id]; - size_t index = UINT64_MAX; + size_t index = std::numeric_limits::max(); for (size_t i = 0; i < locks.size(); i++) { if (locks[i].transaction_id == transaction_id && locks[i].type == LockType::READ) @@ -115,7 +115,7 @@ class SimpleLockManager index = i; } } - if (index != UINT64_MAX) + if (index != std::numeric_limits::max()) { locks.erase(locks.begin() + index, locks.begin() + index + 1); } @@ -130,7 +130,7 @@ class SimpleLockManager { std::lock_guard guard{mutex}; auto & locks = lock_map[id]; - size_t index = UINT64_MAX; + size_t index = std::numeric_limits::max(); for (size_t i = 0; i < locks.size(); i++) { if (locks[i].transaction_id == transaction_id && locks[i].type == LockType::WRITE) @@ -138,7 +138,7 @@ class SimpleLockManager index = i; } } - if (index != UINT64_MAX) + if (index != std::numeric_limits::max()) { locks.erase(locks.begin() + index, locks.begin() + index + 1); } diff --git a/dbms/src/Storages/DeltaMerge/tests/bank/main.cpp b/dbms/src/Storages/DeltaMerge/tests/bank/main.cpp index 6c4d9523e8c..115e170c48b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/bank/main.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/bank/main.cpp @@ -168,7 +168,7 @@ void run_bank(UInt64 account, UInt64 initial_balance, UInt64 worker_count, UInt6 verify_thread.join(); std::cout << "Last Verify\n"; - std::cout << proxy.sumBalance(0, end, UINT64_MAX) << std::endl; + std::cout << proxy.sumBalance(0, end, std::numeric_limits::max()) << std::endl; std::cout << "Complete\n"; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp 
b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index c07ee59433b..e6fb6f236f3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -194,17 +195,10 @@ try { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -233,24 +227,17 @@ try ASSERT_EQ(propertys.property_size(), 2); for (int i = 0; i < propertys.property_size(); i++) { - auto & property = propertys.property(i); + const auto & property = propertys.property(i); ASSERT_EQ((size_t)property.num_rows(), (size_t)block_propertys[i].effective_num_rows); ASSERT_EQ((size_t)property.gc_hint_version(), (size_t)block_propertys[i].effective_num_rows); } } { // Test read after restore - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -290,8 +277,8 @@ try EXPECT_FALSE(dm_file->canGC()); DMFile::ListOptions options; options.only_list_can_gc = true; - auto scanIds = DMFile::listAllInPath(file_provider, parent_path, options); - ASSERT_TRUE(scanIds.empty()); + auto scan_ids = DMFile::listAllInPath(file_provider, parent_path, options); + 
ASSERT_TRUE(scan_ids.empty()); { // Write some data and finialize the file @@ -312,24 +299,24 @@ try ASSERT_FALSE(dm_file->canGC()); options.only_list_can_gc = false; // Now the file can be scaned - scanIds = DMFile::listAllInPath(file_provider, parent_path, options); - ASSERT_EQ(scanIds.size(), 1UL); - EXPECT_EQ(*scanIds.begin(), id); + scan_ids = DMFile::listAllInPath(file_provider, parent_path, options); + ASSERT_EQ(scan_ids.size(), 1UL); + EXPECT_EQ(*scan_ids.begin(), id); options.only_list_can_gc = true; - scanIds = DMFile::listAllInPath(file_provider, parent_path, options); - EXPECT_TRUE(scanIds.empty()); + scan_ids = DMFile::listAllInPath(file_provider, parent_path, options); + EXPECT_TRUE(scan_ids.empty()); // After enable GC, the file can be scaned with `can_gc=true` dm_file->enableGC(); ASSERT_TRUE(dm_file->canGC()); options.only_list_can_gc = false; - scanIds = DMFile::listAllInPath(file_provider, parent_path, options); - ASSERT_EQ(scanIds.size(), 1UL); - EXPECT_EQ(*scanIds.begin(), id); + scan_ids = DMFile::listAllInPath(file_provider, parent_path, options); + ASSERT_EQ(scan_ids.size(), 1UL); + EXPECT_EQ(*scan_ids.begin(), id); options.only_list_can_gc = true; - scanIds = DMFile::listAllInPath(file_provider, parent_path, options); - ASSERT_EQ(scanIds.size(), 1UL); - EXPECT_EQ(*scanIds.begin(), id); + scan_ids = DMFile::listAllInPath(file_provider, parent_path, options); + ASSERT_EQ(scan_ids.size(), 1UL); + EXPECT_EQ(*scan_ids.begin(), id); } CATCH @@ -359,17 +346,10 @@ try { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -432,17 
+412,10 @@ try { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -519,17 +492,10 @@ try ranges.emplace_back(HandleRange::newAll()); // full range auto test_read_range = [&](const HandleRange & range) { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::fromHandleRange(range)}, // Filtered by read_range - EMPTY_FILTER, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::fromHandleRange(range)}); // Filtered by read_range Int64 num_rows_read = 0; stream->readPrefix(); @@ -629,17 +595,11 @@ try // Filtered by rough set filter auto filter = toRSFilter(i64_cd, range); // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - filter, // Filtered by rough set filter - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .setRSOperator(filter) // Filtered by rough set filter + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); Int64 num_rows_read = 0; stream->readPrefix(); @@ -730,17 +690,11 @@ try filters.emplace_back(createOr({one_part_filter, createUnsupported("test", "test", false)}), num_rows_write); auto test_read_filter = [&](const 
DM::RSOperatorPtr & filter, const size_t num_rows_should_read) { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - filter, // Filtered by rough set filter - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .setRSOperator(filter) // Filtered by rough set filter + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); Int64 num_rows_read = 0; stream->readPrefix(); @@ -821,17 +775,11 @@ try id_set_ptr = std::make_shared(test_sets[test_index]); // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - EMPTY_FILTER, - column_cache_, - id_set_ptr); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .setReadPacks(id_set_ptr) // filter by pack index + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); Int64 num_rows_read = 0; stream->readPrefix(); @@ -919,17 +867,10 @@ try { // Test Read - auto stream = std::make_unique( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -985,17 +926,10 @@ try { // Test Read - auto stream = std::make_unique( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - 
IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -1053,17 +987,10 @@ try { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -1220,17 +1147,10 @@ try { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{RowKeyRange::newAll(is_common_handle, rowkey_column_size)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{RowKeyRange::newAll(is_common_handle, rowkey_column_size)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -1314,17 +1234,10 @@ try for (const auto & range : ranges) { // Test read - auto stream = std::make_shared( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols, - RowKeyRanges{range.range}, // Filtered by read_range - EMPTY_FILTER, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols, RowKeyRanges{range.range}); // Filtered by read_range Int64 num_rows_read = 0; stream->readPrefix(); @@ -1429,17 +1342,10 @@ try { // Test read with new columns after ddl - auto stream = std::make_unique( // - dbContext(), - 
std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols_after_ddl, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols_after_ddl, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -1522,17 +1428,10 @@ try { // Test read with new columns after ddl - auto stream = std::make_unique( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols_after_ddl, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols_after_ddl, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -1592,17 +1491,10 @@ try { // Test read with new columns after ddl - auto stream = std::make_unique( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols_after_ddl, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols_after_ddl, RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); @@ -1662,17 +1554,10 @@ try { // Test read with new columns after ddl - auto stream = std::make_unique( // - dbContext(), - std::numeric_limits::max(), - false, - dmContext().hash_salt, - dm_file, - *cols_after_ddl, - RowKeyRanges{RowKeyRange::newAll(false, 1)}, - RSOperatorPtr{}, - column_cache_, - IdSetPtr{}); + DMFileBlockInputStreamBuilder builder(dbContext()); + auto stream = builder + .setColumnCache(column_cache_) + .build(dm_file, *cols_after_ddl, 
RowKeyRanges{RowKeyRange::newAll(false, 1)}); size_t num_rows_read = 0; stream->readPrefix(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index 2f201d24a95..bbb8aaf8206 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -124,7 +124,7 @@ bool checkMatch( store->mergeDeltaAll(context); const ColumnDefine & col_to_read = check_pk ? getExtraHandleColumnDefine(is_common_handle) : cd; - auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, MAX_UINT64, filter); + auto streams = store->read(context, context.getSettingsRef(), {col_to_read}, {all_range}, 1, std::numeric_limits::max(), filter); streams[0]->readPrefix(); auto rows = streams[0]->read().rows(); streams[0]->readSuffix(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a77629f72f4..3de6e5f9993 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -370,7 +370,7 @@ try snap, {RowKeyRange::newAll(false, 1)}, {}, - MAX_UINT64, + std::numeric_limits::max(), DEFAULT_BLOCK_SIZE); int num_rows_read = 0; in->readPrefix(); @@ -1769,10 +1769,10 @@ try } { - auto & stable = segment->getStable(); + const auto & stable = segment->getStable(); ASSERT_GT(stable->getDMFiles()[0]->getPacks(), (size_t)1); ASSERT_EQ(stable->getRows(), num_rows_write_every_round * write_round); - // caculate StableProperty + // calculate StableProperty ASSERT_EQ(stable->isStablePropertyCached(), false); auto start = RowKeyValue::fromHandle(0); auto end = RowKeyValue::fromHandle(num_rows_write_every_round); @@ -1780,8 +1780,8 @@ try // calculate the StableProperty for packs in the key range [0, num_rows_write_every_round) stable->calculateStableProperty(dmContext(), 
range, false); ASSERT_EQ(stable->isStablePropertyCached(), true); - auto & property = stable->getStableProperty(); - ASSERT_EQ(property.gc_hint_version, UINT64_MAX); + const auto & property = stable->getStableProperty(); + ASSERT_EQ(property.gc_hint_version, std::numeric_limits::max()); ASSERT_EQ(property.num_versions, num_rows_write_every_round); ASSERT_EQ(property.num_puts, num_rows_write_every_round); ASSERT_EQ(property.num_rows, num_rows_write_every_round); @@ -1831,7 +1831,7 @@ try stable->calculateStableProperty(dmContext(), range, false); ASSERT_EQ(stable->isStablePropertyCached(), true); auto & property = stable->getStableProperty(); - ASSERT_EQ(property.gc_hint_version, UINT64_MAX); + ASSERT_EQ(property.gc_hint_version, std::numeric_limits::max()); ASSERT_EQ(property.num_versions, num_rows_write_every_round); ASSERT_EQ(property.num_puts, num_rows_write_every_round); ASSERT_EQ(property.num_rows, num_rows_write_every_round); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp index 6e3b11d36b9..59052e2e8b2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_version_filter.cpp @@ -216,7 +216,7 @@ TEST(VersionFilter_test, Compact) auto in = genInputStream(blocks, columns, 40, false); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { @@ -229,14 +229,14 @@ TEST(VersionFilter_test, Compact) } ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)1); ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)0); - ASSERT_EQ(gc_hint_version, (size_t)UINT64_MAX); + ASSERT_EQ(gc_hint_version, (size_t)std::numeric_limits::max()); in->readSuffix(); } { auto in = genInputStream(blocks, columns, 30, false); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, 
nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { @@ -256,7 +256,7 @@ TEST(VersionFilter_test, Compact) auto in = genInputStream(blocks, columns, 20, false); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { @@ -276,7 +276,7 @@ TEST(VersionFilter_test, Compact) auto in = genInputStream(blocks, columns, 10, false); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { @@ -319,7 +319,7 @@ TEST(VersionFilter_test, CompactCommonHandle) auto in = genInputStream(blocks, columns, 40, true); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { @@ -332,14 +332,14 @@ TEST(VersionFilter_test, CompactCommonHandle) } ASSERT_EQ(mvcc_stream->getEffectiveNumRows(), (size_t)1); ASSERT_EQ(mvcc_stream->getNotCleanRows(), (size_t)0); - ASSERT_EQ(gc_hint_version, (size_t)UINT64_MAX); + ASSERT_EQ(gc_hint_version, (size_t)std::numeric_limits::max()); in->readSuffix(); } { auto in = genInputStream(blocks, columns, 30, true); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { @@ -359,7 +359,7 @@ TEST(VersionFilter_test, CompactCommonHandle) auto in = genInputStream(blocks, columns, 20, true); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits::max(); in->readPrefix(); while (true) { 
@@ -379,7 +379,7 @@ TEST(VersionFilter_test, CompactCommonHandle) auto in = genInputStream(blocks, columns, 10, true); auto * mvcc_stream = typeid_cast *>(in.get()); ASSERT_NE(mvcc_stream, nullptr); - UInt64 gc_hint_version = UINT64_MAX; + UInt64 gc_hint_version = std::numeric_limits<UInt64>::max(); in->readPrefix(); while (true) { diff --git a/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp b/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp index 6dc150440ac..be6ff1dcbbe 100644 --- a/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp +++ b/dbms/src/Storages/DeltaMerge/tools/workload/DataGenerator.cpp @@ -27,9 +27,8 @@ namespace DB::DM::tests class RandomDataGenerator : public DataGenerator { public: - RandomDataGenerator(const WorkloadOptions & opts_, const TableInfo & table_info_, TimestampGenerator & ts_gen_) - : opts(opts_) - , table_info(table_info_) + RandomDataGenerator(const TableInfo & table_info_, TimestampGenerator & ts_gen_) + : table_info(table_info_) , ts_gen(ts_gen_) , rand_gen(std::random_device()()) {} @@ -244,7 +243,6 @@ class RandomDataGenerator : public DataGenerator return str; } - const WorkloadOptions & opts; const TableInfo & table_info; TimestampGenerator & ts_gen; std::mt19937_64 rand_gen; @@ -254,7 +252,7 @@ class RandomDataGenerator : public DataGenerator std::unique_ptr<DataGenerator> DataGenerator::create([[maybe_unused]] const WorkloadOptions & opts, const TableInfo & table_info, TimestampGenerator & ts_gen) { - return std::make_unique<RandomDataGenerator>(opts, table_info, ts_gen); + return std::make_unique<RandomDataGenerator>(table_info, ts_gen); } -} // namespace DB::DM::tests \ No newline at end of file +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp b/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp index f901d75f86e..bd3258b6e07 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp +++ b/dbms/src/Storages/Page/V3/LogFile/LogReader.cpp @@ -37,8 +37,7 @@ LogReader::LogReader( Reporter * reporter_, bool
verify_checksum_, Format::LogNumberType log_num_, - WALRecoveryMode recovery_mode_, - Poco::Logger * log_) + WALRecoveryMode recovery_mode_) : verify_checksum(verify_checksum_) , recycled(false) , is_last_block(false) @@ -51,7 +50,6 @@ LogReader::LogReader( , reporter(reporter_) , end_of_buffer_offset(0) , log_number(log_num_) - , log(log_) { // Must be `BLOCK_SIZE`, or we can not ensure the correctness of reading. assert(file->internalBuffer().size() == Format::BLOCK_SIZE); diff --git a/dbms/src/Storages/Page/V3/LogFile/LogReader.h b/dbms/src/Storages/Page/V3/LogFile/LogReader.h index d5e764ea3f7..6ba98da6eb3 100644 --- a/dbms/src/Storages/Page/V3/LogFile/LogReader.h +++ b/dbms/src/Storages/Page/V3/LogFile/LogReader.h @@ -49,8 +49,7 @@ class LogReader Reporter * reporter_, bool verify_checksum_, Format::LogNumberType log_num_, - WALRecoveryMode recovery_mode_, - Poco::Logger * log_); + WALRecoveryMode recovery_mode_); LogReader(const LogReader &) = delete; LogReader & operator=(const LogReader &) = delete; @@ -127,8 +126,6 @@ class LogReader UInt64 end_of_buffer_offset; // which log number it is const Format::LogNumberType log_number; - - Poco::Logger * log; }; } // namespace PS::V3 diff --git a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp index bfb9b7617dc..d6f27ba66b7 100644 --- a/dbms/src/Storages/Page/V3/WAL/WALReader.cpp +++ b/dbms/src/Storages/Page/V3/WAL/WALReader.cpp @@ -206,8 +206,7 @@ bool WALStoreReader::openNextFile() &reporter, /*verify_checksum*/ true, log_num, - WALRecoveryMode::TolerateCorruptedTailRecords, - logger); + WALRecoveryMode::TolerateCorruptedTailRecords); }; if (!checkpoint_read_done) diff --git a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp index 70b5bb5ce89..fff43f2681f 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_wal_log.cpp @@ -167,8 +167,7 @@ class LogFileRWTest : public 
::testing::TestWithParam> &report, /* verify_checksum */ true, /* log_number */ log_num, - wal_recovery_mode, - log); + wal_recovery_mode); } void resetReader(const WALRecoveryMode wal_recovery_mode = WALRecoveryMode::TolerateCorruptedTailRecords) diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 93d20baaec7..299f24ba2b8 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -808,7 +808,7 @@ size_t getRows(DM::DeltaMergeStorePtr & store, const Context & context, const DM to_read, {range}, 1, - MAX_UINT64, + std::numeric_limits<UInt64>::max(), EMPTY_FILTER)[0]; stream->readPrefix(); Block block; @@ -831,7 +831,7 @@ DM::RowKeyRange getRange(DM::DeltaMergeStorePtr & store, const Context & context to_read, {DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, 1, - MAX_UINT64, + std::numeric_limits<UInt64>::max(), EMPTY_FILTER)[0]; stream->readPrefix(); Block block;