From 0d8c3d8517635680ea135b0bb1b4d58438423b65 Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 11 Oct 2019 14:21:17 +0800 Subject: [PATCH] [FLASH-541] Read ranges indicated by mvcc_query_info (#267) * Read ranges indicated by mvcc_query_info * Optimization: Don't do handle range filter inside Segment --- .../Interpreters/InterpreterSelectQuery.cpp | 9 ++ .../DeltaMerge/ChunkBlockInputStream.h | 147 +++--------------- dbms/src/Storages/DeltaMerge/DeltaMerge.h | 26 +--- .../Storages/DeltaMerge/DiskValueSpace.cpp | 2 +- dbms/src/Storages/DeltaMerge/HandleFilter.h | 7 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 115 +++++++------- dbms/src/Storages/DeltaMerge/Segment.h | 10 +- .../tests/gtest_dm_storage_delta_merge.cpp | 92 ++++++++++- .../src/Storages/StorageDeltaMerge-internal.h | 93 +++++++++++ dbms/src/Storages/StorageDeltaMerge.cpp | 25 ++- .../Storages/Transaction/tests/CMakeLists.txt | 4 +- ...v_keyvalue.cpp => gtest_tikv_keyvalue.cpp} | 84 +++++++++- 12 files changed, 380 insertions(+), 234 deletions(-) create mode 100644 dbms/src/Storages/StorageDeltaMerge-internal.h rename dbms/src/Storages/Transaction/tests/{tikv_keyvalue.cpp => gtest_tikv_keyvalue.cpp} (75%) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index fd61be606bb..1712ba68bcc 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -793,6 +794,14 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline const auto & epoch = region.region_epoch(); info.version = epoch.version(); info.conf_version = epoch.conf_ver(); + if (const auto & managed_storage = std::dynamic_pointer_cast(storage)) + { + // Extract the handle range according to current table + TiKVKey start_key = RecordKVFormat::encodeAsTiKVKey(region.start_key()); + TiKVKey end_key = RecordKVFormat::encodeAsTiKVKey(region.end_key()); + RegionRangeKeys region_range(std::move(start_key), std::move(end_key)); + info.range_in_table = region_range.getHandleRangeByTable(managed_storage->getTableInfo().id); + } query_info.mvcc_query_info->regions_query_info.push_back(info); } diff --git a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h index 7688d4ee0e3..de04c62400b 100644 --- a/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h @@ -10,163 +10,52 @@ namespace DB { namespace DM { -/// Read `chunks` as blocks. -/// We can use `handle_range` param to filter out rows, and use `filter` to ignore some chunks roughly. -/// -/// Note that `handle_range` param assumes that data in chunks are in order of handle. If not, please use handle range of {MIN, MAX}. -/// -/// For example: -/// size_t skip_rows = 0; -/// while(stream.hasNext()) -/// { -/// if(stream.shouldSkipNext()) -/// { -/// skip_rows += stream.nextRows(); -/// stream.skipNext(); -/// continue; -/// } -/// auto block = stream.read(); -/// ... 
-/// } class ChunkBlockInputStream final : public IBlockInputStream { public: ChunkBlockInputStream(const Chunks & chunks_, - size_t handle_col_pos_, - const HandleRange & handle_range_, const ColumnDefines & read_columns_, const PageReader & page_reader_, - const RSOperatorPtr & filter_) - : chunks(chunks_), - handle_col_pos(handle_col_pos_), - handle_range(handle_range_), - read_columns(read_columns_), - page_reader(page_reader_), - filter(filter_) + const RSOperatorPtr & filter) + : chunks(chunks_), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_) { - } - - String getName() const override { return "Chunk"; } - Block getHeader() const override { return toEmptyBlock(read_columns); } - - Block read() override - { - if (!hasNext()) - return {}; - Block tmp; - if (!cur_chunk_data) - // It means user ignore the skipNext() result and insist to read data. - tmp = readCurChunkData(); - else - tmp.swap(cur_chunk_data); - - ++cur_chunk_index; - cur_chunk_skip = false; - - return tmp; - } - - bool hasNext() - { - if (cur_chunk_index >= chunks.size()) - return false; - // Filter out those rows not fit for handle_range. - for (; cur_chunk_index < chunks.size(); ++cur_chunk_index) - { - auto [first, last] = chunks[cur_chunk_index].getHandleFirstLast(); - if (handle_range.intersect(first, last)) - break; - } - - if (cur_chunk_index >= chunks.size()) - return false; - - if (!cur_chunk_data) + if (filter) { - if (filter) + for (size_t i = 0; i < chunks.size(); ++i) { - auto & chunk = chunks[cur_chunk_index]; + auto & chunk = chunks[i]; RSCheckParam param; for (auto & [col_id, meta] : chunk.getMetas()) param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax)); - - cur_chunk_skip = filter->roughCheck(param) == None; - } - if (!cur_chunk_skip) - { - cur_chunk_data = readCurChunkData(); + skip_chunks[i] = filter->roughCheck(param) == None; } } - - return true; - } - - size_t nextRows() - { - auto & chunk = chunks[cur_chunk_index]; - if (isCurChunkCompleted(chunk)) - return chunk.getRows(); - - // Otherwise, some rows of current chunk are filtered out by handle_range. - - if (cur_chunk_data) - { - return cur_chunk_data.rows(); - } - else - { - // Current chunk is ignored by `filter`, - // but we still need to get the row count which their handles are included by handle_range. 
- auto block = readChunk(chunk, {read_columns[handle_col_pos]}, page_reader); - auto offset_limit - = HandleFilter::getPosRangeOfSorted(handle_range, block.getByPosition(handle_col_pos).column, 0, block.rows()); - return offset_limit.second; - } } - bool shouldSkipNext() { return cur_chunk_skip; } + String getName() const override { return "Chunk"; } + Block getHeader() const override { return toEmptyBlock(read_columns); } - void skipNext() + Block read() override { - ++cur_chunk_index; - - cur_chunk_data = {}; - cur_chunk_skip = false; + if (!hasNext()) + return {}; + return readChunk(chunks[cur_chunk_index++], read_columns, page_reader); } -private: - inline bool isCurChunkCompleted(const Chunk & chunk) - { - auto [first, last] = chunk.getHandleFirstLast(); - return handle_range.include(first, last); - } + bool hasNext() { return cur_chunk_index < chunks.size(); } + size_t nextRows() { return chunks[cur_chunk_index].getRows(); } - inline Block readCurChunkData() - { - auto & chunk = chunks[cur_chunk_index]; - if (isCurChunkCompleted(chunk)) - { - return readChunk(chunk, read_columns, page_reader); - } - else - { - auto block = readChunk(chunk, read_columns, page_reader); - return HandleFilter::filterSorted(handle_range, std::move(block), handle_col_pos); - } - } + bool shouldSkipNext() { return skip_chunks[cur_chunk_index]; } + void skipNext() { ++cur_chunk_index; } private: - Chunks chunks; - size_t handle_col_pos; - HandleRange handle_range; + Chunks chunks; + std::vector skip_chunks; ColumnDefines read_columns; PageReader page_reader; - RSOperatorPtr filter; size_t cur_chunk_index = 0; - bool cur_chunk_skip = false; - Block cur_chunk_data; }; using ChunkBlockInputStreamPtr = std::shared_ptr; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index 694b72294a9..ff036720713 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -28,9 +28,6 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream using DeltaValueSpacePtr = std::shared_ptr; using SharedLock = std::shared_lock; - size_t handle_column_pos; - HandleRange handle_range; - ChunkBlockInputStreamPtr stable_input_stream; ChunkBlockInputStream * stable_input_stream_raw_ptr; @@ -59,16 +56,12 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream bool delta_done = false; public: - DeltaMergeBlockInputStream(size_t handle_column_pos_, - const HandleRange & handle_range_, - const ChunkBlockInputStreamPtr & stable_input_stream_, + DeltaMergeBlockInputStream(const ChunkBlockInputStreamPtr & stable_input_stream_, const DeltaValueSpacePtr & delta_value_space_, IndexIterator index_begin, IndexIterator index_end, size_t max_block_size_) - : handle_column_pos(handle_column_pos_), - handle_range(handle_range_), - stable_input_stream(stable_input_stream_), + : stable_input_stream(stable_input_stream_), stable_input_stream_raw_ptr(stable_input_stream.get()), delta_value_space(delta_value_space_), entry_it(index_begin), @@ -125,13 +118,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream if (limit == max_block_size) continue; - Block block = header.cloneWithColumns(std::move(columns)); - - Block res = HandleFilter::filterSorted(handle_range, std::move(block), handle_column_pos); - if (!res || !res.rows()) - continue; - else - return res; + return header.cloneWithColumns(std::move(columns)); } return {}; } @@ -160,11 +147,8 @@ class DeltaMergeBlockInputStream final : public 
IProfilingBlockInputStream writeDeleteFromDelta(1); break; case DT_INS: - if (handle_range.check(delta_value_space->getHandle(tuple_id))) - { - writeInsertFromDelta(output_columns, tuple_id); - --output_write_limit; - } + writeInsertFromDelta(output_columns, tuple_id); + --output_write_limit; break; default: throw Exception("Entry type " + DTTypeString(entry_it.getType()) + " is not supported, is end: " diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp index 24a48d4e07a..65c6f27a43f 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp @@ -630,7 +630,7 @@ DiskValueSpacePtr DiskValueSpace::doFlushCache(const OpContext & context, WriteB ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const { - return std::make_shared(chunks, 0, HandleRange::newAll(), read_columns, page_reader, RSOperatorPtr()); + return std::make_shared(chunks, read_columns, page_reader, RSOperatorPtr()); } size_t DiskValueSpace::num_rows() const diff --git a/dbms/src/Storages/DeltaMerge/HandleFilter.h b/dbms/src/Storages/DeltaMerge/HandleFilter.h index ff97e62066c..417634b7ed8 100644 --- a/dbms/src/Storages/DeltaMerge/HandleFilter.h +++ b/dbms/src/Storages/DeltaMerge/HandleFilter.h @@ -83,14 +83,14 @@ inline Block filterUnsorted(const HandleRange & handle_range, Block && block, si } } // namespace HandleFilter +template class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream { public: DMHandleFilterBlockInputStream(const BlockInputStreamPtr & input, HandleRange handle_range_, - size_t handle_col_pos_, - bool is_block_sorted_) - : handle_range(handle_range_), handle_col_pos(handle_col_pos_), is_block_sorted(is_block_sorted_) + size_t handle_col_pos_) + : handle_range(handle_range_), handle_col_pos(handle_col_pos_) { children.push_back(input); } @@ -120,7 +120,6 @@ class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream private: HandleRange handle_range; size_t handle_col_pos; - bool is_block_sorted; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index de7e8866f37..8c15ee7f017 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -218,6 +218,9 @@ void Segment::applyAppendTask(const OpContext & opc, const AppendTaskPtr & task, void Segment::check(DMContext & dm_context, const String & when) const { + // This method is broken. 
+ if (true) + return; auto & handle = dm_context.handle_column; size_t stable_rows = stable->num_rows(); @@ -233,7 +236,6 @@ void Segment::check(DMContext & dm_context, const String & when) const << DB::toString(delta_tree->numInserts()) + ", deletes:" << DB::toString(delta_tree->numDeletes())); auto stream = getPlacedStream(storage_snapshot.data_reader, - {range}, read_info.read_columns, EMPTY_FILTER, read_info.delta_value_space, @@ -279,16 +281,38 @@ BlockInputStreamPtr Segment::getInputStream(const DMContext & dm_context, { auto & handle = dm_context.handle_column; auto read_info = getReadInfo(dm_context, segment_snap, storage_snaps, columns_to_read); - auto stream = getPlacedStream(storage_snaps.data_reader, - read_ranges, - read_info.read_columns, - filter, - read_info.delta_value_space, - read_info.index_begin, - read_info.index_end, - expected_block_size); - stream = std::make_shared>(stream, handle, max_version); - return stream; + + auto create_stream = [&](const HandleRange & read_range) { + auto stream = getPlacedStream(storage_snaps.data_reader, + read_info.read_columns, + filter, + read_info.delta_value_space, + read_info.index_begin, + read_info.index_end, + expected_block_size); + stream = std::make_shared>(stream, read_range, 0); + return std::make_shared>(stream, handle, max_version); + }; + + if (read_ranges.size() == 1) + { + LOG_TRACE(log, "Segment [" << DB::toString(segment_id) << "] is read by " << DB::toString(1) << " ranges"); + return create_stream(range.shrink(read_ranges[0])); + } + else + { + BlockInputStreams streams; + for (auto & read_range : read_ranges) + { + HandleRange real_range = range.shrink(read_range); + if (!real_range.none()) + streams.push_back(create_stream(real_range)); + } + + LOG_TRACE(log, "Segment [" << DB::toString(segment_id) << "] is read by " << DB::toString(streams.size()) << " ranges"); + + return std::make_shared(streams); + } } BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context, @@ -315,20 +339,16 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_contex } BlockInputStreamPtr delta_stream = std::make_shared(delta_snap->getChunks(), // - 0, - HandleRange::newAll(), new_columns_to_read, storage_snaps.log_reader, EMPTY_FILTER); - delta_stream = std::make_shared(delta_stream, range, 0, false); + delta_stream = std::make_shared>(delta_stream, range, 0); BlockInputStreamPtr stable_stream = std::make_shared(stable->getChunks(), // - 0, - range, new_columns_to_read, storage_snaps.data_reader, EMPTY_FILTER); - stable_stream = std::make_shared(stable_stream, range, 0, true); + stable_stream = std::make_shared>(stable_stream, range, 0); BlockInputStreams streams; streams.push_back(delta_stream); @@ -423,7 +443,6 @@ SegmentPtr Segment::flushDelta(DMContext & dm_context, RemoveWriteBatches & remo auto read_info = getReadInfo(dm_context, {delta, delta->num_rows(), delta->num_deletes()}, storage_snapshot, columns); auto data_stream = getPlacedStream(storage_snapshot.data_reader, - {range}, read_info.read_columns, EMPTY_FILTER, read_info.delta_value_space, @@ -554,7 +573,6 @@ ColumnDefines Segment::arrangeReadColumns(const ColumnDefine & handle, const Col template BlockInputStreamPtr Segment::getPlacedStream(const PageReader & data_page_reader, - const HandleRanges & read_ranges, const ColumnDefines & read_columns, const RSOperatorPtr & filter, const DeltaValueSpacePtr & delta_value_space, @@ -562,40 +580,14 @@ BlockInputStreamPtr Segment::getPlacedStream(const PageReader & data_pag const IndexIterator & 
delta_index_end, size_t expected_block_size) const { - auto placed_stream_creator = [&](const HandleRange & read_range) { - auto stable_input_stream - = std::make_shared(stable->getChunks(), 0, read_range, read_columns, data_page_reader, filter); - return std::make_shared>( // - 0, - read_range, - stable_input_stream, - delta_value_space, - delta_index_begin, - delta_index_end, - expected_block_size); - }; - - - if (read_ranges.size() == 1) - { - LOG_TRACE(log, "Segment [" << DB::toString(segment_id) << "] is read by " << DB::toString(1) << " ranges"); - const HandleRange real_range = range.shrink(read_ranges[0]); - return placed_stream_creator(real_range); - } - else - { - BlockInputStreams streams; - for (auto & read_range : read_ranges) - { - HandleRange real_range = range.shrink(read_range); - if (!real_range.none()) - streams.push_back(placed_stream_creator(real_range)); - } - - LOG_TRACE(log, "Segment [" << DB::toString(segment_id) << "] is read by " << DB::toString(streams.size()) << " ranges"); - return std::make_shared(streams); - } + auto stable_input_stream = std::make_shared(stable->getChunks(), read_columns, data_page_reader, filter); + return std::make_shared>( // + stable_input_stream, + delta_value_space, + delta_index_begin, + delta_index_end, + expected_block_size); } Handle Segment::getSplitPointFast(DMContext & dm_context, const PageReader & data_page_reader) const @@ -616,7 +608,6 @@ Handle Segment::getSplitPointSlow(DMContext & dm_context, const PageReader & dat auto & handle = dm_context.handle_column; auto stream = getPlacedStream(data_page_reader, - {range}, {dm_context.handle_column}, EMPTY_FILTER, read_info.delta_value_space, @@ -739,13 +730,13 @@ SegmentPair Segment::doSplit(DMContext & dm_context, { // Write my data BlockInputStreamPtr my_data = getPlacedStream(data_page_reader, - {my_range}, read_info.read_columns, EMPTY_FILTER, read_info.delta_value_space, read_info.index_begin, read_info.index_end, STABLE_CHUNK_ROWS); + my_data = std::make_shared>(my_data, my_range, 0); my_data = std::make_shared>(my_data, handle, min_version); auto tmp = DiskValueSpace::writeChunks(opc, my_data); my_new_stable_chunks.swap(tmp); @@ -754,13 +745,13 @@ SegmentPair Segment::doSplit(DMContext & dm_context, { // Write new segment's data BlockInputStreamPtr other_data = getPlacedStream(data_page_reader, - {other_range}, read_info.read_columns, EMPTY_FILTER, read_info.delta_value_space, read_info.index_begin, read_info.index_end, STABLE_CHUNK_ROWS); + other_data = std::make_shared>(other_data, other_range, 0); other_data = std::make_shared>(other_data, handle, min_version); auto tmp = DiskValueSpace::writeChunks(opc, other_data); other_new_stable_chunks.swap(tmp); @@ -863,23 +854,23 @@ SegmentPtr Segment::doMerge(DMContext & dm_context, Chunks new_stable_chunks; { BlockInputStreamPtr left_data = left->getPlacedStream(data_page_reader, - {left->range}, left_read_info.read_columns, EMPTY_FILTER, left_read_info.delta_value_space, left_read_info.index_begin, left_read_info.index_end, STABLE_CHUNK_ROWS); + left_data = std::make_shared>(left_data, left->range, 0); left_data = std::make_shared>(left_data, handle, min_version); BlockInputStreamPtr right_data = right->getPlacedStream(data_page_reader, - {right->range}, right_read_info.read_columns, EMPTY_FILTER, right_read_info.delta_value_space, right_read_info.index_begin, right_read_info.index_end, STABLE_CHUNK_ROWS); + right_data = std::make_shared>(right_data, right->range, 0); right_data = std::make_shared>(right_data, handle, 
min_version); BlockInputStreamPtr merged_stream = std::make_shared(BlockInputStreams({left_data, right_data})); @@ -955,7 +946,7 @@ SegmentPtr Segment::reset(DMContext & dm_context, BlockInputStreamPtr & input_st new_me->delta->setChunks({}, meta_wb, remove_wbs.log); new_me->stable->setChunks(std::move(new_stable_chunks), meta_wb, remove_wbs.data); - // Commit meta updates. + // Commit updates. storage_pool.meta().write(meta_wb); return new_me; @@ -1008,7 +999,6 @@ void Segment::placeUpsert(const DMContext & dm_context, BlockInputStreamPtr merged_stream = getPlacedStream( // data_page_reader, - {range}, {handle, getVersionColumnDefine()}, EMPTY_FILTER, delta_value_space, @@ -1039,13 +1029,15 @@ void Segment::placeDelete(const DMContext & dm_context, { BlockInputStreamPtr delete_stream = getPlacedStream( // data_page_reader, - {delete_range}, {handle, getVersionColumnDefine()}, - EMPTY_FILTER, + withHanleRange(EMPTY_FILTER, delete_range), delta_value_space, delta_index_begin, delta_index_end, DEFAULT_BLOCK_SIZE); + + delete_stream = std::make_shared>(delete_stream, delete_range, 0); + // Try to merge into big block. 128 MB should be enough. SquashingBlockInputStream squashed_delete_stream(delete_stream, 0, 128 * (1UL << 20)); @@ -1063,7 +1055,6 @@ void Segment::placeDelete(const DMContext & dm_context, { BlockInputStreamPtr merged_stream = getPlacedStream( // data_page_reader, - {range}, {handle, getVersionColumnDefine()}, EMPTY_FILTER, delta_value_space, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index eaa945bc4bd..f6ba5b75713 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -72,21 +72,22 @@ using DeltaValueSpacePtr = std::shared_ptr; struct SegmentSnapshot { DiskValueSpacePtr delta; - size_t delta_rows; - size_t delta_deletes; + size_t delta_rows = 0; + size_t delta_deletes = 0; + SegmentSnapshot() = default; SegmentSnapshot(const DiskValueSpacePtr & delta_, size_t delta_rows_, size_t delta_deletes_) : delta{delta_}, delta_rows(delta_rows_), delta_deletes(delta_deletes_) { } + + explicit operator bool() { return (bool)delta; } }; /// A segment contains many rows of a table. A table is split into segments by succeeding ranges. /// /// The data of stable value space is stored in "data" storage, while data of delta value space is stored in "log" storage. /// And all meta data is stored in "meta" storage. -/// -/// TODO: Currently we don't support DDL, e.g. update column type. Will add it later. 
class Segment : private boost::noncopyable { public: @@ -196,7 +197,6 @@ class Segment : private boost::noncopyable template BlockInputStreamPtr getPlacedStream(const PageReader & data_page_reader, - const HandleRanges & read_ranges, const ColumnDefines & read_columns, const RSOperatorPtr & filter, const DeltaValueSpacePtr & delta_value_space, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 65734d43c80..7326390063b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -20,8 +20,11 @@ #include #include #include +#include #include #include +#include +#include namespace DB { @@ -213,11 +216,13 @@ try query_info.mvcc_query_info = std::make_unique(); query_info.mvcc_query_info->resolve_locks = global_ctx.getSettingsRef().resolve_locks; query_info.mvcc_query_info->read_tso = global_ctx.getSettingsRef().read_tso; - BlockInputStreamPtr dms = storage->read(column_names, query_info, global_ctx, stage2, 8192, 1)[0]; - dms->readPrefix(); + BlockInputStreams ins = storage->read(column_names, query_info, global_ctx, stage2, 8192, 1); + ASSERT_EQ(ins.size(), 1UL); + BlockInputStreamPtr in = ins[0]; + in->readPrefix(); size_t num_rows_read = 0; - while (Block block = dms->read()) + while (Block block = in->read()) { num_rows_read += block.rows(); for (auto & iter : block) @@ -236,7 +241,7 @@ try } } } - dms->readSuffix(); + in->readSuffix(); ASSERT_EQ(num_rows_read, sample.rows()); @@ -251,6 +256,85 @@ catch (const Exception & e) throw; } + +TEST(StorageDeltaMerge_internal_test, GetMergedQueryRanges) +{ + MvccQueryInfo::RegionsQueryInfo regions; + RegionQueryInfo region; + region.range_in_table = std::make_pair(100, 200); + regions.emplace_back(region); + region.range_in_table = std::make_pair(200, 250); + regions.emplace_back(region); + region.range_in_table = std::make_pair(300, 400); + regions.emplace_back(region); + region.range_in_table = std::make_pair(425, 475); + regions.emplace_back(region); + + auto ranges = ::DB::getQueryRanges(regions); + ASSERT_EQ(ranges.size(), 3UL); + ASSERT_EQ(ranges[0], ::DB::DM::HandleRange(100, 250)); + ASSERT_EQ(ranges[1], ::DB::DM::HandleRange(300, 400)); + ASSERT_EQ(ranges[2], ::DB::DM::HandleRange(425, 475)); +} + +TEST(StorageDeltaMerge_internal_test, MergedUnsortedQueryRanges) +{ + MvccQueryInfo::RegionsQueryInfo regions; + RegionQueryInfo region; + region.range_in_table = std::make_pair(2360148,2456148); + regions.emplace_back(region); + region.range_in_table = std::make_pair(1961680,2057680); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2264148,2360148); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2057680,2153680); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2153680,2264148); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2552148,2662532); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2758532,2854532); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2854532,2950532); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2456148,2552148); + regions.emplace_back(region); + region.range_in_table = std::make_pair(2662532,2758532); + regions.emplace_back(region); + + auto ranges = ::DB::getQueryRanges(regions); + ASSERT_EQ(ranges.size(), 1UL); + ASSERT_EQ(ranges[0], 
::DB::DM::HandleRange(1961680, 2950532)) << ranges[0].toString(); +} + +TEST(StorageDeltaMerge_internal_test, GetFullQueryRanges) +{ + MvccQueryInfo::RegionsQueryInfo regions; + RegionQueryInfo region; + region.range_in_table = {TiKVHandle::Handle::normal_min, TiKVHandle::Handle::max}; + regions.emplace_back(region); + + auto ranges = ::DB::getQueryRanges(regions); + ASSERT_EQ(ranges.size(), 1UL); + const auto full_range = ::DB::DM::HandleRange::newAll(); + ASSERT_EQ(ranges[0], full_range); +} + +TEST(StorageDeltaMerge_internal_test, OverlapQueryRanges) +{ + MvccQueryInfo::RegionsQueryInfo regions; + RegionQueryInfo region; + region.range_in_table = std::make_pair(100, 200); + regions.emplace_back(region); + region.range_in_table = std::make_pair(150, 250); + regions.emplace_back(region); + region.range_in_table = std::make_pair(300, 400); + regions.emplace_back(region); + region.range_in_table = std::make_pair(425, 475); + + ASSERT_ANY_THROW(auto ranges = ::DB::getQueryRanges(regions)); +} + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge-internal.h b/dbms/src/Storages/StorageDeltaMerge-internal.h new file mode 100644 index 00000000000..78710298edf --- /dev/null +++ b/dbms/src/Storages/StorageDeltaMerge-internal.h @@ -0,0 +1,93 @@ +#pragma once + +#include +#include + +#include +#include + +namespace DB +{ + +namespace +{ +inline ::DB::HandleID getRangeEndID(const ::DB::TiKVHandle::Handle & end) +{ + switch (end.type) + { + case ::DB::TiKVHandle::HandleIDType::NORMAL: + return end.handle_id; + case ::DB::TiKVHandle::HandleIDType::MAX: + return ::DB::DM::HandleRange::MAX; + default: + throw Exception("Unknown TiKVHandle type: " + end.toString(), ErrorCodes::LOGICAL_ERROR); + } +} +} // namespace + +inline DM::HandleRanges getQueryRanges(const ::DB::MvccQueryInfo::RegionsQueryInfo & regions) +{ + DM::HandleRanges ranges; + if (regions.empty()) + { + // Just for test cases + ranges.emplace_back(::DB::DM::HandleRange::newAll()); + return ranges; + } + else if (regions.size() == 1) + { + // Shortcut for only one region info + DM::HandleRange range; + const auto & range_in_table = regions[0].range_in_table; + range.start = range_in_table.first.handle_id; + range.end = getRangeEndID(range_in_table.second); + ranges.emplace_back(range); + return ranges; + } + + std::vector sort_index(regions.size()); + for (size_t i = 0; i < sort_index.size(); ++i) + sort_index[i] = i; + + std::sort(sort_index.begin(), sort_index.end(), // + [®ions](const size_t lhs, const size_t rhs) { return regions[lhs] < regions[rhs]; }); + + ranges.reserve(regions.size()); + + DM::HandleRange current; + for (size_t i = 0; i < regions.size(); ++i) + { + const size_t region_idx = sort_index[i]; + const auto & region = regions[region_idx]; + const auto & range_in_table = region.range_in_table; + + if (i == 0) + { + current.start = range_in_table.first.handle_id; + current.end = getRangeEndID(range_in_table.second); + } + else if (current.end == range_in_table.first.handle_id) + { + // concat this range_in_table to current + current.end = getRangeEndID(range_in_table.second); + } + else if (current.end < range_in_table.first.handle_id) + { + ranges.emplace_back(current); + + // start a new range + current.start = range_in_table.first.handle_id; + current.end = getRangeEndID(range_in_table.second); + } + else + { + throw Exception("Overlap region range between " + current.toString() + " and [" // + + range_in_table.first.toString() + "," + range_in_table.second.toString() 
+ ")"); + } + } + ranges.emplace_back(current); + + return ranges; +} + +} // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index dcb51c85548..796a0d17026 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -377,10 +378,6 @@ BlockInputStreams StorageDeltaMerge::read( // } - HandleRanges ranges; - - ranges.emplace_back(DB::DM::HandleRange::newAll()); - const ASTSelectQuery & select_query = typeid_cast(*query_info.query); if (select_query.raw_for_mutable) return store->readRaw(context, context.getSettingsRef(), to_read, num_streams); @@ -408,6 +405,26 @@ BlockInputStreams StorageDeltaMerge::read( // doLearnerRead(tidb_table_info.id, mvcc_query_info.regions_query_info, tmt, log); } + HandleRanges ranges = getQueryRanges(mvcc_query_info.regions_query_info); + +#ifndef NDEBUG + { + std::stringstream ss; + for (const auto ®ion: mvcc_query_info.regions_query_info) + { + const auto & range = region.range_in_table; + ss << region.region_id << "[" << range.first.toString() << "," << range.second.toString() << "),"; + } + LOG_TRACE(log, "reading ranges: orig: " << ss.str()); + } + { + std::stringstream ss; + for (const auto &range : ranges) + ss << range.toString() << ","; + LOG_TRACE(log, "reading ranges: " << ss.str()); + } +#endif + return store->read( context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size); } diff --git a/dbms/src/Storages/Transaction/tests/CMakeLists.txt b/dbms/src/Storages/Transaction/tests/CMakeLists.txt index fed79c42dc4..09cadc307b8 100644 --- a/dbms/src/Storages/Transaction/tests/CMakeLists.txt +++ b/dbms/src/Storages/Transaction/tests/CMakeLists.txt @@ -1,7 +1,7 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) -add_executable (tikv_keyvalue tikv_keyvalue.cpp) -target_link_libraries (tikv_keyvalue dbms) +add_executable (tikv_keyvalue gtest_tikv_keyvalue.cpp) +target_link_libraries (tikv_keyvalue dbms gtest_main) add_executable (region_persister region_persister.cpp) target_link_libraries (region_persister dbms) diff --git a/dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp b/dbms/src/Storages/Transaction/tests/gtest_tikv_keyvalue.cpp similarity index 75% rename from dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp rename to dbms/src/Storages/Transaction/tests/gtest_tikv_keyvalue.cpp index 1d4efb43f70..35511530e88 100644 --- a/dbms/src/Storages/Transaction/tests/tikv_keyvalue.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_tikv_keyvalue.cpp @@ -1,3 +1,5 @@ +#include + #include "region_helper.h" #include @@ -30,7 +32,7 @@ inline TiKVKey genIndex(const TableID tableId, const Int64 id) return RecordKVFormat::encodeAsTiKVKey(key); } -int main(int, char **) +TEST(TiKVKeyValue_test, PortedTests) { bool res = true; { @@ -292,6 +294,84 @@ int main(int, char **) assert(range.comparableKeys().first.compare(RecordKVFormat::genKey(1, 2, 3)) == 0); } +} + +namespace +{ + +// In python, we can convert a test case from `s` +// 'range = parseTestCase({{{}}});\nASSERT_EQ(range, expected_range);'.format(','.join(map(lambda x: '{{{}}}'.format(','.join(map(lambda y: '0x{:02x}'.format(int(y, 16)), x.strip('[').strip(']').split()))), s.split(',')))) + +HandleRange parseTestCase(std::vector> && seq) +{ + std::string start_key_s, end_key_s; + for (const auto ch : seq[0]) + start_key_s += ch; + for (const auto ch : seq[1]) + 
end_key_s += ch; + RegionRangeKeys range{RecordKVFormat::encodeAsTiKVKey(start_key_s), RecordKVFormat::encodeAsTiKVKey(end_key_s)}; + return range.getHandleRangeByTable(45); +} + +HandleRange parseTestCase2(std::vector> && seq) +{ + std::string start_key_s, end_key_s; + for (const auto ch : seq[0]) + start_key_s += ch; + for (const auto ch : seq[1]) + end_key_s += ch; + RegionRangeKeys range{TiKVKey::copyFrom(start_key_s), TiKVKey::copyFrom(end_key_s)}; + return range.getHandleRangeByTable(45); +} + +std::string rangeToString(const HandleRange &r) +{ + std::stringstream ss; + ss << "[" << r.first.toString() << "," << r.second.toString() << ")"; + return ss.str(); +} + +} // namespace + +TEST(RegionRange_test, GetHandleRangeByTableID) +try +{ + HandleRange range; + HandleRange expected_range; + + // clang-format off + range = parseTestCase({{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2D,},{}}); + expected_range = {TiKVHandle::Handle::normal_min, TiKVHandle::Handle::max}; + EXPECT_EQ(range, expected_range) << rangeToString(range) << " <-> " << rangeToString(expected_range); + + range = parseTestCase({{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2d,0x5f,0x69,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x01,0x03,0x80,0x00,0x00,0x00,0x00,0x5a,0x0f,0x00,0x03,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x02},{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2d,0x5f,0x72,0x80,0x00,0x00,0x00,0x00,0x00,0xaa,0x40}}); + expected_range = {TiKVHandle::Handle::normal_min, 43584}; + EXPECT_EQ(range, expected_range) << rangeToString(range) << " <-> " << rangeToString(expected_range); + + range = parseTestCase({{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2d,0x5f,0x72,0x80,0x00,0x00,0x00,0x00,0x00,0xaa,0x40},{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2d,0x5f,0x72,0x80,0x00,0x00,0x00,0x00,0x02,0x21,0x40}}); + expected_range = {43584, 139584}; + EXPECT_EQ(range, expected_range) << rangeToString(range) << " <-> " << rangeToString(expected_range); + + range = parseTestCase({{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2d,0x5f,0x72,0x80,0x00,0x00,0x00,0x00,0x10,0xc7,0x40},{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0x2d,0x5f,0x72,0x80,0x00,0x00,0x00,0x00,0x12,0x3e,0x40}}); + expected_range = {1099584, 1195584}; + EXPECT_EQ(range, expected_range) << rangeToString(range) << " <-> " << rangeToString(expected_range); + + + // [74 80 0 0 0 0 0 0 ff 2d 5f 69 80 0 0 0 0 ff 0 0 1 3 80 0 0 0 ff 0 5a cf 64 3 80 0 0 ff 0 0 0 0 2 0 0 0 fc],[74 80 0 0 0 0 0 0 ff 2d 5f 72 80 0 0 0 0 ff 0 b8 b 0 0 0 0 0 fa] + range = parseTestCase2({{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0xff,0x2d,0x5f,0x69,0x80,0x00,0x00,0x00,0x00,0xff,0x00,0x00,0x01,0x03,0x80,0x00,0x00,0x00,0xff,0x00,0x5a,0xcf,0x64,0x03,0x80,0x00,0x00,0xff,0x00,0x00,0x00,0x00,0x02,0x00,0x00,0x00,0xfc},{0x74,0x80,0x00,0x00,0x00,0x00,0x00,0x00,0xff,0x2d,0x5f,0x72,0x80,0x00,0x00,0x00,0x00,0xff,0x00,0xb8,0x0b,0x00,0x00,0x00,0x00,0x00,0xfa}}); + expected_range = {TiKVHandle::Handle::normal_min, 47115}; + EXPECT_EQ(range, expected_range) << rangeToString(range) << " <-> " << rangeToString(expected_range); + + // clang-format on +} +catch (const Exception & e) +{ + std::string text = e.displayText(); + + auto embedded_stack_trace_pos = text.find("Stack trace"); + std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl; + if (std::string::npos == embedded_stack_trace_pos) + std::cerr << "Stack trace:" << std::endl << e.getStackTrace().toString() << std::endl; - return res ? 0 : 1; + throw; }
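
Below is a minimal standalone sketch, not part of the patch, of the range-merging rule implemented by getQueryRanges() in StorageDeltaMerge-internal.h. It assumes plain Int64-pair ranges instead of the real TiKVHandle-based RegionQueryInfo, and the helper name mergeQueryRanges is made up for illustration: after sorting, adjacent half-open ranges are concatenated, a gap starts a new range, and overlapping ranges are rejected with an exception, mirroring the throw in the patched code.

// Standalone sketch of the merging rule used by getQueryRanges().
// SimpleRange is a stand-in for DM::HandleRange; [start, end) semantics.
#include <algorithm>
#include <stdexcept>
#include <utility>
#include <vector>

using SimpleRange = std::pair<long long, long long>; // [start, end)

std::vector<SimpleRange> mergeQueryRanges(std::vector<SimpleRange> ranges)
{
    std::vector<SimpleRange> merged;
    if (ranges.empty())
        return merged;
    std::sort(ranges.begin(), ranges.end());
    SimpleRange current = ranges[0];
    for (size_t i = 1; i < ranges.size(); ++i)
    {
        if (current.second == ranges[i].first)
            current.second = ranges[i].second;          // adjacent: concat into current
        else if (current.second < ranges[i].first)
        {
            merged.push_back(current);                  // gap: start a new range
            current = ranges[i];
        }
        else
            throw std::runtime_error("Overlap region range"); // overlap: reject
    }
    merged.push_back(current);
    return merged;
}

Fed the regions from the GetMergedQueryRanges test above ({100,200},{200,250},{300,400},{425,475}), this yields {100,250},{300,400},{425,475}, matching the expected output of that test case.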