From c361d3aa366fe2cfbf2dd3604fb6fa525d543134 Mon Sep 17 00:00:00 2001 From: yanweiqi <592838129@qq.com> Date: Fri, 23 Dec 2022 16:02:14 +0800 Subject: [PATCH] *: remove limitByStream (#6539) ref pingcap/tiflash#5900 --- .../DataStreams/LimitByBlockInputStream.cpp | 92 ------------------- .../src/DataStreams/LimitByBlockInputStream.h | 52 ----------- .../Interpreters/InterpreterSelectQuery.cpp | 24 ----- 3 files changed, 168 deletions(-) delete mode 100644 dbms/src/DataStreams/LimitByBlockInputStream.cpp delete mode 100644 dbms/src/DataStreams/LimitByBlockInputStream.h diff --git a/dbms/src/DataStreams/LimitByBlockInputStream.cpp b/dbms/src/DataStreams/LimitByBlockInputStream.cpp deleted file mode 100644 index 83e93041c34..00000000000 --- a/dbms/src/DataStreams/LimitByBlockInputStream.cpp +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - - -namespace DB -{ - -LimitByBlockInputStream::LimitByBlockInputStream(const BlockInputStreamPtr & input, size_t group_size_, const Names & columns) - : columns_names(columns) - , group_size(group_size_) -{ - children.push_back(input); -} - -Block LimitByBlockInputStream::readImpl() -{ - /// Execute until end of stream or until - /// a block with some new records will be gotten. - while (true) - { - Block block = children[0]->read(); - if (!block) - return Block(); - - const ColumnRawPtrs column_ptrs(getKeyColumns(block)); - const size_t rows = block.rows(); - IColumn::Filter filter(rows); - size_t inserted_count = 0; - - for (size_t i = 0; i < rows; ++i) - { - UInt128 key; - SipHash hash; - - for (auto & column : column_ptrs) - column->updateHashWithValue(i, hash); - - hash.get128(key); - - if (keys_counts[key]++ < group_size) - { - inserted_count++; - filter[i] = 1; - } - else - filter[i] = 0; - } - - /// Just go to the next block if there isn't any new records in the current one. - if (!inserted_count) - continue; - - size_t all_columns = block.columns(); - for (size_t i = 0; i < all_columns; ++i) - block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(filter, inserted_count); - - return block; - } -} - -ColumnRawPtrs LimitByBlockInputStream::getKeyColumns(Block & block) const -{ - ColumnRawPtrs column_ptrs; - column_ptrs.reserve(columns_names.size()); - - for (const auto & name : columns_names) - { - auto & column = block.getByName(name).column; - - /// Ignore all constant columns. - if (!column->isColumnConst()) - column_ptrs.emplace_back(column.get()); - } - - return column_ptrs; -} - -} diff --git a/dbms/src/DataStreams/LimitByBlockInputStream.h b/dbms/src/DataStreams/LimitByBlockInputStream.h deleted file mode 100644 index 4a91f0ca9cc..00000000000 --- a/dbms/src/DataStreams/LimitByBlockInputStream.h +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2022 PingCAP, Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include - -namespace DB -{ - -/** Implements LIMIT BY clause witch can be used to obtain a "top N by subgroup". - * - * For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7), - * the query SELECT Num FROM T LIMIT 2 BY Num - * will give you the following result: (Num: 1 1 3 3 4 4 5 7 7). - */ -class LimitByBlockInputStream : public IProfilingBlockInputStream -{ -public: - LimitByBlockInputStream(const BlockInputStreamPtr & input, size_t group_size_, const Names & columns); - - String getName() const override { return "LimitBy"; } - - Block getHeader() const override { return children.at(0)->getHeader(); } - -protected: - Block readImpl() override; - -private: - ColumnRawPtrs getKeyColumns(Block & block) const; - -private: - using MapHashed = HashMap; - - const Names columns_names; - const size_t group_size; - MapHashed keys_counts; -}; - -} diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 47571a6f860..f1a39672a99 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -616,12 +615,6 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt if (need_second_distinct_pass) executeDistinct(pipeline, false, expressions.selected_columns); - if (expressions.has_limit_by) - { - executeExpression(pipeline, expressions.before_limit_by); - executeLimitBy(pipeline); - } - /** We must do projection after DISTINCT because projection may remove some columns. */ executeProjection(pipeline, expressions.final_projection); @@ -1291,23 +1284,6 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline) } -void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline) // NOLINT -{ - if (!query.limit_by_value || !query.limit_by_expression_list) - return; - - Names columns; - for (const auto & elem : query.limit_by_expression_list->children) - columns.emplace_back(elem->getColumnName()); - - auto value = safeGet(typeid_cast(*query.limit_by_value).value); - - pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, value, columns); - }); -} - - bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query) { if (query.group_by_with_totals)