Skip to content

Commit

Permalink
Support window Function lead/lag (#5671)
Browse files Browse the repository at this point in the history
close #5579
  • Loading branch information
SeaRise authored Sep 2, 2022
1 parent 47a4ada commit e9f3239
Show file tree
Hide file tree
Showing 23 changed files with 1,392 additions and 181 deletions.
122 changes: 102 additions & 20 deletions dbms/src/DataStreams/WindowBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Common/Arena.h>
#include <DataStreams/WindowBlockInputStream.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/WindowDescription.h>
#include <Interpreters/convertFieldToType.h>

namespace DB
{
Expand Down Expand Up @@ -70,6 +66,7 @@ void WindowBlockInputStream::initialWorkspaces()
{
WindowFunctionWorkspace workspace;
workspace.window_function = window_function_description.window_function;
workspace.arguments = window_function_description.arguments;
workspaces.push_back(std::move(workspace));
}
only_have_row_number = onlyHaveRowNumber();
Expand Down Expand Up @@ -239,15 +236,29 @@ void WindowBlockInputStream::advanceFrameStart()
return;
}

if (only_have_pure_window)
switch (window_description.frame.begin_type)
{
case WindowFrame::BoundaryType::Unbounded:
// UNBOUNDED PRECEDING, just mark it valid. It is initialized when
// the new partition starts.
frame_started = true;
break;
case WindowFrame::BoundaryType::Current:
{
RUNTIME_CHECK_MSG(
only_have_pure_window,
"window function only support pure window function in WindowFrame::BoundaryType::Current now.");
frame_start = current_row;
frame_started = true;
return;
break;
}
case WindowFrame::BoundaryType::Offset:
default:
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"The frame begin type '{}' is not implemented",
window_description.frame.begin_type);
}

throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"window function only support pure window function now.");
}

bool WindowBlockInputStream::arePeers(const RowNumber & x, const RowNumber & y) const
Expand Down Expand Up @@ -307,16 +318,12 @@ void WindowBlockInputStream::advanceFrameEndCurrentRow()
|| frame_end.block + 1 == partition_end.block);

// If window only have row_number or rank/dense_rank functions, set frame_end to the next row of current_row and frame_ended to true
if (only_have_pure_window)
{
frame_end = current_row;
advanceRowNumber(frame_end);
frame_ended = true;
return;
}

throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"window function only support pure window function now.");
RUNTIME_CHECK(
only_have_pure_window,
"window function only support pure window function in WindowFrame::BoundaryType::Current now.");
frame_end = current_row;
advanceRowNumber(frame_end);
frame_ended = true;
}

void WindowBlockInputStream::advanceFrameEnd()
Expand All @@ -339,6 +346,12 @@ void WindowBlockInputStream::advanceFrameEnd()
advanceFrameEndCurrentRow();
break;
case WindowFrame::BoundaryType::Unbounded:
{
// The UNBOUNDED FOLLOWING frame ends when the partition ends.
frame_end = partition_end;
frame_ended = partition_ended;
break;
}
case WindowFrame::BoundaryType::Offset:
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
Expand All @@ -355,7 +368,7 @@ void WindowBlockInputStream::writeOutCurrentRow()
for (size_t wi = 0; wi < workspaces.size(); ++wi)
{
auto & ws = workspaces[wi];
ws.window_function->windowInsertResultInto(this->shared_from_this(), wi);
ws.window_function->windowInsertResultInto(this->shared_from_this(), wi, ws.arguments);
}
}

Expand Down Expand Up @@ -527,6 +540,7 @@ void WindowBlockInputStream::tryCalculate()
assert(frame_start <= frame_end);

// Write out the results.
// TODO execute the window function by block instead of row.
writeOutCurrentRow();

prev_frame_start = frame_start;
Expand Down Expand Up @@ -592,4 +606,72 @@ void WindowBlockInputStream::appendInfo(FmtBuffer & buffer) const
boundaryTypeToString(window_description.frame.begin_type),
boundaryTypeToString(window_description.frame.end_type));
}

void WindowBlockInputStream::advanceRowNumber(RowNumber & x) const
{
assert(x.block >= first_block_number);
assert(x.block - first_block_number < window_blocks.size());

const auto block_rows = blockAt(x).rows;
assert(x.row < block_rows);

++x.row;
if (x.row < block_rows)
{
return;
}

x.row = 0;
++x.block;
}

bool WindowBlockInputStream::lead(RowNumber & x, size_t offset) const
{
assert(frame_started);
assert(frame_ended);
assert(frame_start <= frame_end);

assert(x.block >= first_block_number);
assert(x.block - first_block_number < window_blocks.size());

const auto block_rows = blockAt(x).rows;
assert(x.row < block_rows);

x.row += offset;
if (x.row < block_rows)
{
return x < frame_end;
}

++x.block;
if (x.block - first_block_number == window_blocks.size())
return false;
size_t new_offset = x.row - block_rows;
x.row = 0;
return lead(x, new_offset);
}

bool WindowBlockInputStream::lag(RowNumber & x, size_t offset) const
{
assert(frame_started);
assert(frame_ended);
assert(frame_start <= frame_end);

assert(x.block >= first_block_number);
assert(x.block - first_block_number < window_blocks.size());

if (x.row >= offset)
{
x.row -= offset;
return frame_start <= x;
}

if (x.block <= first_block_number)
return false;

--x.block;
size_t new_offset = offset - x.row - 1;
x.row = blockAt(x.block).rows - 1;
return lag(x, new_offset);
}
} // namespace DB
29 changes: 11 additions & 18 deletions dbms/src/DataStreams/WindowBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#pragma once

#include <Common/FmtUtils.h>
#include <Core/ColumnNumbers.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/WindowDescription.h>

#include <deque>
Expand All @@ -29,6 +29,8 @@ struct WindowFunctionWorkspace
{
// TODO add aggregation function
WindowFunctionPtr window_function = nullptr;

ColumnNumbers arguments;
};

struct WindowBlock
Expand Down Expand Up @@ -59,6 +61,11 @@ struct RowNumber
{
return *this < other || *this == other;
}

String toString() const
{
return fmt::format("[block={},row={}]", block, row);
}
};

class WindowBlockInputStream : public IProfilingBlockInputStream
Expand Down Expand Up @@ -137,23 +144,11 @@ class WindowBlockInputStream : public IProfilingBlockInputStream
return window_blocks[x.block - first_block_number].output_columns;
}

void advanceRowNumber(RowNumber & x) const
{
assert(x.block >= first_block_number);
assert(x.block - first_block_number < window_blocks.size());

const auto block_rows = blockAt(x).rows;
assert(x.row < block_rows);
void advanceRowNumber(RowNumber & x) const;

++x.row;
if (x.row < block_rows)
{
return;
}
bool lead(RowNumber & x, size_t offset) const;

x.row = 0;
++x.block;
}
bool lag(RowNumber & x, size_t offset) const;

RowNumber blocksEnd() const
{
Expand Down Expand Up @@ -189,8 +184,6 @@ class WindowBlockInputStream : public IProfilingBlockInputStream
// Per-window-function scratch spaces.
std::vector<WindowFunctionWorkspace> workspaces;

std::unique_ptr<Arena> arena;

// A sliding window of blocks we currently need. We add the input blocks as
// they arrive, and discard the blocks we don't need anymore. The blocks
// have an always-incrementing index. The index of the first block is in
Expand Down
62 changes: 56 additions & 6 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ std::unordered_map<String, tipb::ExprType> window_func_name_to_sig({
{"RowNumber", tipb::ExprType::RowNumber},
{"Rank", tipb::ExprType::Rank},
{"DenseRank", tipb::ExprType::DenseRank},
{"Lead", tipb::ExprType::Lead},
{"Lag", tipb::ExprType::Lag},
});

DAGColumnInfo toNullableDAGColumnInfo(const DAGColumnInfo & input)
Expand Down Expand Up @@ -1433,12 +1435,42 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id,
auto window_sig = window_sig_it->second;
window_expr->set_tp(window_sig);
auto * ft = window_expr->mutable_field_type();
// TODO: Maybe more window functions with different field type.
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagBinary);
ft->set_collate(collator_id);
ft->set_flen(21);
ft->set_decimal(-1);
switch (window_sig)
{
case tipb::ExprType::Lead:
case tipb::ExprType::Lag:
{
// TODO handling complex situations
// like lead(col, offset, NULL), lead(data_type1, offset, data_type2)
assert(window_expr->children_size() >= 1 && window_expr->children_size() <= 3);
const auto first_arg_type = window_expr->children(0).field_type();
ft->set_tp(first_arg_type.tp());
if (window_expr->children_size() < 3)
{
auto field_type = TiDB::fieldTypeToColumnInfo(first_arg_type);
field_type.clearNotNullFlag();
ft->set_flag(field_type.flag);
}
else
{
const auto third_arg_type = window_expr->children(2).field_type();
assert(first_arg_type.tp() == third_arg_type.tp());
ft->set_flag(TiDB::fieldTypeToColumnInfo(first_arg_type).hasNotNullFlag()
? third_arg_type.flag()
: first_arg_type.flag());
}
ft->set_collate(first_arg_type.collate());
ft->set_flen(first_arg_type.flen());
ft->set_decimal(first_arg_type.decimal());
break;
}
default:
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagBinary);
ft->set_collate(collator_id);
ft->set_flen(21);
ft->set_decimal(-1);
}
}

for (const auto & child : order_by_exprs)
Expand Down Expand Up @@ -1859,6 +1891,24 @@ ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr fun
ci.flag = TiDB::ColumnFlagBinary;
break;
}
case tipb::ExprType::Lead:
case tipb::ExprType::Lag:
{
// TODO handling complex situations
// like lead(col, offset, NULL), lead(data_type1, offset, data_type2)
assert(children_ci.size() >= 1 && children_ci.size() <= 3);
if (children_ci.size() < 3)
{
ci = children_ci[0];
ci.clearNotNullFlag();
}
else
{
assert(children_ci[0].tp == children_ci[2].tp);
ci = children_ci[0].hasNotNullFlag() ? children_ci[2] : children_ci[0];
}
break;
}
default:
throw Exception(fmt::format("Unsupported window function {}", func->name), ErrorCodes::LOGICAL_ERROR);
}
Expand Down
Loading

0 comments on commit e9f3239

Please sign in to comment.