From e9a6e681cd8e27f12adb379ac211bba69e908eda Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 12 Aug 2024 16:09:34 +0800 Subject: [PATCH] pick some from https://github.com/tidbcloud/tiflash-cse/pull/234 to fix tidy Signed-off-by: Lloyd-Pottiger --- .../Coprocessor/DAGQueryBlockInterpreter.h | 125 ------------------ .../DMFileWithVectorIndexBlockInputStream.cpp | 90 ++++++++----- .../DMFileWithVectorIndexBlockInputStream.h | 2 +- .../File/VectorColumnFromIndexReader.cpp | 27 ++-- .../File/VectorColumnFromIndexReader.h | 6 +- .../Storages/DeltaMerge/Index/VectorIndex.h | 2 + 6 files changed, 78 insertions(+), 174 deletions(-) delete mode 100644 dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h deleted file mode 100644 index d1440061b16..00000000000 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#pragma GCC diagnostic ignored "-Wnon-virtual-dtor" -#include -#include -#include -#pragma GCC diagnostic pop - -namespace DB -{ -class DAGQueryBlock; -class ExchangeReceiver; -class DAGExpressionAnalyzer; -struct SubqueryForSet; -class Join; -class Expand2; -using JoinPtr = std::shared_ptr; -using Expand2Ptr = std::shared_ptr; - -/** build ch plan from dag request: dag executors -> ch plan - */ -class DAGQueryBlockInterpreter -{ -public: - DAGQueryBlockInterpreter( - Context & context_, - const std::vector & input_streams_vec_, - const DAGQueryBlock & query_block_, - size_t max_streams_); - - ~DAGQueryBlockInterpreter() = default; - - BlockInputStreams execute(); - -#ifndef DBMS_PUBLIC_GTEST -private: -#endif - void executeImpl(DAGPipeline & pipeline); - void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline); - void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline); - void handleJoin( - const tipb::Join & join, - DAGPipeline & pipeline, - SubqueryForSet & right_query, - size_t fine_grained_shuffle_count); - void handleExchangeReceiver(DAGPipeline & pipeline); - void handleMockExchangeReceiver(DAGPipeline & pipeline); - void handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection); - void handleWindow(DAGPipeline & pipeline, const tipb::Window & window, bool enable_fine_grained_shuffle); - void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort, bool enable_fine_grained_shuffle); - void handleExpand2(DAGPipeline & pipeline, const tipb::Expand2 & expand2); - void executeWhere( - DAGPipeline & pipeline, - const ExpressionActionsPtr & expressionActionsPtr, - String & filter_column, - const String & extra_info = ""); - void executeWindowOrder(DAGPipeline & pipeline, 
SortDescription sort_desc, bool enable_fine_grained_shuffle); - void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns); - void executeLimit(DAGPipeline & pipeline); - void executeExpand(DAGPipeline & pipeline, const ExpressionActionsPtr & expr); - void executeExpand2(DAGPipeline & pipeline, const Expand2Ptr & expand); - void executeWindow( - DAGPipeline & pipeline, - WindowDescription & window_description, - bool enable_fine_grained_shuffle); - void executeAggregation( - DAGPipeline & pipeline, - const ExpressionActionsPtr & expression_actions_ptr, - const Names & key_names, - const TiDB::TiDBCollators & collators, - AggregateDescriptions & aggregate_descriptions, - bool is_final_agg, - bool enable_fine_grained_shuffle); - void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols, const String & extra_info = ""); - void handleExchangeSender(DAGPipeline & pipeline); - void handleMockExchangeSender(DAGPipeline & pipeline); - - void recordProfileStreams(DAGPipeline & pipeline, const String & key); - - void recordJoinExecuteInfo(size_t build_side_index, const JoinPtr & join_ptr); - - void restorePipelineConcurrency(DAGPipeline & pipeline); - - DAGContext & dagContext() const; - - Context & context; - std::vector input_streams_vec; - const DAGQueryBlock & query_block; - - NamesWithAliases final_project; - - /// How many streams we ask for storage to produce, and in how many threads we will do further processing. 
- size_t max_streams = 1; - - std::unique_ptr analyzer; - - LoggerPtr log; -}; -} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp index d0864ccb302..8a51aad6841 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp @@ -20,6 +20,8 @@ #include #include +#include + namespace DB::ErrorCodes { @@ -130,14 +132,38 @@ Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter) else res = readByIndexReader(); - // Filter the output rows. If no rows need to filter, res_filter is nullptr. - filter.resize(res.rows()); - bool all_match = valid_rows_after_search.get(filter, res.startOffset(), res.rows()); + if (!res) + return {}; - if unlikely (all_match) - res_filter = nullptr; - else - res_filter = &filter; + // Assign output filter according to sorted_results. + // + // For example, if sorted_results is [3, 10], the complete filter array is: + // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] + // And we should only return filter array starting from res.startOffset(), like: + // [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1] + // ^startOffset ^startOffset+rows + // filter: [0, 0, 0, 0, 0] + // + // We will use startOffset as lowerBound (inclusive), and startOffset+rows + // as upperBound (exclusive) to find whether this range has a match in sorted_results. 
+ + const auto start_offset = res.startOffset(); + const auto max_rowid_exclusive = start_offset + res.rows(); + + filter.clear(); + filter.resize_fill(res.rows(), 0); + + auto it = std::lower_bound(sorted_results.begin(), sorted_results.end(), start_offset); + while (it != sorted_results.end()) + { + auto rowid = *it; + if (rowid >= max_rowid_exclusive) + break; + filter[rowid - start_offset] = 1; + ++it; + } + + res_filter = &filter; return res; } @@ -173,12 +199,18 @@ Block DMFileWithVectorIndexBlockInputStream::readByIndexReader() block.setStartOffset(block_start_row_id); auto vec_column = vec_cd.type->createColumn(); + vec_column->reserve(read_rows); Stopwatch w; vec_column_reader->read(vec_column, read_pack_id, read_rows); duration_read_from_vec_index_seconds += w.elapsedSeconds(); - block.insert(ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id}); + block.insert(ColumnWithTypeAndName{// + std::move(vec_column), + vec_cd.type, + vec_cd.name, + vec_cd.id}); + return block; } @@ -192,16 +224,19 @@ Block DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns() if (!block_others) return {}; + auto read_rows = block_others.rows(); + // Using vec_cd.type to construct a Column directly instead of using // the type from dmfile, so that we don't need extra transforms // (e.g. wrap with a Nullable). vec_column_reader is compatible with // both Nullable and NotNullable. auto vec_column = vec_cd.type->createColumn(); + vec_column->reserve(read_rows); // Then read from vector index for the same pack. w.restart(); - vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), block_others.rows()); + vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), read_rows); duration_read_from_vec_index_seconds += w.elapsedSeconds(); // Re-assemble block using the same layout as header_layout. 
@@ -298,7 +333,8 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() PerfContext::file_cache.fg_wait_download_from_s3 > perf_begin.fg_wait_download_from_s3) has_s3_download = true; - duration_load_index += watch.elapsedSeconds(); + auto download_duration = watch.elapsedSeconds(); + duration_load_index += download_duration; } else { @@ -354,7 +390,10 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() auto perf_begin = PerfContext::vector_search; RUNTIME_CHECK(valid_rows.size() >= dmfile->getRows(), valid_rows.size(), dmfile->getRows()); - auto results_rowid = vec_index->search(ann_query_info, valid_rows); + sorted_results = vec_index->search(ann_query_info, valid_rows); + std::sort(sorted_results.begin(), sorted_results.end()); + // results must not contain duplicates. Usually there should be no duplicates. + sorted_results.erase(std::unique(sorted_results.begin(), sorted_results.end()), sorted_results.end()); auto discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes; auto visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes; @@ -364,24 +403,10 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() scan_context->total_vector_idx_search_discarded_nodes += discarded_nodes; scan_context->total_vector_idx_search_visited_nodes += visited_nodes; - size_t rows_after_mvcc = valid_rows.count(); - size_t rows_after_vector_search = results_rowid.size(); - - // After searching with the BitmapFilter, we create a bitmap - // to exclude rows that are not in the search result, because these rows - // are produced as [] or NULL, which is not a valid vector for future use. - // The bitmap will be used when returning the output to the caller. 
- { - valid_rows_after_search = BitmapFilter(valid_rows.size(), false); - for (auto rowid : results_rowid) - valid_rows_after_search.set(rowid, 1, true); - valid_rows_after_search.runOptimize(); - } - vec_column_reader = std::make_shared( // dmfile, vec_index, - std::move(results_rowid)); + sorted_results); // Vector index is very likely to filter out some packs. For example, // if we query for Top 1, then only 1 pack will be remained. So we @@ -393,7 +418,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() auto & pack_res = reader.pack_filter.getPackRes(); size_t results_it = 0; - const size_t results_it_max = results_rowid.size(); + const size_t results_it_max = sorted_results.size(); UInt32 pack_start = 0; @@ -406,8 +431,8 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() UInt32 pack_end = pack_start + pack_stats[pack_id].rows; while (results_it < results_it_max // - && results_rowid[results_it] >= pack_start // - && results_rowid[results_it] < pack_end) + && sorted_results[results_it] >= pack_start // + && sorted_results[results_it] < pack_end) { pack_has_result = true; results_it++; @@ -428,7 +453,7 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() log, "Finished vector search over column dmf_{}/{}(id={}), cost={:.2f}s " "top_k_[query/visited/discarded/result]={}/{}/{}/{} " - "rows_[file/after_mvcc/after_search]={}/{}/{} " + "rows_[file/after_search]={}/{} " "pack_[total/before_search/after_search]={}/{}/{}", dmfile->fileId(), @@ -439,11 +464,10 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() ann_query_info->top_k(), visited_nodes, // Visited nodes will be larger than query_top_k when there are MVCC rows discarded_nodes, // How many nodes are skipped by MVCC - results_rowid.size(), + sorted_results.size(), dmfile->getRows(), - rows_after_mvcc, - rows_after_vector_search, + sorted_results.size(), pack_stats.size(), valid_packs_before_search, diff --git 
a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h index fdccd6a8786..6f82fb293d8 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h @@ -183,7 +183,7 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream // Set after load(). VectorColumnFromIndexReaderPtr vec_column_reader = nullptr; // Set after load(). Used to filter the output rows. - BitmapFilter valid_rows_after_search{0, false}; + std::vector sorted_results{}; // Key is rowid IColumn::Filter filter; bool loaded = false; diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp index 25d4842e75a..5614838b37a 100644 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include +#include + namespace DB::DM { @@ -31,20 +34,22 @@ std::vector VectorColumnFromIndexReader::calcPackStartRowID(const DMFile } MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack( - std::vector && results, + const std::vector & sorted_results, const DMFileMeta::PackStats & pack_stats, const std::vector & pack_start_rowid) { auto column = ColumnArray::create(ColumnUInt32::create()); - // results must be in ascending order. - std::sort(results.begin(), results.end()); - // results must not contain duplicates. Usually there should be no duplicates. 
- results.erase(std::unique(results.begin(), results.end()), results.end()); +#ifndef NDEBUG + { + const auto sorted = std::is_sorted(sorted_results.begin(), sorted_results.end()); + RUNTIME_CHECK(sorted); + } +#endif std::vector offsets_in_pack; size_t results_it = 0; - const size_t results_it_max = results.size(); + const size_t results_it_max = sorted_results.size(); for (size_t pack_id = 0, pack_id_max = pack_start_rowid.size(); pack_id < pack_id_max; pack_id++) { offsets_in_pack.clear(); @@ -53,14 +58,13 @@ MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack( UInt32 pack_end = pack_start + pack_stats[pack_id].rows; while (results_it < results_it_max // - && results[results_it] >= pack_start // - && results[results_it] < pack_end) + && sorted_results[results_it] >= pack_start // + && sorted_results[results_it] < pack_end) { - offsets_in_pack.push_back(results[results_it] - pack_start); + offsets_in_pack.push_back(sorted_results[results_it] - pack_start); results_it++; } - // insert column->insertData( reinterpret_cast(offsets_in_pack.data()), offsets_in_pack.size() * sizeof(UInt32)); @@ -75,7 +79,6 @@ void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_p { std::vector value; const auto * results_by_pack = checkAndGetColumn(this->results_by_pack.get()); - checkAndGetColumn(column.get()); size_t pack_id = start_pack_id; UInt32 remaining_rows_in_pack = pack_stats[pack_id].rows; @@ -111,7 +114,7 @@ void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_p } RUNTIME_CHECK(filled_result_rows == offset_in_pack); - // TODO(vector-index): We could fill multiple rows if rowid is continuous. + // TODO: We could fill multiple rows if rowid is continuous. 
VectorIndexViewer::Key rowid = pack_start_rowid[pack_id] + offset_in_pack; index->get(rowid, value); column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h index 056d1efcf70..5fff067dc72 100644 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h +++ b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h @@ -53,7 +53,7 @@ class VectorColumnFromIndexReader static std::vector calcPackStartRowID(const DMFileMeta::PackStats & pack_stats); static MutableColumnPtr calcResultsByPack( - std::vector && results, + const std::vector & results, const DMFileMeta::PackStats & pack_stats, const std::vector & pack_start_rowid); @@ -63,12 +63,12 @@ class VectorColumnFromIndexReader explicit VectorColumnFromIndexReader( const DMFilePtr & dmfile_, const VectorIndexViewerPtr & index_, - std::vector && results_) + const std::vector & sorted_results_) : dmfile(dmfile_) , pack_stats(dmfile_->getPackStats()) , pack_start_rowid(calcPackStartRowID(pack_stats)) , index(index_) - , results_by_pack(calcResultsByPack(std::move(results_), pack_stats, pack_start_rowid)) + , results_by_pack(calcResultsByPack(sorted_results_, pack_stats, pack_start_rowid)) {} void read(MutableColumnPtr & column, size_t start_pack_id, UInt32 read_rows); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h index 9a555aea027..451950be438 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndex.h @@ -23,6 +23,8 @@ #include #include #include + + namespace DB::DM {