diff --git a/dbms/src/DataStreams/LimitBlockInputStream.cpp b/dbms/src/DataStreams/LimitBlockInputStream.cpp index 13ed3d25929..c631d457ab3 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.cpp +++ b/dbms/src/DataStreams/LimitBlockInputStream.cpp @@ -22,9 +22,11 @@ namespace DB LimitBlockInputStream::LimitBlockInputStream( const BlockInputStreamPtr & input, size_t limit_, + size_t offset_, const String & req_id) : log(Logger::get(req_id)) - , action(input->getHeader(), limit_) + , limit(limit_) + , offset(offset_) { children.push_back(input); } @@ -32,20 +34,46 @@ LimitBlockInputStream::LimitBlockInputStream( Block LimitBlockInputStream::readImpl() { - Block res = children.back()->read(); + Block res; + size_t rows = 0; - if (action.transform(res)) + if (pos >= offset + limit) { return res; } - else + + do { - return {}; - } + res = children.back()->read(); + if (!res) + return res; + rows = res.rows(); + pos += rows; + } while (pos <= offset); + + /// give away the whole block + if (pos >= offset + rows && pos <= offset + limit) + return res; + + /// give away a piece of the block + UInt64 start = std::max( + static_cast(0), + static_cast(offset) - static_cast(pos) + static_cast(rows)); + + UInt64 length = std::min( + static_cast(limit), + std::min( + static_cast(pos) - static_cast(offset), + static_cast(limit) + static_cast(offset) - static_cast(pos) + static_cast(rows))); + + for (size_t i = 0; i < res.columns(); ++i) + res.safeGetByPosition(i).column = res.safeGetByPosition(i).column->cut(start, length); + + return res; } void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const { - buffer.fmtAppend(", limit = {}", action.getLimit()); + buffer.fmtAppend(", limit = {}", limit); } } // namespace DB diff --git a/dbms/src/DataStreams/LimitBlockInputStream.h b/dbms/src/DataStreams/LimitBlockInputStream.h index c749ef30800..61d08ff2223 100644 --- a/dbms/src/DataStreams/LimitBlockInputStream.h +++ b/dbms/src/DataStreams/LimitBlockInputStream.h @@ -34,6 +34,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream LimitBlockInputStream( const BlockInputStreamPtr & input, size_t limit_, + size_t offset_, const String & req_id); String getName() const override { return NAME; } @@ -46,7 +47,10 @@ class LimitBlockInputStream : public IProfilingBlockInputStream private: LoggerPtr log; - LocalLimitTransformAction action; + size_t limit; + size_t offset; + /// how many lines were read, including the last read block + size_t pos = 0; }; } // namespace DB diff --git a/dbms/src/DataStreams/LimitTransformAction.cpp b/dbms/src/DataStreams/LimitTransformAction.cpp index 1fe4d06e520..c59f22d72df 100644 --- a/dbms/src/DataStreams/LimitTransformAction.cpp +++ b/dbms/src/DataStreams/LimitTransformAction.cpp @@ -33,23 +33,6 @@ void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos) } } // namespace -bool LocalLimitTransformAction::transform(Block & block) -{ - if (unlikely(!block)) - return true; - - /// pos - how many lines were read, including the last read block - if (pos >= limit) - return false; - - auto rows = block.rows(); - pos += rows; - if (pos > limit) - cut(block, rows, limit, pos); - // for pos <= limit, give away the whole block - return true; -} - bool GlobalLimitTransformAction::transform(Block & block) { if (unlikely(!block)) diff --git a/dbms/src/DataStreams/LimitTransformAction.h b/dbms/src/DataStreams/LimitTransformAction.h index e158f826c4a..51efe9c416d 100644 --- a/dbms/src/DataStreams/LimitTransformAction.h +++ b/dbms/src/DataStreams/LimitTransformAction.h @@ -20,28 +20,6 @@ namespace DB { -struct LocalLimitTransformAction -{ -public: - LocalLimitTransformAction( - const Block & header_, - size_t limit_) - : header(header_) - , limit(limit_) - { - } - - bool transform(Block & block); - - Block getHeader() const { return header; } - size_t getLimit() const { return limit; } - -private: - const Block header; - const size_t limit; - size_t pos = 0; -}; - struct GlobalLimitTransformAction { public: diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2e0a54a3e4f..9fcc310d1bc 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -712,11 +712,11 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline) limit = query_block.limit_or_topn->limit().limit(); else limit = query_block.limit_or_topn->topn().limit(); - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, log->identifier()); }); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, /*offset*/ 0, log->identifier()); }); if (pipeline.hasMoreThanOneStream()) { executeUnion(pipeline, max_streams, log, false, "for partial limit"); - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, log->identifier()); }); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, /*offset*/ 0, log->identifier()); }); } } diff --git a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp index 0314d79ea3f..0c0edd0e975 100644 --- a/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp +++ b/dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp @@ -45,11 +45,11 @@ void PhysicalLimit::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & { child->buildBlockInputStream(pipeline, context, max_streams); - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, log->identifier()); }); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, /*offset*/ 0, log->identifier()); }); if (pipeline.hasMoreThanOneStream()) { executeUnion(pipeline, max_streams, log, false, "for partial limit"); - pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, log->identifier()); }); + pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, limit, /*offset*/ 0, log->identifier()); }); } } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 35d71695914..474e979ac4f 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -1196,10 +1196,10 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline) getLimitLengthAndOffset(query, limit_length, limit_offset); /// If there is LIMIT - if (query.limit_length) + if (limit_length) { pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, limit_length + limit_offset, /*req_id=*/""); + stream = std::make_shared(stream, limit_length + limit_offset, /* offset */ 0, /*req_id=*/""); }); } } @@ -1237,10 +1237,11 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline) getLimitLengthAndOffset(query, limit_length, limit_offset); /// If there is LIMIT - if (query.limit_length) + if (limit_length) { + RUNTIME_CHECK_MSG(pipeline.streams.size() == 1, "Cannot executeLimit with multiple streams"); pipeline.transform([&](auto & stream) { - stream = std::make_shared(stream, limit_length, /*req_id=*/""); + stream = std::make_shared(stream, limit_length, limit_offset, /*req_id=*/""); }); } } diff --git a/dbms/src/Storages/System/StorageSystemNumbers.cpp b/dbms/src/Storages/System/StorageSystemNumbers.cpp index 68130897963..016b90b5cc5 100644 --- a/dbms/src/Storages/System/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/System/StorageSystemNumbers.cpp @@ -96,7 +96,7 @@ BlockInputStreams StorageSystemNumbers::read( res[i] = std::make_shared(max_block_size, i * max_block_size, num_streams * max_block_size); if (limit) /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly. - res[i] = std::make_shared(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, /*req_id=*/""); + res[i] = std::make_shared(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, /*offset*/ 0, /*req_id=*/""); } return res;