diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp
index c205aa91982..4edd19c5ce0 100644
--- a/dbms/src/DataStreams/LimitBlockInputStream.cpp
+++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp
@@ -9,12 +9,19 @@ LimitBlockInputStream::LimitBlockInputStream(
     const BlockInputStreamPtr & input,
     size_t limit_,
     size_t offset_,
+<<<<<<< HEAD
     const LogWithPrefixPtr & log_,
     bool always_read_till_end_)
     : limit(limit_)
     , offset(offset_)
     , always_read_till_end(always_read_till_end_)
     , log(getMPPTaskLog(log_, NAME))
+=======
+    const String & req_id)
+    : log(Logger::get(req_id))
+    , limit(limit_)
+    , offset(offset_)
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
 {
     children.push_back(input);
 }
@@ -25,8 +32,11 @@ Block LimitBlockInputStream::readImpl()
     Block res;
     size_t rows = 0;
 
+<<<<<<< HEAD
     /// pos - how many lines were read, including the last read block
+=======
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
     if (pos >= offset + limit)
     {
         if (!always_read_till_end)
@@ -51,6 +61,7 @@ Block LimitBlockInputStream::readImpl()
     /// give away the whole block
     if (pos >= offset + rows && pos <= offset + limit)
         return res;
+<<<<<<< HEAD
 
     /// give away a piece of the block
     size_t start = std::max(
@@ -58,6 +69,29 @@ Block LimitBlockInputStream::readImpl()
         static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
 
     size_t length = std::min(
+=======
+    }
+
+    do
+    {
+        res = children.back()->read();
+        if (!res)
+            return res;
+        rows = res.rows();
+        pos += rows;
+    } while (pos <= offset);
+
+    /// give away the whole block
+    if (pos >= offset + rows && pos <= offset + limit)
+        return res;
+
+    /// give away a piece of the block
+    UInt64 start = std::max(
+        static_cast<Int64>(0),
+        static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
+
+    UInt64 length = std::min(
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
         static_cast<Int64>(limit),
         std::min(
             static_cast<Int64>(pos) - static_cast<Int64>(offset),
@@ -69,4 +103,11 @@ Block LimitBlockInputStream::readImpl()
     return res;
 }
 
+<<<<<<< HEAD
+=======
+void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
+{
+    buffer.fmtAppend(", limit = {}", limit);
+}
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
 } // namespace DB
diff --git a/dbms/src/DataStreams/LimitBlockInputStream.h b/dbms/src/DataStreams/LimitBlockInputStream.h
index dee386f2f78..69985ddcbb0 100644
--- a/dbms/src/DataStreams/LimitBlockInputStream.h
+++ b/dbms/src/DataStreams/LimitBlockInputStream.h
@@ -22,8 +22,12 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
         const BlockInputStreamPtr & input,
         size_t limit_,
         size_t offset_,
+<<<<<<< HEAD
         const LogWithPrefixPtr & log_,
         bool always_read_till_end_ = false);
+=======
+        const String & req_id);
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
 
     String getName() const override { return NAME; }
@@ -33,11 +37,19 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
     Block readImpl() override;
 
 private:
+<<<<<<< HEAD
     size_t limit;
     size_t offset;
     size_t pos = 0;
     bool always_read_till_end;
     LogWithPrefixPtr log;
+=======
+    LoggerPtr log;
+    size_t limit;
+    size_t offset;
+    /// how many lines were read, including the last read block
+    size_t pos = 0;
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
 };
 
 } // namespace DB
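Note on the incoming side of readImpl() above: after the do/while loop, pos counts every row read so far, including the rows of the block just fetched, so the block straddling the offset/limit window must be sliced to the intersection of [pos - rows, pos) with [offset, offset + limit). A minimal standalone sketch of that arithmetic with one worked example (the function names and numbers are illustrative, not part of the patch):

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    // Mirrors the `start` computation above: how many leading rows to skip.
    int64_t sliceStart(uint64_t offset, uint64_t pos, uint64_t rows)
    {
        return std::max(
            static_cast<int64_t>(0),
            static_cast<int64_t>(offset) - static_cast<int64_t>(pos) + static_cast<int64_t>(rows));
    }

    // Mirrors the `length` computation above: how many rows to keep.
    int64_t sliceLength(uint64_t offset, uint64_t limit, uint64_t pos, uint64_t rows)
    {
        return std::min(
            static_cast<int64_t>(limit),
            std::min(
                static_cast<int64_t>(pos) - static_cast<int64_t>(offset),
                static_cast<int64_t>(limit) + static_cast<int64_t>(offset) - static_cast<int64_t>(pos) + static_cast<int64_t>(rows)));
    }

    int main()
    {
        // LIMIT 3 OFFSET 5, one block of 10 rows, pos = 10 after the loop:
        assert(sliceStart(/*offset=*/5, /*pos=*/10, /*rows=*/10) == 5); // skip rows [0, 5)
        assert(sliceLength(/*offset=*/5, /*limit=*/3, /*pos=*/10, /*rows=*/10) == 3); // keep rows [5, 8)
    }
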
diff --git a/dbms/src/DataStreams/LimitTransformAction.cpp b/dbms/src/DataStreams/LimitTransformAction.cpp
new file mode 100644
index 00000000000..c59f22d72df
--- /dev/null
+++ b/dbms/src/DataStreams/LimitTransformAction.cpp
@@ -0,0 +1,56 @@
+// Copyright 2023 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <DataStreams/LimitTransformAction.h>
+#include <common/likely.h>
+
+namespace DB
+{
+namespace
+{
+// Removes all rows outside specified range of Block.
+void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
+{
+    assert(rows + limit > pos);
+    size_t pop_back_cnt = pos - limit;
+    for (auto & col : block)
+    {
+        auto mutate_col = (*std::move(col.column)).mutate();
+        mutate_col->popBack(pop_back_cnt);
+        col.column = std::move(mutate_col);
+    }
+}
+} // namespace
+
+bool GlobalLimitTransformAction::transform(Block & block)
+{
+    if (unlikely(!block))
+        return true;
+
+    /// pos - how many lines were read, including the last read block
+    if (pos >= limit)
+        return false;
+
+    auto rows = block.rows();
+    size_t prev_pos = pos.fetch_add(rows);
+    if (prev_pos >= limit)
+        return false;
+
+    size_t cur_pos = prev_pos + rows;
+    if (cur_pos > limit)
+        cut(block, rows, limit, cur_pos);
+    // for pos <= limit, give away the whole block
+    return true;
+}
+} // namespace DB
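A note on GlobalLimitTransformAction::transform above: pos is atomic, so each concurrent caller claims its block's rows with a single fetch_add; only the one block that crosses limit is trimmed by cut(), which pops cur_pos - limit rows off the tail of every column. A minimal sketch of the same claim-then-trim idea, with std::vector<int> standing in for a Block (names here are illustrative, not the actual operator):

    #include <atomic>
    #include <cassert>
    #include <cstddef>
    #include <vector>

    struct GlobalLimitSketch
    {
        const size_t limit;
        std::atomic<size_t> pos{0};

        // Returns false once the quota is exhausted; trims the boundary block.
        bool transform(std::vector<int> & block)
        {
            size_t rows = block.size();
            size_t prev_pos = pos.fetch_add(rows); // atomically claim this block's rows
            if (prev_pos >= limit)
                return false; // other threads already consumed the whole quota
            size_t cur_pos = prev_pos + rows;
            if (cur_pos > limit)
                block.resize(rows - (cur_pos - limit)); // keep only limit - prev_pos rows
            return true;
        }
    };

    int main()
    {
        GlobalLimitSketch action{/*limit=*/7};
        std::vector<int> b1(5), b2(5), b3(5);
        assert(action.transform(b1) && b1.size() == 5); // pos: 0 -> 5, whole block kept
        assert(action.transform(b2) && b2.size() == 2); // pos: 5 -> 10, trimmed at limit 7
        assert(!action.transform(b3));                  // quota exhausted
    }
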
diff --git a/dbms/src/DataStreams/LimitTransformAction.h b/dbms/src/DataStreams/LimitTransformAction.h
new file mode 100644
index 00000000000..51efe9c416d
--- /dev/null
+++ b/dbms/src/DataStreams/LimitTransformAction.h
@@ -0,0 +1,46 @@
+// Copyright 2023 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <Core/Block.h>
+
+#include <atomic>
+#include <memory>
+
+namespace DB
+{
+struct GlobalLimitTransformAction
+{
+public:
+    GlobalLimitTransformAction(
+        const Block & header_,
+        size_t limit_)
+        : header(header_)
+        , limit(limit_)
+    {
+    }
+
+    bool transform(Block & block);
+
+    Block getHeader() const { return header; }
+    size_t getLimit() const { return limit; }
+
+private:
+    const Block header;
+    const size_t limit;
+    std::atomic_size_t pos{0};
+};
+
+using GlobalLimitPtr = std::shared_ptr<GlobalLimitTransformAction>;
+} // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 1f24ca74af9..0fe8462b9b2 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -1187,12 +1187,21 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
     if (query_block.limitOrTopN->tp() == tipb::TypeLimit)
         limit = query_block.limitOrTopN->limit().limit();
     else
+<<<<<<< HEAD
         limit = query_block.limitOrTopN->topn().limit();
     pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, log, false); });
     if (pipeline.hasMoreThanOneStream())
     {
         executeUnion(pipeline, max_streams, log);
         pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, log, false); });
+=======
+        limit = query_block.limit_or_topn->topn().limit();
+    pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
+    if (pipeline.hasMoreThanOneStream())
+    {
+        executeUnion(pipeline, max_streams, log, false, "for partial limit");
+        pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
     }
 }
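Both sides of the executeLimit conflict above keep the same two-phase shape: a partial LIMIT n on every stream, a union, then one more LIMIT n on the merged stream, because k streams each capped at n rows can still yield up to k * n rows in total. A toy model of why the second cut is needed (illustrative, not the interpreter code):

    #include <cassert>
    #include <cstddef>
    #include <vector>

    int main()
    {
        const size_t limit = 4;
        std::vector<std::vector<int>> streams = {{1, 2, 3, 4, 5}, {6, 7}, {8, 9, 10, 11, 12}};

        // Phase 1: partial limit per stream (cheap early cutoff in each thread).
        for (auto & s : streams)
            if (s.size() > limit)
                s.resize(limit);

        // Phase 2: union. The merged result may still hold up to 3 * 4 = 12 rows.
        std::vector<int> merged;
        for (const auto & s : streams)
            merged.insert(merged.end(), s.begin(), s.end());
        assert(merged.size() == 10); // 4 + 2 + 4

        // Phase 3: final limit on the single merged stream.
        if (merged.size() > limit)
            merged.resize(limit);
        assert(merged.size() == limit);
    }
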
diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp
new file mode 100644
index 00000000000..0c0edd0e975
--- /dev/null
+++ b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp
@@ -0,0 +1,74 @@
+// Copyright 2023 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <Common/Logger.h>
+#include <DataStreams/LimitBlockInputStream.h>
+#include <DataStreams/LimitTransformAction.h>
+#include <Flash/Coprocessor/DAGPipeline.h>
+#include <Flash/Coprocessor/InterpreterUtils.h>
+#include <Flash/Planner/FinalizeHelper.h>
+#include <Flash/Planner/PhysicalPlanHelper.h>
+#include <Flash/Planner/Plans/PhysicalLimit.h>
+#include <Interpreters/Context.h>
+#include <Operators/LimitTransformOp.h>
+
+namespace DB
+{
+PhysicalPlanNodePtr PhysicalLimit::build(
+    const String & executor_id,
+    const LoggerPtr & log,
+    const tipb::Limit & limit,
+    const PhysicalPlanNodePtr & child)
+{
+    assert(child);
+    auto physical_limit = std::make_shared<PhysicalLimit>(
+        executor_id,
+        child->getSchema(),
+        log->identifier(),
+        child,
+        limit.limit());
+    return physical_limit;
+}
+
+void PhysicalLimit::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams)
+{
+    child->buildBlockInputStream(pipeline, context, max_streams);
+
+    pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
+    if (pipeline.hasMoreThanOneStream())
+    {
+        executeUnion(pipeline, max_streams, log, false, "for partial limit");
+        pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
+    }
+}
+
+void PhysicalLimit::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/)
+{
+    auto input_header = group_builder.getCurrentHeader();
+    auto global_limit = std::make_shared<GlobalLimitTransformAction>(input_header, limit);
+    group_builder.transform([&](auto & builder) {
+        builder.appendTransformOp(std::make_unique<LimitTransformOp>(group_builder.exec_status, global_limit, log->identifier()));
+    });
+}
+
+void PhysicalLimit::finalize(const Names & parent_require)
+{
+    child->finalize(parent_require);
+}
+
+const Block & PhysicalLimit::getSampleBlock() const
+{
+    return child->getSampleBlock();
+}
+} // namespace DB
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 4622f92d774..710d07b3627 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1245,10 +1245,14 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
     getLimitLengthAndOffset(query, limit_length, limit_offset);
 
     /// If there is LIMIT
-    if (query.limit_length)
+    if (limit_length)
     {
         pipeline.transform([&](auto & stream) {
+<<<<<<< HEAD
             stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, 0, nullptr, false);
+=======
+            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, /* offset */ 0, /*req_id=*/"");
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
         });
     }
 }
@@ -1303,8 +1307,9 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
     getLimitLengthAndOffset(query, limit_length, limit_offset);
 
     /// If there is LIMIT
-    if (query.limit_length)
+    if (limit_length)
     {
+<<<<<<< HEAD
         /** Rare case:
          *  if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
          *  then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
@@ -1324,6 +1329,11 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
 
         pipeline.transform([&](auto & stream) {
             stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, nullptr, always_read_till_end);
+=======
+        RUNTIME_CHECK_MSG(pipeline.streams.size() == 1, "Cannot executeLimit with multiple streams");
+        pipeline.transform([&](auto & stream) {
+            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, /*req_id=*/"");
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
         });
     }
 }
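On the InterpreterSelectQuery changes above: executePreLimit pushes limit_length + limit_offset with offset 0 because OFFSET can only be applied once, after the streams are merged; executeLimit then skips limit_offset rows on the single merged stream, which is exactly the behavior #6745 fixes for queries on TiFlash system tables. A toy final-limit for LIMIT 3 OFFSET 5 over a pre-limited stream (illustrative helper, not the real API):

    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Skip `offset` rows, then keep at most `length` rows.
    std::vector<int> finalLimit(const std::vector<int> & rows, size_t length, size_t offset)
    {
        std::vector<int> out;
        for (size_t i = offset; i < rows.size() && out.size() < length; ++i)
            out.push_back(rows[i]);
        return out;
    }

    int main()
    {
        // Pre-limit already cut this stream to length + offset = 8 rows.
        std::vector<int> pre_limited = {0, 1, 2, 3, 4, 5, 6, 7};
        assert(finalLimit(pre_limited, /*length=*/3, /*offset=*/5) == (std::vector<int>{5, 6, 7}));
    }
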
diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp
index 97f18a3379b..cdb738c37fd 100644
--- a/dbms/src/Storages/System/StorageSystemNumbers.cpp
+++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp
@@ -82,7 +82,11 @@ BlockInputStreams StorageSystemNumbers::read(
         res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, i * max_block_size, num_streams * max_block_size);
 
         if (limit) /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
+<<<<<<< HEAD
            res[i] = std::make_shared<LimitBlockInputStream>(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, 0, nullptr);
+=======
+            res[i] = std::make_shared<LimitBlockInputStream>(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, /*offset*/ 0, /*req_id=*/"");
+>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
     }
 
     return res;
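On the split formula kept as context above: limit * (i + 1) / num_streams - limit * i / num_streams gives stream i a floor-based share, and the shares telescope so they always sum to exactly limit (e.g. limit = 10 over num_streams = 3 yields 3, 3, 4). A quick check of that property (illustrative code):

    #include <cassert>
    #include <cstdint>

    int main()
    {
        const uint64_t limit = 10;
        const uint64_t num_streams = 3;
        uint64_t total = 0;
        for (uint64_t i = 0; i < num_streams; ++i)
        {
            // Shares for i = 0, 1, 2: 3, 3, 4.
            total += limit * (i + 1) / num_streams - limit * i / num_streams;
        }
        assert(total == limit); // the per-stream limits cover `limit` exactly
    }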