diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index eef01dbfc55..33a0ee94745 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -76,6 +76,7 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy) add_headers_and_sources(dbms src/Columns) add_headers_and_sources(dbms src/Storages) add_headers_and_sources(dbms src/Storages/DeltaMerge) +add_headers_and_sources(dbms src/Storages/DeltaMerge/BitmapFilter) add_headers_and_sources(dbms src/Storages/DeltaMerge/Index) add_headers_and_sources(dbms src/Storages/DeltaMerge/Filter) add_headers_and_sources(dbms src/Storages/DeltaMerge/FilterParser) diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp index 4073f80f2db..06412649922 100644 --- a/dbms/src/Common/CurrentMetrics.cpp +++ b/dbms/src/Common/CurrentMetrics.cpp @@ -55,6 +55,7 @@ M(DT_SnapshotOfDeltaMerge) \ M(DT_SnapshotOfDeltaCompact) \ M(DT_SnapshotOfPlaceIndex) \ + M(DT_SnapshotOfBitmapFilter) \ M(IOLimiterPendingBgWriteReq) \ M(IOLimiterPendingFgWriteReq) \ M(IOLimiterPendingBgReadReq) \ diff --git a/dbms/src/Core/Block.cpp b/dbms/src/Core/Block.cpp index a2501e8c609..6f6ce974137 100644 --- a/dbms/src/Core/Block.cpp +++ b/dbms/src/Core/Block.cpp @@ -269,7 +269,7 @@ size_t Block::rows() const if (elem.column) return elem.column->size(); - return 0; + return segment_row_id_col != nullptr ? segment_row_id_col->size() : 0; } diff --git a/dbms/src/Core/Block.h b/dbms/src/Core/Block.h index f7c3a786c8d..e9b8619fcb1 100644 --- a/dbms/src/Core/Block.h +++ b/dbms/src/Core/Block.h @@ -45,6 +45,13 @@ class Block Container data; IndexByName index_by_name; + // `start_offset` is the offset of the first row in this Block. + // It is used for calculating `segment_row_id`. + UInt64 start_offset = 0; + // `segment_row_id_col` is a virtual column that represents the records' row id in the corresponding segment. + // Only used for calculating MVCC-bitmap-filter. 
+ ColumnPtr segment_row_id_col; + public: BlockInfo info; @@ -110,8 +117,8 @@ class Block /// Approximate number of allocated bytes in memory - for profiling and limits. size_t allocatedBytes() const; - explicit operator bool() const { return !data.empty(); } - bool operator!() const { return data.empty(); } + explicit operator bool() const { return !data.empty() || segment_row_id_col != nullptr; } + bool operator!() const { return data.empty() && segment_row_id_col == nullptr; } /** Get a list of column names separated by commas. */ std::string dumpNames() const; @@ -149,6 +156,10 @@ class Block */ void updateHash(SipHash & hash) const; + void setStartOffset(UInt64 offset) { start_offset = offset; } + UInt64 startOffset() const { return start_offset; } + void setSegmentRowIdCol(ColumnPtr && col) { segment_row_id_col = std::move(col); } + ColumnPtr segmentRowIdCol() const { return segment_row_id_col; } private: void eraseImpl(size_t position); diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index be64319f516..050bdacda7f 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -249,6 +249,7 @@ struct Settings \ M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \ M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \ + M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \ \ M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \ M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \ diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp new file mode 100644 index 00000000000..eb76380fd6c --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp @@ -0,0 +1,120 @@ +// 
Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +namespace DB::DM +{ +BitmapFilter::BitmapFilter(UInt32 size_, bool default_value) + : filter(size_, default_value) + , all_match(default_value) +{} + +void BitmapFilter::set(BlockInputStreamPtr & stream) +{ + stream->readPrefix(); + for (;;) + { + FilterPtr f = nullptr; + auto blk = stream->read(f, /*return_filter*/ true); + if (likely(blk)) + { + set(blk.segmentRowIdCol(), f); + } + else + { + break; + } + } + stream->readSuffix(); +} + +void BitmapFilter::set(const ColumnPtr & col, const FilterPtr & f) +{ + const auto * v = toColumnVectorDataPtr<UInt32>(col); + set(v->data(), v->size(), f); +} + +void BitmapFilter::set(const UInt32 * data, UInt32 size, const FilterPtr & f) +{ + if (size == 0) + { + return; + } + if (!f) + { + for (UInt32 i = 0; i < size; i++) + { + UInt32 row_id = *(data + i); + filter[row_id] = true; + } + } + else + { + RUNTIME_CHECK(size == f->size(), size, f->size()); + for (UInt32 i = 0; i < size; i++) + { + UInt32 row_id = *(data + i); + filter[row_id] = (*f)[i]; + } + } +} + +void BitmapFilter::set(UInt32 start, UInt32 limit) +{ + RUNTIME_CHECK(static_cast<size_t>(start) + limit <= filter.size(), start, limit, filter.size()); + std::fill(filter.begin() + start, filter.begin() + start + limit, true); +} + +bool BitmapFilter::get(IColumn::Filter & f, UInt32 start, UInt32 limit) const +{ + RUNTIME_CHECK(static_cast<size_t>(start) + limit <= filter.size(), start, limit, 
filter.size()); + auto begin = filter.cbegin() + start; + auto end = filter.cbegin() + start + limit; + if (all_match || std::find(begin, end, false) == end) + { + return true; + } + else + { + std::copy(begin, end, f.begin()); + return false; + } +} + +void BitmapFilter::runOptimize() +{ + all_match = std::find(filter.begin(), filter.end(), false) == filter.end(); +} + +String BitmapFilter::toDebugString() const +{ + String s(filter.size(), '1'); + for (UInt32 i = 0; i < filter.size(); i++) + { + if (!filter[i]) + { + s[i] = '0'; + } + } + return s; +} + +size_t BitmapFilter::count() const +{ + return std::count(filter.cbegin(), filter.cend(), true); +} +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h new file mode 100644 index 00000000000..540759ade4a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.h @@ -0,0 +1,46 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include + +namespace DB::DM +{ + +class BitmapFilter +{ +public: + BitmapFilter(UInt32 size_, bool default_value); + + void set(BlockInputStreamPtr & stream); + void set(const ColumnPtr & col, const FilterPtr & f); + void set(const UInt32 * data, UInt32 size, const FilterPtr & f); + void set(UInt32 start, UInt32 limit); + // Returns true if every row in [start, start + limit) matches; in that case `f` is left unmodified. Otherwise copies that range into `f` and returns false. + bool get(IColumn::Filter & f, UInt32 start, UInt32 limit) const; + + void runOptimize(); // Recomputes the cached `all_match` flag from the current bitmap contents. + + String toDebugString() const; + size_t count() const; + +private: + std::vector<bool> filter; + bool all_match; +}; + +using BitmapFilterPtr = std::shared_ptr<BitmapFilter>; +} // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp new file mode 100644 index 00000000000..2814e14b03e --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.cpp @@ -0,0 +1,99 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include + +namespace DB::DM +{ +BitmapFilterBlockInputStream::BitmapFilterBlockInputStream( + const ColumnDefines & columns_to_read, + BlockInputStreamPtr stable_, + BlockInputStreamPtr delta_, + size_t stable_rows_, + const BitmapFilterPtr & bitmap_filter_, + const String & req_id_) + : header(toEmptyBlock(columns_to_read)) + , stable(std::move(stable_)) + , delta(std::move(delta_)) + , stable_rows(stable_rows_) + , bitmap_filter(bitmap_filter_) + , log(Logger::get(NAME, req_id_)) +{} + +Block BitmapFilterBlockInputStream::readImpl(FilterPtr & res_filter, bool return_filter) +{ + auto [block, from_delta] = readBlock(); + if (block) + { + if (from_delta) + { + block.setStartOffset(block.startOffset() + stable_rows); + } + + filter.resize(block.rows()); + bool all_match = bitmap_filter->get(filter, block.startOffset(), block.rows()); + if (!all_match) + { + if (return_filter) + { + res_filter = &filter; + } + else + { + for (auto & col : block) + { + col.column = col.column->filter(filter, block.rows()); + } + } + } + else + { + res_filter = nullptr; + } + } + return block; +} + +// Returns {block, from_delta}: reads from `stable` until it is exhausted, then from `delta`. +std::pair<Block, bool> BitmapFilterBlockInputStream::readBlock() +{ + if (stable == nullptr && delta == nullptr) + { + return {{}, false}; + } + + if (stable == nullptr) + { + return {delta->read(), true}; + } + + auto block = stable->read(); + if (block) + { + return {block, false}; + } + else + { + stable = nullptr; + if (delta != nullptr) + { + block = delta->read(); + } + return {block, true}; + } +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h new file mode 100644 index 00000000000..fc8125e70b1 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterBlockInputStream.h @@ -0,0 +1,64 @@ +// Copyright 2022 PingCAP, Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB::DM +{ +class BitmapFilterBlockInputStream : public IProfilingBlockInputStream +{ + static constexpr auto NAME = "BitmapFilterBlockInputStream"; + +public: + BitmapFilterBlockInputStream( + const ColumnDefines & columns_to_read, + BlockInputStreamPtr stable_, + BlockInputStreamPtr delta_, + size_t stable_rows_, + const BitmapFilterPtr & bitmap_filter_, + const String & req_id_); + + String getName() const override { return NAME; } + + Block getHeader() const override { return header; } + +protected: + Block readImpl() override + { + FilterPtr filter_ignored = nullptr; + return readImpl(filter_ignored, false); + } + // When no row in the block is filtered out, + // `res_filter` will be set to null. + // The caller needs to handle this situation. 
+ Block readImpl(FilterPtr & res_filter, bool return_filter) override; + +private: + std::pair<Block, bool> readBlock(); + + Block header; + BlockInputStreamPtr stable; + BlockInputStreamPtr delta; + size_t stable_rows; + BitmapFilterPtr bitmap_filter; + const LoggerPtr log; + IColumn::Filter filter{}; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp index 0daf8cb4659..2530a727a5e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp @@ -29,7 +29,7 @@ namespace DM /// ====================================================== /// Helper methods. /// ====================================================== -size_t copyColumnsData( +std::pair<size_t, size_t> copyColumnsData( const Columns & from, const ColumnPtr & pk_col, MutableColumns & to, @@ -46,11 +46,11 @@ size_t copyColumnsData( { for (size_t col_index = 0; col_index < to.size(); ++col_index) to[col_index]->insertFrom(*from[col_index], rows_offset); - return 1; + return {rows_offset, 1}; } else { - return 0; + return {rows_offset, 0}; } } else @@ -58,7 +58,7 @@ size_t copyColumnsData( auto [actual_offset, actual_limit] = RowKeyFilter::getPosRangeOfSorted(*range, pk_col, rows_offset, rows_limit); for (size_t col_index = 0; col_index < to.size(); ++col_index) to[col_index]->insertRangeFrom(*from[col_index], actual_offset, actual_limit); - return actual_limit; + return {actual_offset, actual_limit}; } } else @@ -73,7 +73,7 @@ size_t copyColumnsData( for (size_t col_index = 0; col_index < to.size(); ++col_index) to[col_index]->insertRangeFrom(*from[col_index], rows_offset, rows_limit); } - return rows_limit; + return {rows_offset, rows_limit}; } } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h index fee160dbb01..cb8f007756e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h +++ 
b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h @@ -151,7 +151,8 @@ class ColumnFileReader /// Read data from this reader and store the result into output_cols. /// Note that if "range" is specified, then the caller must guarantee that the rows between [rows_offset, rows_offset + rows_limit) are sorted. - virtual size_t readRows(MutableColumns & /*output_cols*/, size_t /*rows_offset*/, size_t /*rows_limit*/, const RowKeyRange * /*range*/) + /// Returns <actual_offset, actual_limit>: the offset of the first row actually read and the number of rows actually read. + virtual std::pair<size_t, size_t> readRows(MutableColumns & /*output_cols*/, size_t /*rows_offset*/, size_t /*rows_limit*/, const RowKeyRange * /*range*/) { throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR); } @@ -163,7 +164,7 @@ class ColumnFileReader virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0; }; -size_t copyColumnsData( +std::pair<size_t, size_t> copyColumnsData( const Columns & from, const ColumnPtr & pk_col, MutableColumns & to, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index e82a061e94b..9cfbafb1ad4 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -111,7 +111,7 @@ void ColumnFileBigReader::initStream() } } -size_t ColumnFileBigReader::readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) +std::pair<size_t, size_t> ColumnFileBigReader::readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) { if (unlikely(rows_offset + rows_limit > column_file.valid_rows)) throw Exception("Try to read more rows", ErrorCodes::LOGICAL_ERROR); @@ -121,7 +121,7 @@ size_t ColumnFileBigReader::readRowsRepeatedly(MutableColumns & output_cols, siz auto [start_block_index, rows_start_in_start_block] = locatePosByAccumulation(cached_block_rows_end, rows_offset); auto [end_block_index, rows_end_in_end_block] = 
locatePosByAccumulation(cached_block_rows_end, // rows_offset + rows_limit); - + size_t actual_offset = 0; size_t actual_read = 0; for (size_t block_index = start_block_index; block_index < cached_pk_ver_columns.size() && block_index <= end_block_index; ++block_index) @@ -138,15 +138,23 @@ size_t ColumnFileBigReader::readRowsRepeatedly(MutableColumns & output_cols, siz const auto & columns = cached_pk_ver_columns.at(block_index); const auto & pk_column = columns[0]; - actual_read += copyColumnsData(columns, pk_column, output_cols, rows_start_in_block, rows_in_block_limit, range); + auto [offset, rows] = copyColumnsData(columns, pk_column, output_cols, rows_start_in_block, rows_in_block_limit, range); + // For DMFile, records are sorted by row key. Only the prefix blocks and the suffix blocks will be filtered by range. + // It will read a continuous piece of data here. Update `actual_offset` after first successful read of data. + if (actual_read == 0 && rows > 0) + { + auto rows_before_block_index = block_index == 0 ? 0 : cached_block_rows_end[block_index - 1]; + actual_offset = rows_before_block_index + offset; + } + actual_read += rows; } - return actual_read; + return {actual_offset, actual_read}; } -size_t ColumnFileBigReader::readRowsOnce(MutableColumns & output_cols, // - size_t rows_offset, - size_t rows_limit, - const RowKeyRange * range) +std::pair<size_t, size_t> ColumnFileBigReader::readRowsOnce(MutableColumns & output_cols, // + size_t rows_offset, + size_t rows_limit, + const RowKeyRange * range) { auto read_next_block = [&, this]() -> bool { rows_before_cur_block += (static_cast<bool>(cur_block)) ? 
cur_block.rows() : 0; @@ -169,6 +177,7 @@ size_t ColumnFileBigReader::readRowsOnce(MutableColumns & output_cols, // }; size_t rows_end = rows_offset + rows_limit; + size_t actual_offset = 0; size_t actual_read = 0; size_t read_offset = rows_offset; while (read_offset < rows_end) @@ -196,14 +205,21 @@ size_t ColumnFileBigReader::readRowsOnce(MutableColumns & output_cols, // auto read_start_in_block = read_offset - rows_before_cur_block; auto read_limit_in_block = read_end_for_cur_block - read_offset; - actual_read += copyColumnsData(cur_block_data, cur_block_data[0], output_cols, read_start_in_block, read_limit_in_block, range); + auto [offset, rows] = copyColumnsData(cur_block_data, cur_block_data[0], output_cols, read_start_in_block, read_limit_in_block, range); + // For DMFile, records are sorted by row key. Only the prefix blocks and the suffix blocks will be filtered by range. + // It will read a continuous piece of data here. Update `actual_offset` after first successful read of data. 
+ if (actual_read == 0 && rows > 0) + { + actual_offset = rows_before_cur_block + offset; + } + actual_read += rows; read_offset += read_limit_in_block; cur_block_offset += read_limit_in_block; } - return actual_read; + return {actual_offset, actual_read}; } -size_t ColumnFileBigReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) +std::pair<size_t, size_t> ColumnFileBigReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) { initStream(); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index 0181834abfc..2ac97ee8b55 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -133,8 +133,8 @@ class ColumnFileBigReader : public ColumnFileReader private: void initStream(); - size_t readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range); - size_t readRowsOnce(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range); + std::pair<size_t, size_t> readRowsRepeatedly(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range); + std::pair<size_t, size_t> readRowsOnce(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range); public: ColumnFileBigReader(const DMContext & context_, const ColumnFileBig & column_file_, const ColumnDefinesPtr & col_defs_) @@ -162,7 +162,7 @@ class ColumnFileBigReader : public ColumnFileReader } } - size_t readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; + std::pair<size_t, size_t> readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; Block readNextBlock() override; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp 
b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp index 266b835ceb9..fff0d964f42 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp @@ -111,7 +111,7 @@ ColumnPtr ColumnFileInMemoryReader::getVersionColumn() return cols_data_cache[1]; } -size_t ColumnFileInMemoryReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) +std::pair<size_t, size_t> ColumnFileInMemoryReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) { memory_file.fillColumns(*col_defs, output_cols.size(), cols_data_cache); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h index 7ef8dc5f98f..74c408efdb2 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h @@ -137,7 +137,7 @@ class ColumnFileInMemoryReader : public ColumnFileReader ColumnPtr getPKColumn(); ColumnPtr getVersionColumn(); - size_t readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; + std::pair<size_t, size_t> readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; Block readNextBlock() override; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp index 34720f414c5..f428192519e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp @@ -122,7 +122,7 @@ Block ColumnFileSetReader::readPKVersion(size_t offset, size_t limit) return block; } -size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t offset, size_t limit, const RowKeyRange * range) +size_t 
ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t offset, size_t limit, const RowKeyRange * range, std::vector<UInt32> * row_ids) { // Note that DeltaMergeBlockInputStream could ask for rows with larger index than total_delta_rows, // because DeltaIndex::placed_rows could be larger than total_delta_rows. @@ -157,7 +157,19 @@ size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t off continue; auto & column_file_reader = column_file_readers[file_index]; - actual_read += column_file_reader->readRows(output_columns, rows_start_in_file, rows_in_file_limit, range); + auto [read_offset, read_rows] = column_file_reader->readRows(output_columns, rows_start_in_file, rows_in_file_limit, range); + actual_read += read_rows; + if (row_ids != nullptr) + { + auto rows_before_cur_file = file_index == 0 ? 0 : column_file_rows_end[file_index - 1]; + auto start_row_id = read_offset + rows_before_cur_file; + auto row_ids_offset = row_ids->size(); + row_ids->resize(row_ids->size() + read_rows); + for (size_t i = 0; i < read_rows; ++i) + { + (*row_ids)[row_ids_offset + i] = start_row_id + i; + } + } } return actual_read; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index 062269b69f6..dd7e6c6052c 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -55,7 +55,9 @@ class ColumnFileSetReader // Use for DeltaMergeBlockInputStream to read rows from MemTableSet to do full compaction with other layer. // This method will check whether offset and limit are valid. It only return those valid rows. - size_t readRows(MutableColumns & output_columns, size_t offset, size_t limit, const RowKeyRange * range); + // The returned rows may not be contiguous, since records may be filtered by `range`. 
When `row_ids` is not null, + // this function appends the offset of each returned row to `*row_ids`. + size_t readRows(MutableColumns & output_columns, size_t offset, size_t limit, const RowKeyRange * range, std::vector<UInt32> * row_ids = nullptr); void getPlaceItems(BlockOrDeletes & place_items, size_t rows_begin, size_t deletes_begin, size_t rows_end, size_t deletes_end, size_t place_rows_offset = 0); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp index 3dcdabf736a..576fe6ea2c0 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp @@ -224,7 +224,7 @@ ColumnPtr ColumnFileTinyReader::getVersionColumn() return cols_data_cache[1]; } -size_t ColumnFileTinyReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) +std::pair<size_t, size_t> ColumnFileTinyReader::readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) { tiny_file.fillColumns(storage_snap->log_reader, *col_defs, output_cols.size(), cols_data_cache); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index 604b4fecfd6..4a2f494b712 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -174,7 +174,7 @@ class ColumnFileTinyReader : public ColumnFileReader ColumnPtr getPKColumn(); ColumnPtr getVersionColumn(); - size_t readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; + std::pair<size_t, size_t> readRows(MutableColumns & output_cols, size_t rows_offset, size_t rows_limit, const RowKeyRange * range) override; Block readNextBlock() override; diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp 
b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp index 0ada7d3cac8..3eba8d16915 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp @@ -69,7 +69,7 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r ProfileEvents::increment(ProfileEvents::DMCleanReadRows, rows); - return getNewBlockByHeader(header, cur_raw_block); + return getNewBlock(cur_raw_block); } filter.resize(rows); @@ -393,23 +393,32 @@ Block DMVersionFilterBlockInputStream<MODE>::read(FilterPtr & res_filter, bool r if (passed_count == rows) { ++complete_passed; - return getNewBlockByHeader(header, cur_raw_block); + return getNewBlock(cur_raw_block); } if (return_filter) { // The caller of this method should do the filtering, we just need to return the original block. res_filter = &filter; - return getNewBlockByHeader(header, cur_raw_block); + return getNewBlock(cur_raw_block); } else { Block res; - for (const auto & c : header) + if (cur_raw_block.segmentRowIdCol() == nullptr) { - auto & column = cur_raw_block.getByName(c.name); - column.column = column.column->filter(filter, passed_count); - res.insert(std::move(column)); + for (const auto & c : header) + { + auto & column = cur_raw_block.getByName(c.name); + column.column = column.column->filter(filter, passed_count); + res.insert(std::move(column)); + } + } + else + { + // `DMVersionFilterBlockInputStream` is the last stage for generating segment row id. + // In the way we use it, the other columns are not used subsequently. 
+ res.setSegmentRowIdCol(cur_raw_block.segmentRowIdCol()->filter(filter, passed_count)); } return res; } diff --git a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h index 6d4d6662a08..97a1a22da9a 100644 --- a/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h @@ -199,6 +199,22 @@ class DMVersionFilterBlockInputStream : public IBlockInputStream return matched ? cur_version : std::numeric_limits<UInt64>::max(); } + Block getNewBlock(const Block & block) + { + if (block.segmentRowIdCol() == nullptr) + { + return getNewBlockByHeader(header, block); + } + else + { + // `DMVersionFilterBlockInputStream` is the last stage for generating segment row id. + // In the way we use it, the other columns are not used subsequently. + Block res; + res.setSegmentRowIdCol(block.segmentRowIdCol()); + return res; + } + } + private: const UInt64 version_limit; const bool is_common_handle; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 13a8fc6ac51..738144f8d78 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -399,7 +399,7 @@ class DeltaValueReader // Use for DeltaMergeBlockInputStream to read delta rows, and merge with stable rows. // This method will check whether offset and limit are valid. It only return those valid rows. - size_t readRows(MutableColumns & output_cols, size_t offset, size_t limit, const RowKeyRange * range); + size_t readRows(MutableColumns & output_cols, size_t offset, size_t limit, const RowKeyRange * range, std::vector<UInt32> * row_ids = nullptr); // Get blocks or delete_ranges of `ExtraHandleColumn` and `VersionColumn`. // If there are continuous blocks, they will be squashed into one block. 
@@ -420,6 +420,7 @@ class DeltaValueInputStream : public IBlockInputStream ColumnFileSetInputStream persisted_files_input_stream; bool persisted_files_done = false; + size_t read_rows = 0; public: DeltaValueInputStream(const DMContext & context_, @@ -434,6 +435,15 @@ class DeltaValueInputStream : public IBlockInputStream Block getHeader() const override { return persisted_files_input_stream.getHeader(); } Block read() override + { + auto block = doRead(); + block.setStartOffset(read_rows); + read_rows += block.rows(); + return block; + } + + // Read block from old to new. + Block doRead() { if (persisted_files_done) return mem_table_input_stream.read(); diff --git a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp index ab26f570eb5..498c629f6fd 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/Snapshot.cpp @@ -82,7 +82,7 @@ DeltaValueReaderPtr DeltaValueReader::createNewReader(const ColumnDefinesPtr & n return std::shared_ptr(new_reader); } -size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, size_t limit, const RowKeyRange * range) +size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, size_t limit, const RowKeyRange * range, std::vector * row_ids) { // Note that DeltaMergeBlockInputStream could ask for rows with larger index than total_delta_rows, // because DeltaIndex::placed_rows could be larger than total_delta_rows. @@ -104,10 +104,23 @@ size_t DeltaValueReader::readRows(MutableColumns & output_cols, size_t offset, s auto mem_table_end = offset + limit <= mem_table_rows_offset ? 
0 : std::min(offset + limit - mem_table_rows_offset, total_delta_rows - mem_table_rows_offset); size_t actual_read = 0; + size_t persisted_read_rows = 0; if (persisted_files_start < persisted_files_end) - actual_read += persisted_files_reader->readRows(output_cols, persisted_files_start, persisted_files_end - persisted_files_start, range); + { + persisted_read_rows = persisted_files_reader->readRows(output_cols, persisted_files_start, persisted_files_end - persisted_files_start, range, row_ids); + actual_read += persisted_read_rows; + } if (mem_table_start < mem_table_end) - actual_read += mem_table_reader->readRows(output_cols, mem_table_start, mem_table_end - mem_table_start, range); + { + actual_read += mem_table_reader->readRows(output_cols, mem_table_start, mem_table_end - mem_table_start, range, row_ids); + } + + if (row_ids != nullptr) + { + std::transform(row_ids->cbegin() + persisted_read_rows, row_ids->cend(), + row_ids->begin() + persisted_read_rows, // write to the same location + [mem_table_rows_offset](UInt32 id) { return id + mem_table_rows_offset; }); + } return actual_read; } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index ed7a7865c8f..cb1aa2fbad9 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -32,7 +32,7 @@ namespace DM /// Note that the columns in stable input stream and value space must exactly the same, including name, type, and id. /// The first column must be PK column. /// This class does not guarantee that the rows in the return blocks are filltered by range. 
-template +template class DeltaMergeBlockInputStream final : public SkippableBlockInputStream , Allocator { @@ -77,6 +77,7 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream Columns cur_stable_block_columns; size_t cur_stable_block_rows = 0; size_t cur_stable_block_pos = 0; + UInt64 cur_stable_block_start_offset = 0; bool stable_done = false; bool delta_done = false; @@ -90,13 +91,21 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream size_t last_handle_pos = 0; size_t last_handle_read_num = 0; + // Use for calculating MVCC-bitmap-filter when `need_row_id` is true. + ColumnUInt32::MutablePtr seg_row_id_col; + // `stable_rows` is the total rows of the underlying DMFiles, includes not valid rows. + UInt64 stable_rows; + // `delta_row_ids` is used to return the row id of delta. + std::vector delta_row_ids; + public: DeltaMergeBlockInputStream(const SkippableBlockInputStreamPtr & stable_input_stream_, const DeltaValueReaderPtr & delta_value_reader_, const IndexIterator & delta_index_start_, const IndexIterator & delta_index_end_, const RowKeyRange rowkey_range_, - size_t max_block_size_) + size_t max_block_size_, + UInt64 stable_rows_) : stable_input_stream(stable_input_stream_) , delta_value_reader(delta_value_reader_) , delta_index_it(delta_index_start_) @@ -105,6 +114,7 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream , is_common_handle(rowkey_range.is_common_handle) , rowkey_column_size(rowkey_range.rowkey_column_size) , max_block_size(max_block_size_) + , stable_rows(stable_rows_) { if constexpr (skippable_place) { @@ -214,6 +224,14 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream /// become invalid, so need to update last_value here last_value = last_value_ref.toRowKeyValue(); last_value_ref = last_value.toRowKeyValueRef(); + + if constexpr (need_row_id) + { + RUNTIME_CHECK_MSG(block.rows() == block.segmentRowIdCol()->size(), + "Build bitmap error: 
block.rows {} != segmentRowId.size() {}", + block.rows(), + block.segmentRowIdCol()->size()); + } } } @@ -247,7 +265,12 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream if (limit == max_block_size) continue; - return header.cloneWithColumns(std::move(columns)); + auto block = header.cloneWithColumns(std::move(columns)); + if constexpr (need_row_id) + { + block.setSegmentRowIdCol(std::move(seg_row_id_col)); + } + return block; } return {}; } @@ -255,6 +278,27 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream private: inline bool finished() { return stable_done && delta_done; } + inline void fillSegmentRowId(UInt64 start, UInt64 limit) + { + ColumnUInt32::Container & v = seg_row_id_col->getData(); + auto offset = v.size(); + v.resize(v.size() + limit); + for (UInt64 i = 0; i < limit; ++i) + { + v[offset + i] = start + i; + } + } + + inline void fillSegmentRowId(const std::vector & row_ids) + { + ColumnUInt32::Container & v = seg_row_id_col->getData(); + auto offset = v.size(); + v.resize(v.size() + row_ids.size()); + for (UInt32 i = 0; i < row_ids.size(); ++i) + { + v[offset + i] = row_ids[i] + stable_rows; + } + } template inline void next(MutableColumns & output_columns, size_t & output_write_limit) { @@ -310,11 +354,15 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream inline void initOutputColumns(MutableColumns & columns) { columns.resize(num_columns); - for (size_t i = 0; i < num_columns; ++i) { columns[i] = header.safeGetByPosition(i).column->cloneEmpty(); } + if constexpr (need_row_id) + { + seg_row_id_col = ColumnUInt32::create(); + seg_row_id_col->reserve(max_block_size); + } } inline size_t curStableBlockRemaining() { return cur_stable_block_rows - cur_stable_block_pos; } @@ -335,11 +383,13 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream cur_stable_block_columns.clear(); cur_stable_block_rows = 0; cur_stable_block_pos = 0; + 
cur_stable_block_start_offset = 0; auto block = stable_input_stream->read(); if (!block || !block.rows()) return false; cur_stable_block_rows = block.rows(); + cur_stable_block_start_offset = block.startOffset(); for (size_t column_id = 0; column_id < num_columns; ++column_id) cur_stable_block_columns.push_back(block.getByPosition(column_id).column); return true; @@ -474,6 +524,11 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream output_write_limit -= std::min(final_limit, output_write_limit); } + if constexpr (need_row_id) + { + fillSegmentRowId(final_offset + cur_stable_block_start_offset, final_limit); + } + cur_stable_block_pos += copy_rows; use_stable_rows -= copy_rows; } @@ -491,7 +546,18 @@ class DeltaMergeBlockInputStream final : public SkippableBlockInputStream // Note that the rows between [use_delta_offset, use_delta_offset + write_rows) are guaranteed sorted, // otherwise we won't read them in the same range. - auto actual_write = delta_value_reader->readRows(output_columns, use_delta_offset, write_rows, &rowkey_range); + size_t actual_write = 0; + if constexpr (need_row_id) + { + delta_row_ids.clear(); + delta_row_ids.reserve(write_rows); + actual_write = delta_value_reader->readRows(output_columns, use_delta_offset, write_rows, &rowkey_range, &delta_row_ids); + fillSegmentRowId(delta_row_ids); + } + else + { + actual_write = delta_value_reader->readRows(output_columns, use_delta_offset, write_rows, &rowkey_range); + } if constexpr (skippable_place) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 4272affaad9..e54c63bf283 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -954,6 +954,19 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context, return res; } +static inline ReadMode getReadMode(const Context & db_context, bool is_fast_scan, bool keep_order) +{ + if 
(is_fast_scan) + { + return ReadMode::Fast; + } + if (db_context.getSettingsRef().dt_enable_bitmap_filter && !keep_order) + { + return ReadMode::Bitmap; + } + return ReadMode::Normal; +} + BlockInputStreams DeltaMergeStore::read(const Context & db_context, const DB::Settings & db_settings, const ColumnDefines & columns_to_read, @@ -1000,7 +1013,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context, filter, max_version, expected_block_size, - /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, + getReadMode(db_context, is_fast_scan, keep_order), std::move(tasks), after_segment_read, log_tracing_id, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index e2be2e7a509..b07b8ce199d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -301,6 +301,7 @@ bool DMFileReader::getSkippedRows(size_t & skip_rows) scan_context->total_dmfile_skipped_packs += 1; scan_context->total_dmfile_skipped_rows += pack_stats[next_pack_id].rows; } + next_row_offset += skip_rows; return next_pack_id < use_packs.size(); } @@ -330,6 +331,7 @@ Block DMFileReader::read() return {}; // Find max continuing rows we can read. size_t start_pack_id = next_pack_id; + size_t start_row_offset = next_row_offset; // When single_file_mode is true, or read_one_pack_every_time is true, we can just read one pack every time. // 0 means no limit size_t read_pack_limit = (single_file_mode || read_one_pack_every_time) ? 
1 : 0; @@ -365,11 +367,13 @@ Block DMFileReader::read() deleted_rows += 1; } } + next_row_offset += read_rows; if (read_rows == 0) return {}; Block res; + res.setStartOffset(start_row_offset); size_t read_packs = next_pack_id - start_pack_id; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 5083f33da3b..5ebde116815 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -168,6 +168,7 @@ class DMFileReader const size_t rows_threshold_per_read; size_t next_pack_id = 0; + size_t next_row_offset = 0; FileProviderPtr file_provider; diff --git a/dbms/src/Storages/DeltaMerge/RowKeyFilter.h b/dbms/src/Storages/DeltaMerge/RowKeyFilter.h index 8f1a59326f4..bee76f567c2 100644 --- a/dbms/src/Storages/DeltaMerge/RowKeyFilter.h +++ b/dbms/src/Storages/DeltaMerge/RowKeyFilter.h @@ -53,6 +53,12 @@ inline Block cutBlock(Block && block, const std::vectorpopBack(pop_size); col.column = std::move(mutate_col); } + if (block.segmentRowIdCol() != nullptr) + { + auto mut_col = (*std::move(block.segmentRowIdCol())).mutate(); + mut_col->popBack(pop_size); + block.setSegmentRowIdCol(std::move(mut_col)); + } } else { @@ -60,12 +66,21 @@ inline Block cutBlock(Block && block, const std::vectorcut(offset, limit); } + if (block.segmentRowIdCol() != nullptr) + { + block.setSegmentRowIdCol(block.segmentRowIdCol()->cut(offset, limit)); + } } return std::move(block); } else { auto new_columns = block.cloneEmptyColumns(); + MutableColumnPtr new_seg_row_id_col; + if (block.segmentRowIdCol() != nullptr) + { + new_seg_row_id_col = block.segmentRowIdCol()->cloneEmpty(); + } for (const auto & [offset, limit] : offset_and_limits) { if (!limit) @@ -75,8 +90,14 @@ inline Block cutBlock(Block && block, const std::vectorinsertRangeFrom(*block.getByPosition(i).column, offset, limit); } + if (block.segmentRowIdCol() != nullptr) + { + 
new_seg_row_id_col->insertRangeFrom(*block.segmentRowIdCol(), offset, limit); + } } - return block.cloneWithColumns(std::move(new_columns)); + auto new_block = block.cloneWithColumns(std::move(new_columns)); + new_block.setSegmentRowIdCol(std::move(new_seg_row_id_col)); + return new_block; } } @@ -152,6 +173,10 @@ inline Block filterUnsorted(const RowKeyRanges & rowkey_ranges, Block && block, { col.column = col.column->filter(filter, passed_count); } + if (block.segmentRowIdCol() != nullptr) + { + block.setSegmentRowIdCol(block.segmentRowIdCol()->filter(filter, passed_count)); + } return std::move(block); } } // namespace RowKeyFilter diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 21a38301039..1a5fac19bfe 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -90,6 +91,7 @@ extern const Metric DT_SnapshotOfSegmentMerge; extern const Metric DT_SnapshotOfDeltaMerge; extern const Metric DT_SnapshotOfPlaceIndex; extern const Metric DT_SnapshotOfSegmentIngest; +extern const Metric DT_SnapshotOfBitmapFilter; } // namespace CurrentMetrics namespace DB @@ -428,7 +430,7 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr streams.push_back(stream); } - BlockInputStreamPtr stable_stream = std::make_shared(streams); + BlockInputStreamPtr stable_stream = std::make_shared>(streams); stable_stream = std::make_shared>(stable_stream, read_ranges, 0); stable_stream->readPrefix(); while (true) @@ -583,23 +585,35 @@ BlockInputStreamPtr Segment::getInputStream(const ReadMode & read_mode, { case ReadMode::Normal: return getInputStreamModeNormal(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); - break; case ReadMode::Fast: return getInputStreamModeFast(dm_context, columns_to_read, segment_snap, read_ranges, filter, 
expected_block_size); - break; case ReadMode::Raw: return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, expected_block_size); - break; + case ReadMode::Bitmap: + return getBitmapFilterInputStream(dm_context, columns_to_read, segment_snap, read_ranges, filter, max_version, expected_block_size); + default: + return nullptr; } } +bool Segment::useCleanRead(const SegmentSnapshotPtr & segment_snap, + const ColumnDefines & columns_to_read) +{ + return segment_snap->delta->getRows() == 0 // + && segment_snap->delta->getDeletes() == 0 // + && !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID) // + && !hasColumn(columns_to_read, VERSION_COLUMN_ID) // + && !hasColumn(columns_to_read, TAG_COLUMN_ID); +} + BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, UInt64 max_version, - size_t expected_block_size) + size_t expected_block_size, + bool need_row_id) { LOG_TRACE(log, "Begin segment create input stream"); @@ -631,10 +645,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_conte expected_block_size, false); } - else if (segment_snap->delta->getRows() == 0 && segment_snap->delta->getDeletes() == 0 // - && !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID) // - && !hasColumn(columns_to_read, VERSION_COLUMN_ID) // - && !hasColumn(columns_to_read, TAG_COLUMN_ID)) + else if (useCleanRead(segment_snap, columns_to_read)) { // No delta, let's try some optimizations. 
stream = segment_snap->stable->getInputStream( @@ -657,7 +668,8 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_conte read_info.index_begin, read_info.index_end, expected_block_size, - max_version); + max_version, + need_row_id); } stream = std::make_shared>(stream, real_ranges, 0); @@ -1964,7 +1976,8 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream(const DMContext & dm_conte const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, - UInt64 max_version) + UInt64 max_version, + bool need_row_id) { if (unlikely(rowkey_ranges.empty())) throw Exception("rowkey ranges shouldn't be empty", ErrorCodes::LOGICAL_ERROR); @@ -1972,13 +1985,28 @@ SkippableBlockInputStreamPtr Segment::getPlacedStream(const DMContext & dm_conte SkippableBlockInputStreamPtr stable_input_stream = stable_snap->getInputStream(dm_context, read_columns, rowkey_ranges, filter, max_version, expected_block_size, false); RowKeyRange rowkey_range = rowkey_ranges.size() == 1 ? 
rowkey_ranges[0] : mergeRanges(rowkey_ranges, rowkey_ranges[0].is_common_handle, rowkey_ranges[0].rowkey_column_size); - return std::make_shared>( // - stable_input_stream, - delta_reader, - delta_index_begin, - delta_index_end, - rowkey_range, - expected_block_size); + if (!need_row_id) + { + return std::make_shared>( // + stable_input_stream, + delta_reader, + delta_index_begin, + delta_index_end, + rowkey_range, + expected_block_size, + stable_snap->getDMFilesRows()); + } + else + { + return std::make_shared>( // + stable_input_stream, + delta_reader, + delta_index_begin, + delta_index_end, + rowkey_range, + expected_block_size, + stable_snap->getDMFilesRows()); + } } std::pair Segment::ensurePlace(const DMContext & dm_context, @@ -2224,5 +2252,285 @@ bool Segment::placeDelete(const DMContext & dm_context, return fully_indexed; } +BitmapFilterPtr Segment::buildBitmapFilter(const DMContext & dm_context, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) +{ + RUNTIME_CHECK_MSG(!dm_context.read_delta_only, "Read delta only is unsupported"); + + if (dm_context.read_stable_only || (segment_snap->delta->getRows() == 0 && segment_snap->delta->getDeletes() == 0)) + { + return buildBitmapFilterStableOnly(dm_context, segment_snap, read_ranges, filter, max_version, expected_block_size); + } + else + { + return buildBitmapFilterNormal(dm_context, segment_snap, read_ranges, filter, max_version, expected_block_size); + } +} + +BitmapFilterPtr Segment::buildBitmapFilterNormal(const DMContext & dm_context, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) +{ + Stopwatch sw_total; + static ColumnDefines columns_to_read{ + getExtraHandleColumnDefine(is_common_handle), + }; + auto stream = getInputStreamModeNormal( + dm_context, + columns_to_read, + 
segment_snap, + read_ranges, + filter, + max_version, + expected_block_size, + /*need_row_id*/ true); + auto total_rows = segment_snap->delta->getRows() + segment_snap->stable->getDMFilesRows(); + auto bitmap_filter = std::make_shared(total_rows, /*default_value*/ false); + bitmap_filter->set(stream); + bitmap_filter->runOptimize(); + LOG_DEBUG(log, "total_rows={} cost={}ms", total_rows, sw_total.elapsedMilliseconds()); + return bitmap_filter; +} + +struct PackInfo +{ + UInt64 pack_id; + UInt64 offset; + UInt64 rows; +}; +std::pair, IdSetPtr> parseDMFilePackInfo(const DMFilePtr & dmfile, + const DMContext & dm_context, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version) +{ + DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom( + dmfile, + dm_context.db_context.getMinMaxIndexCache(), + /*set_cache_if_miss*/ true, + read_ranges, + filter, + /*read_pack*/ {}, + dm_context.db_context.getFileProvider(), + dm_context.db_context.getReadLimiter(), + dm_context.scan_context, + dm_context.tracing_id); + const auto & use_packs = pack_filter.getUsePacks(); + const auto & handle_res = pack_filter.getHandleRes(); + const auto & pack_stats = dmfile->getPackStats(); + + // Packs that all rows compliant with MVCC filter and RowKey filter requirements. + // For building bitmap filter, we don't need to read these packs, + // just set corresponding positions in the bitmap to true. + std::vector all_packs; + // Packs that some rows compliant with MVCC filter and RowKey filter requirements. + // We need to read these packs and do RowKey filter and MVCC filter for them. 
+ auto some_packs = std::make_shared(); + UInt64 rows = 0; + for (size_t pack_id = 0; pack_id < pack_stats.size(); pack_id++) + { + const auto & pack_stat = pack_stats[pack_id]; + rows += pack_stat.rows; + if (!use_packs[pack_id]) + { + continue; + } + + // assert(handle_res[pack_id] == RSResult::Some || handle_res[pack_id] == RSResult::All); + + if (handle_res[pack_id] == RSResult::Some) + { + // We need to read this pack to do RowKey filter. + some_packs->insert(pack_id); + continue; + } + + // assert(handle_res[pack_id] == RSResult::All); + + if (pack_stat.not_clean > 0) + { + // We need to read this pack to do MVCC filter. + some_packs->insert(pack_id); + continue; + } + + if (pack_filter.getMaxVersion(pack_id) > max_version) + { + // We need to read this pack to do MVCC filter. + some_packs->insert(pack_id); + continue; + } + all_packs.push_back({pack_id, rows - pack_stat.rows, pack_stat.rows}); + } + return {all_packs, some_packs}; +} + +BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_context, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) +{ + Stopwatch sw; + const auto & dmfiles = segment_snap->stable->getDMFiles(); + RUNTIME_CHECK(!dmfiles.empty(), dmfiles.size()); + std::vector> all_packs{dmfiles.size()}; + std::vector some_packs{dmfiles.size()}; + bool all_match = true; + size_t some_packs_count = 0; + for (size_t i = 0; i < dmfiles.size(); i++) + { + std::tie(all_packs[i], some_packs[i]) = parseDMFilePackInfo(dmfiles[i], + dm_context, + read_ranges, + filter, + max_version); + all_match = all_match && (all_packs[i].size() == dmfiles[i]->getPacks()); + some_packs_count += some_packs[i]->size(); + } + + if (all_match) + { + LOG_DEBUG(log, "all match, total_rows={}, cost={}ms", segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds()); + return std::make_shared(segment_snap->stable->getDMFilesRows(), 
/*default_value*/ true); + } + + auto bitmap_filter = std::make_shared(segment_snap->stable->getDMFilesRows(), /*default_value*/ false); + UInt32 preceded_dmfile_rows = 0; + for (size_t i = 0; i < dmfiles.size(); i++) + { + const auto & all_pack = all_packs[i]; + for (const auto & pack : all_pack) + { + bitmap_filter->set(pack.offset + preceded_dmfile_rows, pack.rows); + } + preceded_dmfile_rows += dmfiles[i]->getRows(); + } + if (some_packs_count <= 0) + { + return bitmap_filter; + } + static ColumnDefines columns_to_read{ + getExtraHandleColumnDefine(is_common_handle), + getVersionColumnDefine(), + getTagColumnDefine(), + }; + BlockInputStreamPtr stream = segment_snap->stable->getInputStream(dm_context, + columns_to_read, + read_ranges, + filter, + max_version, + expected_block_size, + /*enable_handle_clean_read*/ false, + /*is_fast_scan*/ false, + /*enable_del_clean_read*/ false, + /*read_packs*/ some_packs, + /*need_row_id*/ true); + stream = std::make_shared>(stream, read_ranges, 0); + static ColumnDefines read_columns{ + getExtraHandleColumnDefine(is_common_handle), + }; + stream = std::make_shared>( + stream, + read_columns, + max_version, + is_common_handle, + dm_context.tracing_id); + bitmap_filter->set(stream); + LOG_DEBUG(log, "some_packs={}, total_rows={}, cost={}ms", some_packs_count, segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds()); + return bitmap_filter; +} + +BlockInputStreamPtr Segment::getBitmapFilterInputStream(BitmapFilterPtr && bitmap_filter, + const SegmentSnapshotPtr & segment_snap, + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) +{ + auto enable_handle_clean_read = !hasColumn(columns_to_read, EXTRA_HANDLE_COLUMN_ID); + constexpr auto is_fast_scan = true; + auto enable_del_clean_read = !hasColumn(columns_to_read, TAG_COLUMN_ID); + BlockInputStreamPtr stable_stream = 
segment_snap->stable->getInputStream( + dm_context, + columns_to_read, + read_ranges, + filter, + max_version, + expected_block_size, + enable_handle_clean_read, + is_fast_scan, + enable_del_clean_read); + + auto columns_to_read_ptr = std::make_shared(columns_to_read); + BlockInputStreamPtr delta_stream = std::make_shared( + dm_context, + segment_snap->delta, + columns_to_read_ptr, + this->rowkey_range); + + return std::make_shared( + columns_to_read, + stable_stream, + delta_stream, + segment_snap->stable->getDMFilesRows(), + bitmap_filter, + dm_context.tracing_id); +} + +RowKeyRanges Segment::shrinkRowKeyRanges(const RowKeyRanges & read_ranges) +{ + RowKeyRanges real_ranges; + for (const auto & read_range : read_ranges) + { + auto real_range = rowkey_range.shrink(read_range); + if (!real_range.none()) + real_ranges.emplace_back(std::move(real_range)); + } + return real_ranges; +} + +BlockInputStreamPtr Segment::getBitmapFilterInputStream(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size) +{ + auto real_ranges = shrinkRowKeyRanges(read_ranges); + if (real_ranges.empty()) + { + return std::make_shared(toEmptyBlock(columns_to_read)); + } + auto bitmap_filter = buildBitmapFilter( + dm_context, + segment_snap, + real_ranges, + filter, + max_version, + expected_block_size); + + return getBitmapFilterInputStream( + std::move(bitmap_filter), + segment_snap, + dm_context, + columns_to_read, + real_ranges, + filter, + max_version, + expected_block_size); +} + } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 084fe00ec6d..c0faefcf817 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include 
@@ -176,7 +177,8 @@ class Segment const RowKeyRanges & read_ranges, const RSOperatorPtr & filter, UInt64 max_version, - size_t expected_block_size); + size_t expected_block_size, + bool need_row_id = false); BlockInputStreamPtr getInputStreamModeNormal( const DMContext & dm_context, @@ -532,7 +534,11 @@ class Segment void setLastCheckGCSafePoint(DB::Timestamp gc_safe_point) { last_check_gc_safe_point.store(gc_safe_point, std::memory_order_relaxed); } +#ifndef DBMS_PUBLIC_GTEST private: +#else +public: +#endif ReadInfo getReadInfo( const DMContext & dm_context, const ColumnDefines & read_columns, @@ -556,7 +562,8 @@ class Segment const IndexIterator & delta_index_begin, const IndexIterator & delta_index_end, size_t expected_block_size, - UInt64 max_version = std::numeric_limits::max()); + UInt64 max_version = std::numeric_limits::max(), + bool need_row_id = false); /// Make sure that all delta packs have been placed. /// Note that the index returned could be partial index, and cannot be updated to shared index. 
@@ -592,6 +599,44 @@ class Segment const RowKeyRange & relevant_range, bool relevant_place) const; + static bool useCleanRead(const SegmentSnapshotPtr & segment_snap, + const ColumnDefines & columns_to_read); + RowKeyRanges shrinkRowKeyRanges(const RowKeyRanges & read_ranges); + BitmapFilterPtr buildBitmapFilter(const DMContext & dm_context, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + BitmapFilterPtr buildBitmapFilterNormal(const DMContext & dm_context, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + BitmapFilterPtr buildBitmapFilterStableOnly(const DMContext & dm_context, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + BlockInputStreamPtr getBitmapFilterInputStream(BitmapFilterPtr && bitmap_filter, + const SegmentSnapshotPtr & segment_snap, + const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + BlockInputStreamPtr getBitmapFilterInputStream(const DMContext & dm_context, + const ColumnDefines & columns_to_read, + const SegmentSnapshotPtr & segment_snap, + const RowKeyRanges & read_ranges, + const RSOperatorPtr & filter, + UInt64 max_version, + size_t expected_block_size); + + private: /// The version of this segment. After split / merge / mergeDelta / replaceData, epoch got increased by 1. 
const UInt64 epoch; diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 07236426def..97dca404652 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -158,7 +158,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t BlockInputStreamPtr stream; auto block_size = std::max(expected_block_size, static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); stream = t->segment->getInputStream(read_mode, *dm_context, columns_to_read, t->read_snapshot, t->ranges, filter, max_version, block_size); - LOG_DEBUG(log, "getInputStream succ, pool_id={} segment_id={}", pool_id, t->segment->segmentId()); + LOG_DEBUG(log, "getInputStream succ, read_mode={}, pool_id={} segment_id={}", magic_enum::enum_name(read_mode), pool_id, t->segment->segmentId()); return stream; } diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 12b292726dc..db1bbda7012 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -129,6 +129,8 @@ enum class ReadMode * are just returned. 
*/ Raw, + + Bitmap, }; // If `enable_read_thread_` is true, `SegmentReadTasksWrapper` use `std::unordered_map` to index `SegmentReadTask` by segment id, diff --git a/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h index f864a5a6e3e..f1c4c0f64ee 100644 --- a/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -52,10 +53,21 @@ class EmptySkippableBlockInputStream : public SkippableBlockInputStream ColumnDefines read_columns; }; +template class ConcatSkippableBlockInputStream : public SkippableBlockInputStream { public: - ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_) + explicit ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_) + : rows(inputs_.size(), 0) + , precede_stream_rows(0) + { + children.insert(children.end(), inputs_.begin(), inputs_.end()); + current_stream = children.begin(); + } + + ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector && rows_) + : rows(std::move(rows_)) + , precede_stream_rows(0) { children.insert(children.end(), inputs_.begin(), inputs_.end()); current_stream = children.begin(); @@ -83,6 +95,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream else { (*current_stream)->readSuffix(); + precede_stream_rows += rows[current_stream - children.begin()]; ++current_stream; } } @@ -99,10 +112,18 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream res = (*current_stream)->read(); if (res) + { + res.setStartOffset(res.startOffset() + precede_stream_rows); + if constexpr (need_row_id) + { + res.setSegmentRowIdCol(createSegmentRowIdCol(res.startOffset(), res.rows())); + } break; + } else { (*current_stream)->readSuffix(); + precede_stream_rows += rows[current_stream - children.begin()]; ++current_stream; } } @@ -111,7 +132,20 @@ class 
ConcatSkippableBlockInputStream : public SkippableBlockInputStream } private: + ColumnPtr createSegmentRowIdCol(UInt64 start, UInt64 limit) + { + auto seg_row_id_col = ColumnUInt32::create(); + ColumnUInt32::Container & res = seg_row_id_col->getData(); + res.resize(limit); + for (UInt64 i = 0; i < limit; ++i) + { + res[i] = i + start; + } + return seg_row_id_col; + } BlockInputStreams::iterator current_stream; + std::vector rows; + size_t precede_stream_rows; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index d20410f661b..f11fe77692a 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -346,11 +346,15 @@ StableValueSpace::Snapshot::getInputStream( size_t expected_block_size, bool enable_handle_clean_read, bool is_fast_scan, - bool enable_del_clean_read) + bool enable_del_clean_read, + const std::vector & read_packs, + bool need_row_id) { LOG_DEBUG(log, "max_data_version: {}, enable_handle_clean_read: {}, is_fast_mode: {}, enable_del_clean_read: {}", max_data_version, enable_handle_clean_read, is_fast_scan, enable_del_clean_read); SkippableBlockInputStreams streams; - + std::vector rows; + streams.reserve(stable->files.size()); + rows.reserve(stable->files.size()); for (size_t i = 0; i < stable->files.size(); i++) { DMFileBlockInputStreamBuilder builder(context.db_context); @@ -359,10 +363,19 @@ StableValueSpace::Snapshot::getInputStream( .setRSOperator(filter) .setColumnCache(column_caches[i]) .setTracingID(context.tracing_id) - .setRowsThreshold(expected_block_size); + .setRowsThreshold(expected_block_size) + .setReadPacks(read_packs.size() > i ? 
read_packs[i] : nullptr); streams.push_back(builder.build(stable->files[i], read_columns, rowkey_ranges, context.scan_context)); + rows.push_back(stable->files[i]->getRows()); + } + if (need_row_id) + { + return std::make_shared>(streams, std::move(rows)); + } + else + { + return std::make_shared>(streams, std::move(rows)); } - return std::make_shared(streams); } RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index af5e98c2206..8ed6120ac89 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -214,7 +215,9 @@ class StableValueSpace : public std::enable_shared_from_this size_t expected_block_size, bool enable_handle_clean_read, bool is_fast_scan = false, - bool enable_del_clean_read = false); + bool enable_del_clean_read = false, + const std::vector & read_packs = {}, + bool need_row_id = false); RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp new file mode 100644 index 00000000000..fd5657fee75 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp @@ -0,0 +1,643 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; +using namespace DB::tests; + +namespace DB::DM::tests +{ +template +::testing::AssertionResult sequenceEqual(const E * expected, const A * actual, size_t size) +{ + for (size_t i = 0; i < size; i++) + { + if (expected[i] != actual[i]) + { + return ::testing::AssertionFailure() + << fmt::format("Value at index {} mismatch: expected {} vs actual {}. expected => {} actual => {}", + i, + expected[i], + actual[i], + std::vector(expected, expected + size), + std::vector(actual, actual + size)); + } + } + return ::testing::AssertionSuccess(); +} + +template +std::vector genSequence(T begin, T end) +{ + auto size = end - begin; + std::vector v(size); + std::iota(v.begin(), v.end(), begin); + return v; +} + +template +std::vector genSequence(const std::vector> & ranges) +{ + std::vector res; + for (auto [begin, end] : ranges) + { + auto v = genSequence(begin, end); + res.insert(res.end(), v.begin(), v.end()); + } + return res; +} + +// "[a, b)" => std::pair{a, b} +template +std::pair parseRange(String & str_range) +{ + boost::algorithm::trim(str_range); + RUNTIME_CHECK(str_range.front() == '[' && str_range.back() == ')', str_range); + std::vector values; + str_range = str_range.substr(1, str_range.size() - 2); + boost::split(values, str_range, boost::is_any_of(",")); + RUNTIME_CHECK(values.size() == 2, str_range); + return {static_cast(std::stol(values[0])), 
static_cast(std::stol(values[1]))}; +} + +// "[a, b)|[c, d)" => [std::pair{a, b}, std::pair{c, d}] +template +std::vector> parseRanges(std::string_view str_ranges) +{ + std::vector ranges; + boost::split(ranges, str_ranges, boost::is_any_of("|")); + RUNTIME_CHECK(!ranges.empty(), str_ranges); + std::vector> vector_ranges; + for (auto & r : ranges) + { + vector_ranges.emplace_back(parseRange(r)); + } + return vector_ranges; +} + +template +std::vector genSequence(std::string_view str_ranges) +{ + auto vector_ranges = parseRanges(str_ranges); + return genSequence(vector_ranges); +} + +struct SegDataUnit +{ + String type; + std::pair range; +}; + +// "type:[a, b)" => SegDataUnit +SegDataUnit parseSegDataUnit(String & s) +{ + boost::algorithm::trim(s); + std::vector values; + boost::split(values, s, boost::is_any_of(":")); + RUNTIME_CHECK(values.size() == 2, s); + return SegDataUnit{boost::algorithm::trim_copy(values[0]), parseRange(values[1])}; +} + +void check(const std::vector & seg_data_units) +{ + RUNTIME_CHECK(!seg_data_units.empty()); + std::vector stable_units; + std::vector mem_units; + for (size_t i = 0; i < seg_data_units.size(); i++) + { + const auto & type = seg_data_units[i].type; + if (type == "s") + { + stable_units.emplace_back(i); + } + else if (type == "d_mem" || type == "d_mem_del") + { + mem_units.emplace_back(i); + } + auto [begin, end] = seg_data_units[i].range; + RUNTIME_CHECK(begin < end, begin, end); + } + RUNTIME_CHECK(stable_units.empty() || (stable_units.size() == 1 && stable_units[0] == 0)); + std::vector expected_mem_units(mem_units.size()); + std::iota(expected_mem_units.begin(), expected_mem_units.end(), seg_data_units.size() - mem_units.size()); + RUNTIME_CHECK(mem_units == expected_mem_units, expected_mem_units, mem_units); +} + +std::vector parseSegData(std::string_view seg_data) +{ + std::vector str_seg_data_units; + boost::split(str_seg_data_units, seg_data, boost::is_any_of("|")); + RUNTIME_CHECK(!str_seg_data_units.empty(), 
seg_data); + std::vector seg_data_units; + for (auto & s : str_seg_data_units) + { + seg_data_units.emplace_back(parseSegDataUnit(s)); + } + check(seg_data_units); + return seg_data_units; +} + +class SegmentBitmapFilterTest : public SegmentTestBasic +{ +protected: + DB::LoggerPtr log = DB::Logger::get("SegmentBitmapFilterTest"); + static constexpr auto SEG_ID = DELTA_MERGE_FIRST_SEGMENT_ID; + ColumnPtr hold_row_id; + ColumnPtr hold_handle; + RowKeyRanges read_ranges; + + /* + 0----------------stable_rows----------------stable_rows + delta_rows <-- append + | stable value space | delta value space .......................... <-- append + |--------------------|--ColumnFilePersisted--|ColumnFileInMemory... <-- append + |--------------------|-Tiny|DeleteRange|Big--|ColumnFileInMemory... <-- append + + `seg_data`: s:[a, b)|d_tiny:[a, b)|d_tiny_del:[a, b)|d_big:[a, b)|d_dr:[a, b)|d_mem:[a, b)|d_mem_del:[a, b) + - s: stable + - d_tiny: delta ColumnFileTiny + - d_tiny_del: delta ColumnFileTiny with delete flag + - d_big: delta ColumnFileBig + - d_dr: delta delete range + + Returns {row_id, handle}. 
+ */ + std::pair *, const PaddedPODArray *> writeSegment(std::string_view seg_data) + { + auto seg_data_units = parseSegData(seg_data); + for (const auto & unit : seg_data_units) + { + writeSegment(unit); + } + hold_row_id = getSegmentRowId(SEG_ID, read_ranges); + hold_handle = getSegmentHandle(SEG_ID, read_ranges); + if (hold_row_id == nullptr) + { + RUNTIME_CHECK(hold_handle == nullptr); + return {nullptr, nullptr}; + } + else + { + RUNTIME_CHECK(hold_handle != nullptr); + return {toColumnVectorDataPtr(hold_row_id), toColumnVectorDataPtr(hold_handle)}; + } + } + + void writeSegment(const SegDataUnit & unit) + { + const auto & type = unit.type; + auto [begin, end] = unit.range; + + if (type == "d_mem") + { + SegmentTestBasic::writeSegment(SEG_ID, end - begin, begin); + } + else if (type == "d_mem_del") + { + SegmentTestBasic::writeSegmentWithDeletedPack(SEG_ID, end - begin, begin); + } + else if (type == "d_tiny") + { + SegmentTestBasic::writeSegment(SEG_ID, end - begin, begin); + SegmentTestBasic::flushSegmentCache(SEG_ID); + } + else if (type == "d_tiny_del") + { + SegmentTestBasic::writeSegmentWithDeletedPack(SEG_ID, end - begin, begin); + SegmentTestBasic::flushSegmentCache(SEG_ID); + } + else if (type == "d_big") + { + SegmentTestBasic::ingestDTFileIntoDelta(SEG_ID, end - begin, begin, false); + } + else if (type == "d_dr") + { + SegmentTestBasic::writeSegmentWithDeleteRange(SEG_ID, begin, end); + } + else if (type == "s") + { + SegmentTestBasic::writeSegment(SEG_ID, end - begin, begin); + SegmentTestBasic::mergeSegmentDelta(SEG_ID); + } + else + { + RUNTIME_CHECK(false, type); + } + } + + struct TestCase + { + TestCase(std::string_view seg_data_, + size_t expected_size_, + std::string_view expected_row_id_, + std::string_view expected_handle_) + : seg_data(seg_data_) + , expected_size(expected_size_) + , expected_row_id(expected_row_id_) + , expected_handle(expected_handle_) + {} + std::string seg_data; + size_t expected_size; + std::string expected_row_id; 
+ std::string expected_handle; + }; + + void runTestCase(TestCase test_case) + { + auto [row_id, handle] = writeSegment(test_case.seg_data); + if (test_case.expected_size == 0) + { + ASSERT_EQ(nullptr, row_id); + ASSERT_EQ(nullptr, handle); + } + else + { + ASSERT_EQ(test_case.expected_size, row_id->size()); + auto expected_row_id = genSequence(test_case.expected_row_id); + ASSERT_TRUE(sequenceEqual(expected_row_id.data(), row_id->data(), test_case.expected_size)); + + ASSERT_EQ(test_case.expected_size, handle->size()); + auto expected_handle = genSequence(test_case.expected_handle); + ASSERT_TRUE(sequenceEqual(expected_handle.data(), handle->data(), test_case.expected_size)); + } + } +}; + +TEST_F(SegmentBitmapFilterTest, InMemory_1) +try +{ + runTestCase(TestCase( + "d_mem:[0, 1000)", + 1000, + "[0, 1000)", + "[0, 1000)")); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, InMemory_2) +try +{ + runTestCase(TestCase{ + "d_mem:[0, 1000)|d_mem:[0, 1000)", + 1000, + "[1000, 2000)", + "[0, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, InMemory_3) +try +{ + runTestCase(TestCase{ + "d_mem:[0, 1000)|d_mem:[100, 200)", + 1000, + "[0, 100)|[1000, 1100)|[200, 1000)", + "[0, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, InMemory_4) +try +{ + runTestCase(TestCase{ + "d_mem:[0, 1000)|d_mem:[-100, 100)", + 1100, + "[1000, 1200)|[100, 1000)", + "[-100, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, InMemory_5) +try +{ + runTestCase(TestCase{ + "d_mem:[0, 1000)|d_mem_del:[0, 1000)", + 0, + "", + ""}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, InMemory_6) +try +{ + runTestCase(TestCase{ + "d_mem:[0, 1000)|d_mem_del:[100, 200)", + 900, + "[0, 100)|[200, 1000)", + "[0, 100)|[200, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, InMemory_7) +try +{ + runTestCase(TestCase{ + "d_mem:[0, 1000)|d_mem_del:[-100, 100)", + 900, + "[100, 1000)", + "[100, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, Tiny_1) +try +{ + runTestCase(TestCase{ + 
"d_tiny:[100, 500)|d_mem:[200, 1000)", + 900, + "[0, 100)|[400, 1200)", + "[100, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, TinyDel_1) +try +{ + runTestCase(TestCase{ + "d_tiny:[100, 500)|d_tiny_del:[200, 300)|d_mem:[0, 100)", + 400, + "[500, 600)|[0, 100)|[200, 400)", + "[0, 200)|[300, 500)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, DeleteRange) +try +{ + runTestCase(TestCase{ + "d_tiny:[100, 500)|d_dr:[250, 300)|d_mem:[240, 290)", + 390, + "[0, 140)|[400, 450)|[200, 400)", + "[100, 290)|[300, 500)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, Big) +try +{ + runTestCase(TestCase{ + "d_tiny:[100, 500)|d_big:[250, 1000)|d_mem:[240, 290)", + 900, + "[0, 140)|[1150, 1200)|[440, 1150)", + "[100, 1000)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, Stable_1) +try +{ + runTestCase(TestCase{ + "s:[0, 1024)", + 1024, + "[0, 1024)", + "[0, 1024)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, Stable_2) +try +{ + runTestCase(TestCase{ + "s:[0, 1024)|d_dr:[0, 1023)", + 1, + "[1023, 1024)", + "[1023, 1024)"}); +} +CATCH + + +TEST_F(SegmentBitmapFilterTest, Stable_3) +try +{ + runTestCase(TestCase{ + "s:[0, 1024)|d_dr:[128, 256)|d_tiny_del:[300, 310)", + 886, + "[0, 128)|[256, 300)|[310, 1024)", + "[0, 128)|[256, 300)|[310, 1024)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, Mix) +try +{ + runTestCase(TestCase{ + "s:[0, 1024)|d_dr:[128, 256)|d_tiny_del:[300, 310)|d_tiny:[200, 255)|d_mem:[298, 305)", + 946, + "[0, 128)|[1034, 1089)|[256, 298)|[1089, 1096)|[310, 1024)", + "[0, 128)|[200, 255)|[256, 305)|[310, 1024)"}); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, Ranges) +try +{ + read_ranges.emplace_back(buildRowKeyRange(222, 244)); + read_ranges.emplace_back(buildRowKeyRange(300, 303)); + read_ranges.emplace_back(buildRowKeyRange(555, 666)); + runTestCase(TestCase{ + "s:[0, 1024)|d_dr:[128, 256)|d_tiny_del:[300, 310)|d_tiny:[200, 255)|d_mem:[298, 305)", + 136, + "[1056, 1078)|[1091, 1094)|[555, 666)", + "[222, 244)|[300, 303)|[555, 666)"}); +} +CATCH 
+ +TEST_F(SegmentBitmapFilterTest, LogicalSplit) +try +{ + runTestCase(TestCase{ + "s:[0, 1024)|d_dr:[128, 256)|d_tiny_del:[300, 310)|d_tiny:[200, 255)|d_mem:[298, 305)", + 946, + "[0, 128)|[1034, 1089)|[256, 298)|[1089, 1096)|[310, 1024)", + "[0, 128)|[200, 255)|[256, 305)|[310, 1024)"}); + + auto new_seg_id = splitSegmentAt(SEG_ID, 512, Segment::SplitMode::Logical); + + ASSERT_TRUE(new_seg_id.has_value()); + ASSERT_TRUE(areSegmentsSharingStable({SEG_ID, *new_seg_id})); + + auto left_handle = getSegmentHandle(SEG_ID, {}); + const auto * left_h = toColumnVectorDataPtr(left_handle); + auto expected_left_handle = genSequence("[0, 128)|[200, 255)|[256, 305)|[310, 512)"); + ASSERT_EQ(expected_left_handle.size(), left_h->size()); + ASSERT_TRUE(sequenceEqual(expected_left_handle.data(), left_h->data(), left_h->size())); + + auto left_row_id = getSegmentRowId(SEG_ID, {}); + const auto * left_r = toColumnVectorDataPtr(left_row_id); + auto expected_left_row_id = genSequence("[0, 128)|[1034, 1089)|[256, 298)|[1089, 1096)|[310, 512)"); + ASSERT_EQ(expected_left_row_id.size(), left_r->size()); + ASSERT_TRUE(sequenceEqual(expected_left_row_id.data(), left_r->data(), left_r->size())); + + auto right_handle = getSegmentHandle(*new_seg_id, {}); + const auto * right_h = toColumnVectorDataPtr(right_handle); + auto expected_right_handle = genSequence("[512, 1024)"); + ASSERT_EQ(expected_right_handle.size(), right_h->size()); + ASSERT_TRUE(sequenceEqual(expected_right_handle.data(), right_h->data(), right_h->size())); + + auto right_row_id = getSegmentRowId(*new_seg_id, {}); + const auto * right_r = toColumnVectorDataPtr(right_row_id); + auto expected_right_row_id = genSequence("[512, 1024)"); + ASSERT_EQ(expected_right_row_id.size(), right_r->size()); + ASSERT_TRUE(sequenceEqual(expected_right_row_id.data(), right_r->data(), right_r->size())); +} +CATCH + +TEST_F(SegmentBitmapFilterTest, CleanStable) +{ + writeSegment("d_mem:[0, 10000)|d_mem:[20000, 25000)"); + 
mergeSegmentDelta(SEG_ID, true); + auto [seg, snap] = getSegmentForRead(SEG_ID); + ASSERT_EQ(seg->getDelta()->getRows(), 0); + ASSERT_EQ(seg->getDelta()->getDeletes(), 0); + ASSERT_EQ(seg->getStable()->getRows(), 15000); + auto bitmap_filter = seg->buildBitmapFilterStableOnly( + *dm_context, + snap, + {seg->getRowKeyRange()}, + EMPTY_FILTER, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE); + ASSERT_NE(bitmap_filter, nullptr); + std::string expect_result; + expect_result.append(std::string(15000, '1')); + ASSERT_EQ(bitmap_filter->toDebugString(), expect_result); +} + +TEST_F(SegmentBitmapFilterTest, NotCleanStable) +{ + writeSegment("d_mem:[0, 10000)|d_mem:[5000, 15000)"); + mergeSegmentDelta(SEG_ID, true); + auto [seg, snap] = getSegmentForRead(SEG_ID); + ASSERT_EQ(seg->getDelta()->getRows(), 0); + ASSERT_EQ(seg->getDelta()->getDeletes(), 0); + ASSERT_EQ(seg->getStable()->getRows(), 20000); + { + auto bitmap_filter = seg->buildBitmapFilterStableOnly( + *dm_context, + snap, + {seg->getRowKeyRange()}, + EMPTY_FILTER, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE); + ASSERT_NE(bitmap_filter, nullptr); + std::string expect_result; + expect_result.append(std::string(5000, '1')); + for (int i = 0; i < 5000; i++) + { + expect_result.append(std::string("01")); + } + expect_result.append(std::string(5000, '1')); + ASSERT_EQ(bitmap_filter->toDebugString(), expect_result); + } + { + // Stale read + ASSERT_EQ(version, 2); + auto bitmap_filter = seg->buildBitmapFilterStableOnly( + *dm_context, + snap, + {seg->getRowKeyRange()}, + EMPTY_FILTER, + 1, + DEFAULT_BLOCK_SIZE); + ASSERT_NE(bitmap_filter, nullptr); + std::string expect_result; + expect_result.append(std::string(5000, '1')); + for (int i = 0; i < 5000; i++) + { + expect_result.append(std::string("10")); + } + expect_result.append(std::string(5000, '0')); + ASSERT_EQ(bitmap_filter->toDebugString(), expect_result); + } +} + +TEST_F(SegmentBitmapFilterTest, StableRange) +{ + writeSegment("d_mem:[0, 50000)"); + 
mergeSegmentDelta(SEG_ID, true); + auto [seg, snap] = getSegmentForRead(SEG_ID); + ASSERT_EQ(seg->getDelta()->getRows(), 0); + ASSERT_EQ(seg->getDelta()->getDeletes(), 0); + ASSERT_EQ(seg->getStable()->getRows(), 50000); + + auto bitmap_filter = seg->buildBitmapFilterStableOnly( + *dm_context, + snap, + {buildRowKeyRange(10000, 50000)}, // [10000, 50000) + EMPTY_FILTER, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE); + ASSERT_NE(bitmap_filter, nullptr); + std::string expect_result; + // [0, 10000) is filtered by range. + expect_result.append(std::string(10000, '0')); + expect_result.append(std::string(40000, '1')); + ASSERT_EQ(bitmap_filter->toDebugString(), expect_result); +} + +TEST_F(SegmentBitmapFilterTest, StableLogicalSplit) +try +{ + writeSegment("d_mem:[0, 50000)"); + mergeSegmentDelta(SEG_ID, true); + auto [seg, snap] = getSegmentForRead(SEG_ID); + ASSERT_EQ(seg->getDelta()->getRows(), 0); + ASSERT_EQ(seg->getDelta()->getDeletes(), 0); + ASSERT_EQ(seg->getStable()->getRows(), 50000); + + auto new_seg_id = splitSegmentAt(SEG_ID, 25000, Segment::SplitMode::Logical); + + ASSERT_TRUE(new_seg_id.has_value()); + ASSERT_TRUE(areSegmentsSharingStable({SEG_ID, *new_seg_id})); + + auto left_handle = getSegmentHandle(SEG_ID, {}); + const auto * left_h = toColumnVectorDataPtr(left_handle); + auto expected_left_handle = genSequence("[0, 25000)"); + ASSERT_EQ(expected_left_handle.size(), left_h->size()); + ASSERT_TRUE(sequenceEqual(expected_left_handle.data(), left_h->data(), left_h->size())); + + auto left_row_id = getSegmentRowId(SEG_ID, {}); + const auto * left_r = toColumnVectorDataPtr(left_row_id); + auto expected_left_row_id = genSequence("[0, 25000)"); + ASSERT_EQ(expected_left_row_id.size(), left_r->size()); + ASSERT_TRUE(sequenceEqual(expected_left_row_id.data(), left_r->data(), left_r->size())); + + auto right_handle = getSegmentHandle(*new_seg_id, {}); + const auto * right_h = toColumnVectorDataPtr(right_handle); + auto expected_right_handle = 
genSequence("[25000, 50000)"); + ASSERT_EQ(expected_right_handle.size(), right_h->size()); + ASSERT_TRUE(sequenceEqual(expected_right_handle.data(), right_h->data(), right_h->size())); + + auto right_row_id = getSegmentRowId(*new_seg_id, {}); + const auto * right_r = toColumnVectorDataPtr(right_row_id); + auto expected_right_row_id = genSequence("[25000, 50000)"); + ASSERT_EQ(expected_right_row_id.size(), right_r->size()); + ASSERT_TRUE(sequenceEqual(expected_right_row_id.data(), right_r->data(), right_r->size())); +} +CATCH +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 85c54c5e7c0..7c2334d74f8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -399,7 +399,7 @@ void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows, std::o operation_statistics["write"]++; } -void SegmentTestBasic::ingestDTFileIntoDelta(PageId segment_id, UInt64 write_rows, std::optional start_at) +void SegmentTestBasic::ingestDTFileIntoDelta(PageId segment_id, UInt64 write_rows, std::optional start_at, bool clear) { LOG_INFO(logger_op, "ingestDTFileIntoDelta, segment_id={} write_rows={}", segment_id, write_rows); @@ -431,7 +431,7 @@ void SegmentTestBasic::ingestDTFileIntoDelta(PageId segment_id, UInt64 write_row wbs.data.putRefPage(ref_id, dm_file->pageId()); auto ref_file = DMFile::restore(dm_context->db_context.getFileProvider(), file_id, ref_id, parent_path, DMFile::ReadMetaMode::all()); wbs.writeLogAndData(); - ASSERT_TRUE(segment->ingestDataToDelta(*dm_context, segment->getRowKeyRange(), {ref_file}, /* clear_data_in_range */ true)); + ASSERT_TRUE(segment->ingestDataToDelta(*dm_context, segment->getRowKeyRange(), {ref_file}, /* clear_data_in_range */ clear)); ingest_wbs.rollbackWrittenLogAndData(); } diff --git 
a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 51f966df239..b6fcb709036 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -63,7 +63,7 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic * written randomly in the segment range. */ void writeSegment(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt); - void ingestDTFileIntoDelta(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt); + void ingestDTFileIntoDelta(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt, bool clear = false); void ingestDTFileByReplace(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt, bool clear = false); void writeSegmentWithDeletedPack(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt); void deleteRangeSegment(PageId segment_id); @@ -92,6 +92,13 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic void printFinishedOperations() const; + std::vector readSegment(PageId segment_id, bool need_row_id, const RowKeyRanges & ranges); + ColumnPtr getSegmentRowId(PageId segment_id, const RowKeyRanges & ranges); + ColumnPtr getSegmentHandle(PageId segment_id, const RowKeyRanges & ranges); + void writeSegmentWithDeleteRange(PageId segment_id, Int64 begin, Int64 end); + RowKeyValue buildRowKeyValue(Int64 key); + RowKeyRange buildRowKeyRange(Int64 begin, Int64 end); + protected: std::mt19937 random; @@ -114,6 +121,8 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic */ void reloadDMContext(); + std::pair getSegmentForRead(PageId segment_id); + protected: inline static constexpr PageId NAMESPACE_ID = 100; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic_bitmap.cpp 
b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic_bitmap.cpp new file mode 100644 index 00000000000..4a9d36ec323 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic_bitmap.cpp @@ -0,0 +1,135 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfBitmapFilter; +} // namespace CurrentMetrics + +namespace DB::DM::tests +{ + +Block mergeSegmentRowIds(std::vector && blocks) +{ + auto accumulated_block = std::move(blocks[0]); + RUNTIME_CHECK(accumulated_block.segmentRowIdCol() != nullptr); + for (size_t block_idx = 1; block_idx < blocks.size(); ++block_idx) + { + auto block = std::move(blocks[block_idx]); + auto accu_row_id_col = accumulated_block.segmentRowIdCol(); + auto row_id_col = block.segmentRowIdCol(); + RUNTIME_CHECK(row_id_col != nullptr); + auto mut_col = (*std::move(accu_row_id_col)).mutate(); + mut_col->insertRangeFrom(*row_id_col, 0, row_id_col->size()); + accumulated_block.setSegmentRowIdCol(std::move(mut_col)); + } + return accumulated_block; +} + +RowKeyRange SegmentTestBasic::buildRowKeyRange(Int64 begin, Int64 end) +{ + HandleRange range(begin, end); + return RowKeyRange::fromHandleRange(range); +} + +std::pair SegmentTestBasic::getSegmentForRead(PageId segment_id) +{ + 
RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto segment = segments[segment_id]; + auto snapshot = segment->createSnapshot( + *dm_context, + /* for_update */ false, + CurrentMetrics::DT_SnapshotOfBitmapFilter); + RUNTIME_CHECK(snapshot != nullptr); + return {segment, snapshot}; +} +std::vector SegmentTestBasic::readSegment(PageId segment_id, bool need_row_id, const RowKeyRanges & ranges) +{ + auto [segment, snapshot] = getSegmentForRead(segment_id); + ColumnDefines columns_to_read = {getExtraHandleColumnDefine(options.is_common_handle), + getVersionColumnDefine()}; + auto stream = segment->getInputStreamModeNormal( + *dm_context, + columns_to_read, + snapshot, + ranges.empty() ? RowKeyRanges{segment->getRowKeyRange()} : ranges, + nullptr, + std::numeric_limits::max(), + DEFAULT_BLOCK_SIZE, + need_row_id); + std::vector blks; + for (auto blk = stream->read(); blk; blk = stream->read()) + { + blks.push_back(blk); + } + return blks; +} + +ColumnPtr SegmentTestBasic::getSegmentRowId(PageId segment_id, const RowKeyRanges & ranges) +{ + LOG_INFO(logger_op, "getSegmentRowId, segment_id={}", segment_id); + auto blks = readSegment(segment_id, true, ranges); + if (blks.empty()) + { + return nullptr; + } + else + { + auto block = mergeSegmentRowIds(std::move(blks)); + RUNTIME_CHECK(!block.has(EXTRA_HANDLE_COLUMN_NAME)); + RUNTIME_CHECK(block.segmentRowIdCol() != nullptr); + return block.segmentRowIdCol(); + } +} + +ColumnPtr SegmentTestBasic::getSegmentHandle(PageId segment_id, const RowKeyRanges & ranges) +{ + LOG_INFO(logger_op, "getSegmentHandle, segment_id={}", segment_id); + auto blks = readSegment(segment_id, false, ranges); + if (blks.empty()) + { + return nullptr; + } + else + { + auto block = mergeBlocks(std::move(blks)); + RUNTIME_CHECK(block.has(EXTRA_HANDLE_COLUMN_NAME)); + RUNTIME_CHECK(block.segmentRowIdCol() == nullptr); + return block.getByName(EXTRA_HANDLE_COLUMN_NAME).column; + } +} + +void 
SegmentTestBasic::writeSegmentWithDeleteRange(PageId segment_id, Int64 begin, Int64 end) +{ + auto range = buildRowKeyRange(begin, end); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto segment = segments[segment_id]; + RUNTIME_CHECK(segment->write(*dm_context, range)); +} +} // namespace DB::DM::tests