fix the problem that offset in limit query for tiflash system tables doesn't take effect (pingcap#6745)

* respect offset in LimitBlockInputStream

* remove LimitTransformAction

* fix unit test

Signed-off-by: ywqzzy <[email protected]>
lidezhu authored and ywqzzy committed Feb 13, 2023
1 parent dea4761 commit 6882e35
Showing 8 changed files with 50 additions and 56 deletions.
42 changes: 35 additions & 7 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
@@ -22,30 +22,58 @@ namespace DB
 LimitBlockInputStream::LimitBlockInputStream(
     const BlockInputStreamPtr & input,
     size_t limit_,
+    size_t offset_,
     const String & req_id)
     : log(Logger::get(req_id))
-    , action(input->getHeader(), limit_)
+    , limit(limit_)
+    , offset(offset_)
 {
     children.push_back(input);
 }


 Block LimitBlockInputStream::readImpl()
 {
-    Block res = children.back()->read();
+    Block res;
+    size_t rows = 0;

-    if (action.transform(res))
+    if (pos >= offset + limit)
     {
         return res;
     }
-    else
+
+    do
     {
-        return {};
-    }
+        res = children.back()->read();
+        if (!res)
+            return res;
+        rows = res.rows();
+        pos += rows;
+    } while (pos <= offset);
+
+    /// give away the whole block
+    if (pos >= offset + rows && pos <= offset + limit)
+        return res;
+
+    /// give away a piece of the block
+    UInt64 start = std::max(
+        static_cast<Int64>(0),
+        static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows));
+
+    UInt64 length = std::min(
+        static_cast<Int64>(limit),
+        std::min(
+            static_cast<Int64>(pos) - static_cast<Int64>(offset),
+            static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
+
+    for (size_t i = 0; i < res.columns(); ++i)
+        res.safeGetByPosition(i).column = res.safeGetByPosition(i).column->cut(start, length);
+
+    return res;
 }

 void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
 {
-    buffer.fmtAppend(", limit = {}", action.getLimit());
+    buffer.fmtAppend(", limit = {}", limit);
 }
 } // namespace DB
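The start/length arithmetic above is easier to follow with concrete numbers. Here is a minimal standalone sketch (not part of the patch) tracing one case, offset = 10 and limit = 5, with the input arriving in blocks of 8 rows:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

int main()
{
    const int64_t offset = 10, limit = 5, rows = 8;

    // First block: pos = 8 <= offset, so the do/while loop discards it.
    // Second block: pos = 16 > offset, but pos < offset + rows (18),
    // so only a piece of the block is returned.
    const int64_t pos = 16;

    const int64_t start = std::max<int64_t>(0, offset - pos + rows); // 10 - 16 + 8 = 2
    const int64_t length = std::min<int64_t>(
        limit,
        std::min<int64_t>(pos - offset,                  // 16 - 10 = 6
                          limit + offset - pos + rows)); // 5 + 10 - 16 + 8 = 7

    // Rows 2..6 of the second block are rows 10..14 of the stream:
    // exactly OFFSET 10 LIMIT 5.
    assert(start == 2 && length == 5);
    return 0;
}
```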
6 changes: 5 additions & 1 deletion dbms/src/DataStreams/LimitBlockInputStream.h
@@ -34,6 +34,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream
     LimitBlockInputStream(
         const BlockInputStreamPtr & input,
         size_t limit_,
+        size_t offset_,
         const String & req_id);

     String getName() const override { return NAME; }
@@ -46,7 +47,10 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

 private:
     LoggerPtr log;
-    LocalLimitTransformAction action;
+    size_t limit;
+    size_t offset;
+    /// how many lines were read, including the last read block
+    size_t pos = 0;
 };

 } // namespace DB
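As a sanity check on the new member layout (limit, offset, pos) and the read loop it drives, here is a hedged re-implementation of the same algorithm over plain vectors. MiniLimitStream is a made-up name for illustration; it mirrors, rather than is, the TiFlash class:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// std::vector<int> stands in for Block; read() applies the same
// skip/whole-block/cut logic as LimitBlockInputStream::readImpl().
class MiniLimitStream
{
public:
    MiniLimitStream(std::vector<std::vector<int>> blocks_, size_t limit_, size_t offset_)
        : blocks(std::move(blocks_)), limit(limit_), offset(offset_) {}

    std::vector<int> read()
    {
        std::vector<int> res;
        size_t rows = 0;
        if (pos >= offset + limit)
            return res;
        do
        {
            if (next == blocks.size())
                return {};
            res = blocks[next++];
            rows = res.size();
            pos += rows;
        } while (pos <= offset);

        // Give away the whole block when it lies entirely inside the window.
        if (pos >= offset + rows && pos <= offset + limit)
            return res;

        // Otherwise cut out the overlapping piece.
        auto start = static_cast<size_t>(std::max<int64_t>(
            0, int64_t(offset) - int64_t(pos) + int64_t(rows)));
        auto length = static_cast<size_t>(std::min<int64_t>(
            int64_t(limit),
            std::min<int64_t>(int64_t(pos) - int64_t(offset),
                              int64_t(limit) + int64_t(offset) - int64_t(pos) + int64_t(rows))));
        return {res.begin() + start, res.begin() + start + length};
    }

private:
    std::vector<std::vector<int>> blocks;
    size_t next = 0;
    size_t limit;
    size_t offset;
    size_t pos = 0;
};

int main()
{
    // 20 rows split into blocks of 8, 8 and 4; expect rows 10..14.
    MiniLimitStream s({{0, 1, 2, 3, 4, 5, 6, 7},
                       {8, 9, 10, 11, 12, 13, 14, 15},
                       {16, 17, 18, 19}},
                      /*limit=*/5, /*offset=*/10);
    std::vector<int> out, block;
    while (!(block = s.read()).empty())
        out.insert(out.end(), block.begin(), block.end());
    assert((out == std::vector<int>{10, 11, 12, 13, 14}));
    return 0;
}
```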
17 changes: 0 additions & 17 deletions dbms/src/DataStreams/LimitTransformAction.cpp
@@ -33,23 +33,6 @@ void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
 }
 } // namespace

-bool LocalLimitTransformAction::transform(Block & block)
-{
-    if (unlikely(!block))
-        return true;
-
-    /// pos - how many lines were read, including the last read block
-    if (pos >= limit)
-        return false;
-
-    auto rows = block.rows();
-    pos += rows;
-    if (pos > limit)
-        cut(block, rows, limit, pos);
-    // for pos <= limit, give away the whole block
-    return true;
-}
-
 bool GlobalLimitTransformAction::transform(Block & block)
 {
     if (unlikely(!block))
22 changes: 0 additions & 22 deletions dbms/src/DataStreams/LimitTransformAction.h
@@ -20,28 +20,6 @@

 namespace DB
 {
-struct LocalLimitTransformAction
-{
-public:
-    LocalLimitTransformAction(
-        const Block & header_,
-        size_t limit_)
-        : header(header_)
-        , limit(limit_)
-    {
-    }
-
-    bool transform(Block & block);
-
-    Block getHeader() const { return header; }
-    size_t getLimit() const { return limit; }
-
-private:
-    const Block header;
-    const size_t limit;
-    size_t pos = 0;
-};
-
 struct GlobalLimitTransformAction
 {
 public:
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -712,11 +712,11 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
         limit = query_block.limit_or_topn->limit().limit();
     else
         limit = query_block.limit_or_topn->topn().limit();
-    pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, log->identifier()); });
+    pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
     if (pipeline.hasMoreThanOneStream())
     {
         executeUnion(pipeline, max_streams, log, false, "for partial limit");
-        pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, log->identifier()); });
+        pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
     }
 }

4 changes: 2 additions & 2 deletions dbms/src/Flash/Planner/Plans/PhysicalLimit.cpp
@@ -45,11 +45,11 @@ void PhysicalLimit::buildBlockInputStreamImpl(DAGPipeline & pipeline, Context &
 {
     child->buildBlockInputStream(pipeline, context, max_streams);

-    pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, log->identifier()); });
+    pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
     if (pipeline.hasMoreThanOneStream())
     {
         executeUnion(pipeline, max_streams, log, false, "for partial limit");
-        pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, log->identifier()); });
+        pipeline.transform([&](auto & stream) { stream = std::make_shared<LimitBlockInputStream>(stream, limit, /*offset*/ 0, log->identifier()); });
     }
 }

9 changes: 5 additions & 4 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1196,10 +1196,10 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
     getLimitLengthAndOffset(query, limit_length, limit_offset);

     /// If there is LIMIT
-    if (query.limit_length)
+    if (limit_length)
     {
         pipeline.transform([&](auto & stream) {
-            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, /*req_id=*/"");
+            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, /* offset */ 0, /*req_id=*/"");
         });
     }
 }
@@ -1237,10 +1237,11 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
     getLimitLengthAndOffset(query, limit_length, limit_offset);

     /// If there is LIMIT
-    if (query.limit_length)
+    if (limit_length)
     {
+        RUNTIME_CHECK_MSG(pipeline.streams.size() == 1, "Cannot executeLimit with multiple streams");
         pipeline.transform([&](auto & stream) {
-            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, /*req_id=*/"");
+            stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, /*req_id=*/"");
         });
     }
 }
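Note the division of labor: executePreLimit runs per stream before the merge, so it can only keep the first limit_length + limit_offset rows of each stream with offset 0, while executeLimit applies the real offset exactly once, on the single merged stream (hence the RUNTIME_CHECK). Below is a small simulation of this two-phase pattern, with plain vectors standing in for block streams and a hypothetical preLimit helper; none of it is TiFlash API:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Phase 1 (pre-limit): each stream independently keeps at most
// limit + offset rows; no offset is applied yet, because a row can
// only be safely dropped once all streams are merged.
static std::vector<int> preLimit(std::vector<int> stream, size_t limit, size_t offset)
{
    if (stream.size() > limit + offset)
        stream.resize(limit + offset);
    return stream;
}

int main()
{
    const size_t limit = 5, offset = 10;

    // Two partial streams of 20 rows each.
    std::vector<int> a, b;
    for (int i = 0; i < 40; i += 2)
    {
        a.push_back(i);
        b.push_back(i + 1);
    }

    a = preLimit(a, limit, offset);
    b = preLimit(b, limit, offset);
    assert(a.size() == limit + offset && b.size() == limit + offset);

    // Merge (here: concatenate + sort), then phase 2 applies the real
    // offset and limit exactly once, on the single remaining stream.
    std::vector<int> merged = a;
    merged.insert(merged.end(), b.begin(), b.end());
    std::sort(merged.begin(), merged.end());

    std::vector<int> out(merged.begin() + offset, merged.begin() + offset + limit);
    assert((out == std::vector<int>{10, 11, 12, 13, 14}));
    return 0;
}
```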
2 changes: 1 addition & 1 deletion dbms/src/Storages/System/StorageSystemNumbers.cpp
@@ -96,7 +96,7 @@ BlockInputStreams StorageSystemNumbers::read(
         res[i] = std::make_shared<NumbersBlockInputStream>(max_block_size, i * max_block_size, num_streams * max_block_size);

         if (limit) /// This formula is how to split 'limit' elements to 'num_streams' chunks almost uniformly.
-            res[i] = std::make_shared<LimitBlockInputStream>(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, /*req_id=*/"");
+            res[i] = std::make_shared<LimitBlockInputStream>(res[i], limit * (i + 1) / num_streams - limit * i / num_streams, /*offset*/ 0, /*req_id=*/"");
    }

    return res;
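The splitting formula limit * (i + 1) / num_streams - limit * i / num_streams telescopes when summed, so the per-stream parts add up to exactly limit while differing from each other by at most one row. A quick standalone check (not TiFlash code):

```cpp
#include <cassert>
#include <cstddef>

int main()
{
    const size_t limit = 10, num_streams = 3;
    size_t total = 0;
    for (size_t i = 0; i < num_streams; ++i)
    {
        // Consecutive floor terms cancel when the parts are summed.
        size_t part = limit * (i + 1) / num_streams - limit * i / num_streams;
        total += part; // parts here: 3, 3, 4
    }
    assert(total == limit);
    return 0;
}
```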
