From 1d1e04f3da9d959d011ae0668aea6f59a6063ea6 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Wed, 20 Mar 2024 11:26:06 +0800 Subject: [PATCH] Storage: Refactor DMFileReader Signed-off-by: Lloyd-Pottiger --- .../Storages/DeltaMerge/File/DMFileReader.cpp | 219 ++++++------------ .../Storages/DeltaMerge/File/DMFileReader.h | 8 +- 2 files changed, 69 insertions(+), 158 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index e9903d103c7..7c44386ddf5 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -31,6 +31,7 @@ #include #include + namespace CurrentMetrics { extern const Metric OpenFileForRead; @@ -78,7 +79,7 @@ DMFileReader::DMFileReader( , enable_column_cache(enable_column_cache_ && column_cache_) , max_read_version(max_read_version_) , pack_filter(std::move(pack_filter_)) - , skip_packs_by_column(read_columns.size(), 0) + , must_skip_next_read(read_columns.size(), false) , mark_cache(mark_cache_) , column_cache(column_cache_) , scan_context(scan_context_) @@ -111,10 +112,6 @@ DMFileReader::DMFileReader( if (max_sharing_column_count > 0) { col_data_cache = std::make_unique(path(), read_columns, max_sharing_column_count, log); - for (const auto & cd : read_columns) - { - last_read_from_cache[cd.id] = false; - } } } @@ -141,43 +138,14 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows) size_t DMFileReader::skipNextBlock() { - // Go to next available pack. - size_t skip; - if (!getSkippedRows(skip)) - return 0; - - // Find the next contiguous packs will be read in next read, - // let next_pack_id point to the next pack of the contiguous packs. - // For example, if we have 10 packs, use_packs is [0, 1, 1, 0, 1, 1, 0, 0, 1, 1], - // and now next_pack_id is 1, then we will skip 2 packs(index 1 and 2), and next_pack_id will be 3. - const size_t read_pack_limit = read_one_pack_every_time ? 1 : 0; - const std::vector & handle_res = pack_filter.getHandleRes(); - const RSResult expected_handle_res = handle_res[next_pack_id]; - auto & use_packs = pack_filter.getUsePacks(); - size_t start_pack_id = next_pack_id; - const auto & pack_stats = dmfile->getPackStats(); - size_t read_rows = 0; - for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; - ++next_pack_id) - { - if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit) - break; - if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res) - break; - - read_rows += pack_stats[next_pack_id].rows; - } - + // Only move forward the next_pack_id and next_row_offset, but don't read any data. + auto [read_rows, cr_on_normal_mode, cr_on_handle_on_fast_mode, cr_on_del_on_fast_mode] = getReadRows(); + (void)cr_on_normal_mode, (void)cr_on_handle_on_fast_mode, (void)cr_on_del_on_fast_mode; addSkippedRows(read_rows); next_row_offset += read_rows; - // When we read dmfile, if the previous pack is not read, // then we should seek to the right offset of dmfile. - // So if skip some packs successfully, - // then we set the last pack to false to indicate that we should seek before read. - if (likely(read_rows > 0)) - use_packs[next_pack_id - 1] = false; - + std::fill(must_skip_next_read.begin(), must_skip_next_read.end(), true); scan_context->late_materialization_skip_rows += read_rows; return read_rows; } @@ -294,119 +262,90 @@ bool DMFileReader::isCacheableColumn(const ColumnDefine & cd) return cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == VERSION_COLUMN_ID; } -Block DMFileReader::read() +// +std::tuple DMFileReader::getReadRows() { - Stopwatch watch; - SCOPE_EXIT(scan_context->total_dmfile_read_time_ns += watch.elapsed();); - // Go to next available pack. size_t skip_rows; - getSkippedRows(skip_rows); - const auto & use_packs = pack_filter.getUsePacksConst(); + const auto & use_packs = pack_filter.getUsePacksConst(); if (next_pack_id >= use_packs.size()) - return {}; + return {0, false, false, false}; + // Find max continuing rows we can read. size_t start_pack_id = next_pack_id; - size_t start_row_offset = next_row_offset; // When read_one_pack_every_time is true, we can just read one pack every time. - // 0 means no limit - size_t read_pack_limit = read_one_pack_every_time ? 1 : 0; + // std::numeric_limits::max() means no limit + size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits::max(); const auto & pack_stats = dmfile->getPackStats(); - const auto & pack_properties = dmfile->getPackProperties(); size_t read_rows = 0; - size_t not_clean_rows = 0; - size_t deleted_rows = 0; + // Clean read optimize + bool do_clean_read_on_normal_mode = enable_handle_clean_read && !is_fast_scan; + bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan; + bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan; - const std::vector & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter + const auto & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter RSResult expected_handle_res = handle_res[next_pack_id]; + do_clean_read_on_normal_mode = do_clean_read_on_normal_mode && (expected_handle_res == All); + do_clean_read_on_handle_on_fast_mode = do_clean_read_on_handle_on_fast_mode && (expected_handle_res == All); for (; next_pack_id < use_packs.size() && use_packs[next_pack_id] && read_rows < rows_threshold_per_read; ++next_pack_id) { - if (read_pack_limit != 0 && next_pack_id - start_pack_id >= read_pack_limit) + if (next_pack_id - start_pack_id >= read_pack_limit) break; - if (enable_handle_clean_read && handle_res[next_pack_id] != expected_handle_res) + // If try to do clean read on handle column, then handle_res of all read packs should be All. + if ((do_clean_read_on_normal_mode || do_clean_read_on_handle_on_fast_mode) && handle_res[next_pack_id] != All) break; read_rows += pack_stats[next_pack_id].rows; - not_clean_rows += pack_stats[next_pack_id].not_clean; + do_clean_read_on_normal_mode = do_clean_read_on_normal_mode && (pack_stats[next_pack_id].not_clean == 0) + && (pack_filter.getMaxVersion(next_pack_id) <= max_read_version); // Because deleted_rows is a new field in pack_properties, we need to check whehter this pack has this field. // If this pack doesn't have this field, then we can't know whether this pack contains deleted rows. - // Thus we just deleted_rows += 1, to make sure we will not do the optimization with del column(just to make deleted_rows != 0). - if (static_cast(pack_properties.property_size()) > next_pack_id - && pack_properties.property(next_pack_id).has_deleted_rows()) - { - deleted_rows += pack_properties.property(next_pack_id).deleted_rows(); - } - else - { - deleted_rows += 1; - } + // So only when this pack has this field and deleted_rows is 0, we can do clean read on del column. + do_clean_read_on_del_on_fast_mode = do_clean_read_on_del_on_fast_mode + && (static_cast(pack_properties.property_size()) > next_pack_id) + && (pack_properties.property(next_pack_id).has_deleted_rows()) + && (pack_properties.property(next_pack_id).deleted_rows() == 0); } - next_row_offset += read_rows; + return { + read_rows, + do_clean_read_on_normal_mode, + do_clean_read_on_handle_on_fast_mode, + do_clean_read_on_del_on_fast_mode, + }; +} + +Block DMFileReader::read() +{ + Stopwatch watch; + SCOPE_EXIT(scan_context->total_dmfile_read_time_ns += watch.elapsed();); + + size_t start_pack_id = next_pack_id; + auto [read_rows, cr_on_normal_mode, cr_on_handle_on_fast_mode, cr_on_del_on_fast_mode] = getReadRows(); if (read_rows == 0) return {}; Block res; - res.setStartOffset(start_row_offset); + res.setStartOffset(next_row_offset); + next_row_offset += read_rows; size_t read_packs = next_pack_id - start_pack_id; - addScannedRows(read_rows); - // TODO: this will need better algorithm: we should separate those packs which can and can not do clean read. - bool do_clean_read_on_normal_mode - = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_scan); - - bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan && expected_handle_res == All; - bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan && deleted_rows == 0; - - if (do_clean_read_on_normal_mode) - { - UInt64 max_version = 0; - for (size_t pack_id = start_pack_id; pack_id < next_pack_id; ++pack_id) - max_version = std::max(pack_filter.getMaxVersion(pack_id), max_version); - do_clean_read_on_normal_mode = max_version <= max_read_version; - } - + const auto & pack_stats = dmfile->getPackStats(); for (size_t i = 0; i < read_columns.size(); ++i) { try { // For clean read of column pk, version, tag, instead of loading data from disk, just create placeholder column is OK. const auto & cd = read_columns[i]; - if (cd.id == EXTRA_HANDLE_COLUMN_ID && do_clean_read_on_handle_on_fast_mode) - { - // Return the first row's handle - ColumnPtr column; - if (is_common_handle) - { - StringRef min_handle = pack_filter.getMinStringHandle(start_pack_id); - column = cd.type->createColumnConst(read_rows, Field(min_handle.data, min_handle.size)); - } - else - { - Handle min_handle = pack_filter.getMinHandle(start_pack_id); - column = cd.type->createColumnConst(read_rows, Field(min_handle)); - } - res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); - skip_packs_by_column[i] = read_packs; - } - else if (cd.id == TAG_COLUMN_ID && do_clean_read_on_del_on_fast_mode) - { - ColumnPtr column = cd.type->createColumnConst( - read_rows, - Field(static_cast(pack_stats[start_pack_id].first_tag))); - res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); - - skip_packs_by_column[i] = read_packs; - } - else if (do_clean_read_on_normal_mode && isExtraColumn(cd)) + if ((cr_on_normal_mode || cr_on_handle_on_fast_mode || cr_on_del_on_fast_mode) && isExtraColumn(cd)) { ColumnPtr column; if (cd.id == EXTRA_HANDLE_COLUMN_ID) @@ -435,21 +374,18 @@ Block DMFileReader::read() } res.insert(ColumnWithTypeAndName{column, cd.type, cd.name, cd.id}); - - skip_packs_by_column[i] = read_packs; + must_skip_next_read[i] = true; } else { const auto stream_name = DMFile::getFileNameBase(cd.id); if (auto iter = column_streams.find(stream_name); iter != column_streams.end()) { + auto data_type = dmfile->getColumnStat(cd.id).type; + ColumnPtr column; if (enable_column_cache && isCacheableColumn(cd)) { auto read_strategy = column_cache->getReadStrategy(start_pack_id, read_packs, cd.id); - - auto data_type = dmfile->getColumnStat(cd.id).type; - auto column = data_type->createColumn(); - column->reserve(read_rows); for (auto & [range, strategy] : read_strategy) { if (strategy == ColumnCache::Strategy::Memory) @@ -457,12 +393,9 @@ Block DMFileReader::read() for (size_t cursor = range.first; cursor < range.second; cursor++) { auto cache_element = column_cache->getColumn(cursor, cd.id); - column->insertRangeFrom( - *(cache_element.first), - cache_element.second.first, - cache_element.second.second); + column = cache_element.first; } - skip_packs_by_column[i] += (range.second - range.first); + must_skip_next_read[i] = true; } else if (strategy == ColumnCache::Strategy::Disk) { @@ -471,53 +404,34 @@ Block DMFileReader::read() { rows_count += pack_stats[cursor].rows; } - ColumnPtr col; - readColumn( - cd, - col, - range.first, - range.second - range.first, - rows_count, - skip_packs_by_column[i]); - column->insertRangeFrom(*col, 0, col->size()); - skip_packs_by_column[i] = 0; + readColumn(cd, column, range.first, range.second - range.first, rows_count, i); } else { throw Exception("Unknown strategy", ErrorCodes::LOGICAL_ERROR); } } - ColumnPtr result_column = std::move(column); size_t rows_offset = 0; for (size_t cursor = start_pack_id; cursor < start_pack_id + read_packs; cursor++) { - column_cache - ->tryPutColumn(cursor, cd.id, result_column, rows_offset, pack_stats[cursor].rows); + column_cache->tryPutColumn(cursor, cd.id, column, rows_offset, pack_stats[cursor].rows); rows_offset += pack_stats[cursor].rows; } - // Cast column's data from DataType in disk to what we need now - auto converted_column - = convertColumnByColumnDefineIfNeed(data_type, std::move(result_column), cd); - res.insert(ColumnWithTypeAndName{converted_column, cd.type, cd.name, cd.id}); } else { - auto data_type = dmfile->getColumnStat(cd.id).type; - ColumnPtr column; - readColumn(cd, column, start_pack_id, read_packs, read_rows, skip_packs_by_column[i]); - auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); - - res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id}); - skip_packs_by_column[i] = 0; + readColumn(cd, column, start_pack_id, read_packs, read_rows, i); } + // Cast column's data from DataType in disk to what we need now + auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); + res.insert(ColumnWithTypeAndName{std::move(converted_column), cd.type, cd.name, cd.id}); } else { // New column after ddl is not exist in this DMFile, fill with default value ColumnPtr column = createColumnWithDefaultValue(cd, read_rows); - res.insert(ColumnWithTypeAndName{std::move(column), cd.type, cd.name, cd.id}); - skip_packs_by_column[i] = 0; + must_skip_next_read[i] = false; } } } @@ -535,7 +449,6 @@ void DMFileReader::readFromDisk( MutableColumnPtr & column, size_t start_pack_id, size_t read_rows, - size_t skip_packs, bool force_seek) { const auto stream_name = DMFile::getFileNameBase(column_define.id); @@ -549,7 +462,7 @@ void DMFileReader::readFromDisk( #endif auto & top_stream = iter->second; - bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0; + bool should_seek = force_seek || shouldSeek(start_pack_id); auto data_type = dmfile->getColumnStat(column_define.id).type; data_type->deserializeBinaryBulkWithMultipleStreams( // *column, @@ -578,7 +491,7 @@ void DMFileReader::readColumn( size_t start_pack_id, size_t pack_count, size_t read_rows, - size_t skip_packs) + size_t column_index) { bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column)) @@ -591,13 +504,13 @@ void DMFileReader::readColumn( = has_concurrent_reader ? std::make_optional(true, nullptr) : std::nullopt; auto data_type = dmfile->getColumnStat(column_define.id).type; auto col = data_type->createColumn(); - readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]); + readFromDisk(column_define, col, start_pack_id, read_rows, must_skip_next_read[column_index]); column = std::move(col); - last_read_from_cache[column_define.id] = false; + must_skip_next_read[column_index] = false; } else { - last_read_from_cache[column_define.id] = true; + must_skip_next_read[column_index] = true; } if (has_concurrent_reader && col_data_cache != nullptr) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 34bed3eaea5..2b249261bb9 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -96,13 +96,13 @@ class DMFileReader private: bool shouldSeek(size_t pack_id) const; + std::tuple getReadRows(); void readFromDisk( const ColumnDefine & column_define, MutableColumnPtr & column, size_t start_pack_id, size_t read_rows, - size_t skip_packs, bool force_seek); void readColumn( const ColumnDefine & column_define, @@ -110,7 +110,7 @@ class DMFileReader size_t start_pack_id, size_t pack_count, size_t read_rows, - size_t skip_packs); + size_t column_index); bool getCachedPacks(ColId col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col) const; void addScannedRows(UInt64 rows); @@ -141,7 +141,7 @@ class DMFileReader /// Filters DMFilePackFilter pack_filter; - std::vector skip_packs_by_column{}; + std::vector must_skip_next_read; /// Caches MarkCachePtr mark_cache; @@ -156,12 +156,10 @@ class DMFileReader size_t next_row_offset = 0; FileProviderPtr file_provider; - LoggerPtr log; // DataSharing std::unique_ptr col_data_cache{}; - std::unordered_map last_read_from_cache{}; }; } // namespace DB::DM