
Commit

This is an automated cherry-pick of pingcap#6745
Signed-off-by: ti-chi-bot <[email protected]>
lidezhu authored and ti-chi-bot committed Feb 7, 2023
1 parent 5b6910f commit 0ff4a26
Showing 8 changed files with 254 additions and 2 deletions.
41 changes: 41 additions & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
@@ -9,12 +9,19 @@ LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input,
size_t limit_,
size_t offset_,
<<<<<<< HEAD
const LogWithPrefixPtr & log_,
bool always_read_till_end_)
: limit(limit_)
, offset(offset_)
, always_read_till_end(always_read_till_end_)
, log(getMPPTaskLog(log_, NAME))
=======
const String & req_id)
: log(Logger::get(req_id))
, limit(limit_)
, offset(offset_)
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
{
children.push_back(input);
}
@@ -25,8 +32,11 @@ Block LimitBlockInputStream::readImpl()
Block res;
size_t rows = 0;
<<<<<<< HEAD
/// pos - how many lines were read, including the last read block
=======
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
if (pos >= offset + limit)
{
if (!always_read_till_end)
@@ -51,13 +61,37 @@ Block LimitBlockInputStream::readImpl()
/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
return res;
<<<<<<< HEAD

/// give away a piece of the block
size_t start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));

size_t length = std::min(
=======
}

do
{
res = children.back()->read();
if (!res)
return res;
rows = res.rows();
pos += rows;
} while (pos <= offset);

/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
return res;

/// give away a piece of the block
UInt64 start = std::max(
static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));

UInt64 length = std::min(
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
static_cast<Int64>(limit),
std::min(
static_cast<Int64>(pos) - static_cast<Int64>(offset),
@@ -69,4 +103,11 @@ Block LimitBlockInputStream::readImpl()
return res;
}
<<<<<<< HEAD
=======
void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit);
}
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
} // namespace DB
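
The start/length arithmetic in readImpl() above is easiest to verify with concrete numbers. A minimal standalone sketch of the same computation (offset, limit, and block size are illustrative values, not taken from the commit):

#include <algorithm>
#include <cstdint>
#include <iostream>

// Re-derives the slice bounds from LimitBlockInputStream::readImpl().
// `pos` counts every row read so far, including the block just read.
int main()
{
    const int64_t offset = 5, limit = 10, rows = 8; // 8-row blocks

    // First block: pos = 8, so the do-while loop exits (pos > offset).
    int64_t pos = 8;
    int64_t start = std::max<int64_t>(0, offset - pos + rows);      // = 5
    int64_t length = std::min(limit, std::min(pos - offset,         // = 3
                                              limit + offset - pos + rows));
    std::cout << start << ' ' << length << '\n'; // keeps rows 5..7 of the block

    // Second block: pos = 16 > offset + limit = 15, so only a prefix fits.
    pos += rows;
    start = std::max<int64_t>(0, offset - pos + rows);              // = 0
    length = std::min(limit, std::min(pos - offset,                 // = 7
                                      limit + offset - pos + rows));
    std::cout << start << ' ' << length << '\n'; // keeps rows 0..6; 3 + 7 == limit
}
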
12 changes: 12 additions & 0 deletions dbms/src/DataStreams/LimitBlockInputStream.h
@@ -22,8 +22,12 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
const BlockInputStreamPtr & input,
size_t limit_,
size_t offset_,
<<<<<<< HEAD
const LogWithPrefixPtr & log_,
bool always_read_till_end_ = false);
=======
const String & req_id);
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
String getName() const override { return NAME; }
@@ -33,11 +37,19 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
Block readImpl() override;
private:
<<<<<<< HEAD
size_t limit;
size_t offset;
size_t pos = 0;
bool always_read_till_end;
LogWithPrefixPtr log;
=======
LoggerPtr log;
size_t limit;
size_t offset;
/// how many lines were read, including the last read block
size_t pos = 0;
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
};

} // namespace DB
56 changes: 56 additions & 0 deletions dbms/src/DataStreams/LimitTransformAction.cpp
@@ -0,0 +1,56 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/LimitTransformAction.h>
#include <common/likely.h>

namespace DB
{
namespace
{
// Removes all rows outside specified range of Block.
void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
{
assert(rows + limit > pos);
size_t pop_back_cnt = pos - limit;
for (auto & col : block)
{
auto mutate_col = (*std::move(col.column)).mutate();
mutate_col->popBack(pop_back_cnt);
col.column = std::move(mutate_col);
}
}
} // namespace

bool GlobalLimitTransformAction::transform(Block & block)
{
if (unlikely(!block))
return true;

/// pos - how many lines were read, including the last read block
if (pos >= limit)
return false;

auto rows = block.rows();
size_t prev_pos = pos.fetch_add(rows);
if (prev_pos >= limit)
return false;

size_t cur_pos = prev_pos + rows;
if (cur_pos > limit)
cut(block, rows, limit, cur_pos);
// for pos <= limit, give away the whole block
return true;
}
} // namespace DB
46 changes: 46 additions & 0 deletions dbms/src/DataStreams/LimitTransformAction.h
@@ -0,0 +1,46 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once

#include <Core/Block.h>

#include <atomic>
#include <memory>

namespace DB
{
struct GlobalLimitTransformAction
{
public:
GlobalLimitTransformAction(
const Block & header_,
size_t limit_)
: header(header_)
, limit(limit_)
{
}

bool transform(Block & block);

Block getHeader() const { return header; }
size_t getLimit() const { return limit; }

private:
const Block header;
const size_t limit;
std::atomic_size_t pos{0};
};

using GlobalLimitPtr = std::shared_ptr<GlobalLimitTransformAction>;
} // namespace DB
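
Because `pos` is a single atomic counter shared by every stream, GlobalLimitTransformAction enforces the limit globally rather than per stream. A toy model of that property (block contents elided to row counts; the names here are hypothetical, not TiFlash API):

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

// Models GlobalLimitTransformAction::transform(): fetch_add hands each
// caller a disjoint range of row positions, so concurrent streams never
// emit more than `limit` rows in total.
struct GlobalLimitModel
{
    explicit GlobalLimitModel(size_t limit_) : limit(limit_) {}

    // Returns how many rows of an incoming `rows`-row block survive.
    size_t transform(size_t rows)
    {
        size_t prev = pos.fetch_add(rows);
        if (prev >= limit)
            return 0;                        // limit already reached
        return std::min(rows, limit - prev); // cut the tail, as cut() does
    }

    const size_t limit;
    std::atomic_size_t pos{0};
};

int main()
{
    GlobalLimitModel limiter(100);
    std::atomic_size_t emitted{0};

    std::vector<std::thread> streams;
    for (int i = 0; i < 4; ++i)
        streams.emplace_back([&] {
            for (int b = 0; b < 10; ++b)         // 4 streams x 10 blocks x 8 rows
                emitted += limiter.transform(8);
        });
    for (auto & s : streams)
        s.join();

    std::cout << emitted << '\n'; // always 100, regardless of interleaving
}
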
9 changes: 9 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -1187,12 +1187,21 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
if (query_block.limitOrTopN->tp() == tipb::TypeLimit)
limit = query_block.limitOrTopN->limit().limit();
else
<<<<<<< HEAD
limit = query_block.limitOrTopN->topn().limit();
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, log, false); });
if (pipeline.hasMoreThanOneStream())
{
executeUnion(pipeline, max_streams, log);
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, log, false); });
=======
limit = query_block.limit_or_topn->topn().limit();
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
if (pipeline.hasMoreThanOneStream())
{
executeUnion(pipeline, max_streams, log, false, "for partial limit");
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
}
}
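
The shape of executeLimit() above — a cheap partial limit on every stream, a union into one stream, then the limit applied once more — can be sketched with streams modeled as plain vectors (a toy model under that assumption, not the TiFlash API):

#include <cstddef>
#include <iostream>
#include <vector>

// Each parallel stream is truncated to `limit` rows first (no stream
// needs to produce more than that), the pre-limited streams are
// concatenated ("union"), and the final limit runs on the single result.
std::vector<int> limitStream(std::vector<int> rows, size_t limit)
{
    if (rows.size() > limit)
        rows.resize(limit);
    return rows;
}

int main()
{
    const size_t limit = 25;
    std::vector<std::vector<int>> streams = {
        std::vector<int>(30, 1), std::vector<int>(50, 2), std::vector<int>(40, 3)};

    std::vector<int> merged;
    for (const auto & s : streams)
    {
        auto part = limitStream(s, limit); // partial limit, per stream
        merged.insert(merged.end(), part.begin(), part.end());
    }

    merged = limitStream(merged, limit);   // final limit after the union
    std::cout << merged.size() << '\n';    // 25
}
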
74 changes: 74 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp
@@ -0,0 +1,74 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/LimitTransformAction.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
#include <Flash/Planner/Plans/PhysicalLimit.h>
#include <Interpreters/Context.h>
#include <Operators/LimitTransformOp.h>

namespace DB
{
PhysicalPlanNodePtr PhysicalLimit::build(
const String & executor_id,
const LoggerPtr & log,
const tipb::Limit & limit,
const PhysicalPlanNodePtr & child)
{
assert(child);
auto physical_limit = std::make_shared<PhysicalLimit>(
executor_id,
child->getSchema(),
log->identifier(),
child,
limit.limit());
return physical_limit;
}

void PhysicalLimit::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
child->buildBlockInputStream(pipeline, context, max_streams);

pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
if (pipeline.hasMoreThanOneStream())
{
executeUnion(pipeline, max_streams, log, false, "for partial limit");
pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
}
}

void PhysicalLimit::buildPipelineExec(PipelineExecGroupBuilder & group_builder, Context & /*context*/, size_t /*concurrency*/)
{
auto input_header = group_builder.getCurrentHeader();
auto global_limit = std::make_shared<GlobalLimitTransformAction>(input_header, limit);
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<LimitTransformOp>(group_builder.exec_status, global_limit, log->identifier()));
});
}

void PhysicalLimit::finalize(const Names & parent_require)
{
child->finalize(parent_require);
}

const Block & PhysicalLimit::getSampleBlock() const
{
return child->getSampleBlock();
}
} // namespace DB
14 changes: 12 additions & 2 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1245,10 +1245,14 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
getLimitLengthAndOffset(query, limit_length, limit_offset);

/// If there is LIMIT
if (query.limit_length)
if (limit_length)
{
pipeline.transform([&](auto & stream) {
<<<<<<< HEAD
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, 0, nullptr, false);
=======
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, /* offset */ 0, /*req_id=*/"");
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
});
}
}
@@ -1303,8 +1307,9 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
getLimitLengthAndOffset(query, limit_length, limit_offset);
/// If there is LIMIT
if (query.limit_length)
if (limit_length)
{
<<<<<<< HEAD
/** Rare case:
* if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
* then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
@@ -1324,6 +1329,11 @@
pipeline.transform([&](auto & stream) {
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, nullptr, always_read_till_end);
=======
RUNTIME_CHECK_MSG(pipeline.streams.size() == 1, "Cannot executeLimit with multiple streams");
pipeline.transform([&](auto & stream) {
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, /*req_id=*/"");
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
});
}
}
4 changes: 4 additions & 0 deletions dbms/src/Storages/System/StorageSystemNumbers.cpp
@@ -82,7 +82,11 @@ BlockInputStreams StorageSystemNumbers::read(
res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, i * max_block_size, num_streams * max_block_size);

if (limit) /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
<<<<<<< HEAD
res[i] = std::make_shared<LimitBlockInputStream>(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, 0, nullptr);
=======
res[i] = std::make_shared<LimitBlockInputStream>(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, /*offset*/ 0, /*req_id=*/"");
>>>>>>> 3f0dae0d3f (fix the problem that offset in limit query for tiflash system tables doesn't take effect (#6745))
}
return res;
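
The splitting formula in the `if (limit)` branch telescopes, so the per-stream shares always sum to exactly `limit` and differ by at most one row. A small check with illustrative values:

#include <cstddef>
#include <iostream>

// Stream i receives limit * (i + 1) / num_streams - limit * i / num_streams
// rows; successive terms cancel, so the total is exactly `limit`.
int main()
{
    const size_t limit = 10, num_streams = 3;
    size_t total = 0;
    for (size_t i = 0; i < num_streams; ++i)
    {
        size_t chunk = limit * (i + 1) / num_streams - limit * i / num_streams;
        std::cout << "stream " << i << ": " << chunk << " rows\n"; // 3, 3, 4
        total += chunk;
    }
    std::cout << "total: " << total << '\n'; // 10 == limit
}
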
