Issue duckdb#12600: Streaming Positive LEAD
PR Feedback:
* Avoid copying

fixes: duckdb#12600
fixes: duckdblabs/duckdb-internal#2342
Richard Wesley committed Jun 28, 2024
1 parent 7494f36 commit 10d6bfc
Showing 2 changed files with 79 additions and 119 deletions.
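
The change, as the diff below shows, splits PhysicalStreamingWindow::Execute into three helpers: ExecuteInput emits all but the last lead_count rows of a fresh chunk and holds the tail back, ExecuteDelayed flushes previously held rows using the new chunk as lookahead, and ExecuteFunctions evaluates the window expressions over whatever is being emitted. The sketch below models only the buffering rule with plain std::vector rows; StreamingLeadBuffer, Consume and Finish are hypothetical names, not DuckDB API, and the real operator works on DataChunk data and, per the PR feedback, avoids copying where it can.

#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model (not the DuckDB API): rows are plain int64_t values
// and chunks are std::vectors.
struct StreamingLeadBuffer {
	size_t lead_count;            // positive LEAD offset
	std::vector<int64_t> delayed; // rows held back from earlier chunks

	// Mirrors the dispatch in the new PhysicalStreamingWindow::Execute:
	//  * not enough rows yet    -> delay everything, emit nothing (NEED_MORE_INPUT)
	//  * delayed rows available -> flush them first (ExecuteDelayed, HAVE_MORE_OUTPUT)
	//  * otherwise              -> emit all but the last lead_count rows (ExecuteInput)
	std::vector<int64_t> Consume(const std::vector<int64_t> &input) {
		const size_t available = delayed.size() + input.size();
		if (available <= lead_count) {
			delayed.insert(delayed.end(), input.begin(), input.end());
			return {};
		}
		if (!delayed.empty()) {
			// Flush the buffered rows; the caller is expected to offer the
			// same input again, just as HAVE_MORE_OUTPUT does in the operator.
			std::vector<int64_t> out;
			out.swap(delayed);
			return out;
		}
		// No delayed rows: emit the prefix, hold back the last lead_count rows.
		const size_t emit = input.size() - lead_count;
		delayed.assign(input.begin() + emit, input.end());
		return {input.begin(), input.begin() + emit};
	}

	// Mirrors FinalExecute: the stream is over, so flush whatever is left.
	std::vector<int64_t> Finish() {
		std::vector<int64_t> out;
		out.swap(delayed);
		return out;
	}
};

The HAVE_MORE_OUTPUT return in the new Execute corresponds to the second branch above: the operator flushes the delayed rows first and asks to be called again with the same input chunk.
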
191 changes: 73 additions & 118 deletions src/execution/operator/aggregate/physical_streaming_window.cpp
@@ -470,77 +470,16 @@ void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, Da
}
}

OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate_p, OperatorState &state_p) const {
void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataChunk &chunk, DataChunk &delayed,
GlobalOperatorState &gstate_p, OperatorState &state_p) const {
auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
auto &state = state_p.Cast<StreamingWindowState>();

if (!state.initialized) {
state.Initialize(context.client, input, select_list);
}
// Put payload columns in place
for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) {
chunk.data[col_idx].Reference(input.data[col_idx]);
}
idx_t count = input.size();

// Handle LEAD
if (state.lead_count > 0) {
// Break the combined delayed + input into chunk + delayed
auto &delayed = state.delayed;
const idx_t available = delayed.size() + count;
// If we don't have enough to produce a single row,
// then just delay more rows and return nothing
if (available <= state.lead_count) {
delayed.Append(input);
chunk.SetCardinality(0);
return OperatorResultType::NEED_MORE_INPUT;
}

const idx_t prefixed = MinValue<idx_t>(delayed.size(), count);
if (prefixed) {
// Copy delayed[:prefixed] => chunk[:prefixed]
chunk.Reset();
for (idx_t col_idx = 0; col_idx < delayed.data.size(); col_idx++) {
auto &src = delayed.data[col_idx];
auto &dst = chunk.data[col_idx];
VectorOperations::Copy(src, dst, prefixed, 0, 0);
}
const idx_t ragged = delayed.size() - prefixed;
if (ragged) {
// Corner case: didn't consume all the delayed values
// Example: input has 1 row but delay is 2 rows.
// Copy delayed[ragged:] => delayed[0:]
auto &shifted = state.shifted;
shifted.Reset();
delayed.Copy(shifted, ragged);
delayed.Reset();
shifted.Copy(delayed, 0);
// Copy input[:] => delayed[ragged:]
delayed.Append(input);
} else {
// Copy input[:count - prefixed] => chunk[prefixed:count]
const idx_t src_count = count - prefixed;
for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) {
auto &src = input.data[col_idx];
auto &dst = chunk.data[col_idx];
VectorOperations::Copy(src, dst, src_count, 0, prefixed);
}
// Copy input[count - prefixed:] => delayed
delayed.Reset();
input.Copy(delayed, src_count);
}
} else {
// Nothing delayed yet, so just truncate and copy the delayed values
count -= state.lead_count;
input.Copy(delayed, count);
}
}
chunk.SetCardinality(count);

// Compute window functions
for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
idx_t col_idx = input.data.size() + expr_idx;
const idx_t count = chunk.size();
const column_t input_width = children[0]->GetTypes().size();
for (column_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
column_t col_idx = input_width + expr_idx;
auto &expr = *select_list[expr_idx];
auto &result = chunk.data[col_idx];
switch (expr.GetExpressionType()) {
@@ -566,76 +505,92 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D
}
case ExpressionType::WINDOW_LAG:
case ExpressionType::WINDOW_LEAD:
state.lead_lag_states[expr_idx]->Execute(context, chunk, state.delayed, result);
state.lead_lag_states[expr_idx]->Execute(context, chunk, delayed, result);
break;
default:
throw NotImplementedException("%s for StreamingWindow", ExpressionTypeToString(expr.GetExpressionType()));
}
}
gstate.row_number += NumericCast<int64_t>(count);
return OperatorResultType::NEED_MORE_INPUT;
}

OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContext &context, DataChunk &chunk,
GlobalOperatorState &gstate_p,
OperatorState &state_p) const {

auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input,
DataChunk &chunk, GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();

if (!state.initialized || !state.lead_count) {
return OperatorFinalizeResultType::FINISHED;
}

// The delayed input is the input
auto &input = state.delayed;

// Put payload columns in place
for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) {
chunk.data[col_idx].Reference(input.data[col_idx]);
}
idx_t count = input.size();

// Handle LEAD
if (state.lead_count > 0) {
// Nothing is delayed yet, so truncate the output and copy the trailing lead_count rows into delayed
count -= state.lead_count;
input.Copy(delayed, count);
}
chunk.SetCardinality(count);

// There is no more delayed data
auto &delayed = state.shifted;
delayed.Reset();
ExecuteFunctions(context, chunk, state.delayed, gstate_p, state_p);
}

// Compute window functions
for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) {
idx_t col_idx = input.data.size() + expr_idx;
auto &expr = *select_list[expr_idx];
auto &result = chunk.data[col_idx];
switch (expr.GetExpressionType()) {
case ExpressionType::WINDOW_AGGREGATE:
state.aggregate_states[expr_idx]->Execute(context, chunk, result);
break;
case ExpressionType::WINDOW_FIRST_VALUE:
case ExpressionType::WINDOW_PERCENT_RANK:
case ExpressionType::WINDOW_RANK:
case ExpressionType::WINDOW_RANK_DENSE: {
// Reference constant vector
chunk.data[col_idx].Reference(*state.const_vectors[expr_idx]);
break;
}
case ExpressionType::WINDOW_ROW_NUMBER: {
// Set row numbers
int64_t start_row = gstate.row_number;
auto rdata = FlatVector::GetData<int64_t>(chunk.data[col_idx]);
for (idx_t i = 0; i < count; i++) {
rdata[i] = NumericCast<int64_t>(start_row + NumericCast<int64_t>(i));
}
break;
}
case ExpressionType::WINDOW_LAG:
case ExpressionType::WINDOW_LEAD:
state.lead_lag_states[expr_idx]->Execute(context, chunk, delayed, result);
break;
default:
throw NotImplementedException("%s for StreamingWindow", ExpressionTypeToString(expr.GetExpressionType()));
}
void PhysicalStreamingWindow::ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input,
DataChunk &chunk, GlobalOperatorState &gstate_p,
OperatorState &state_p) const {

// Put payload columns in place
for (idx_t col_idx = 0; col_idx < delayed.data.size(); col_idx++) {
chunk.data[col_idx].Reference(delayed.data[col_idx]);
}
idx_t count = delayed.size();
chunk.SetCardinality(count);

ExecuteFunctions(context, chunk, input, gstate_p, state_p);
}

OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate_p, OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();
if (!state.initialized) {
state.Initialize(context.client, input, select_list);
}

auto &delayed = state.delayed;
const idx_t available = delayed.size() + input.size();
if (available <= state.lead_count) {
// If we don't have enough to produce a single row,
// then just delay more rows, return nothing
// and ask for more data.
delayed.Append(input);
chunk.SetCardinality(0);
return OperatorResultType::NEED_MORE_INPUT;
} else if (delayed.size()) {
// We have enough delayed rows so flush them
ExecuteDelayed(context, delayed, input, chunk, gstate_p, state_p);
delayed.Reset();
// Come back to process the input
return OperatorResultType::HAVE_MORE_OUTPUT;
} else {
// No delayed rows, so emit what we can and delay the rest.
ExecuteInput(context, delayed, input, chunk, gstate_p, state_p);
return OperatorResultType::NEED_MORE_INPUT;
}
}

OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContext &context, DataChunk &chunk,
GlobalOperatorState &gstate_p,
OperatorState &state_p) const {
auto &state = state_p.Cast<StreamingWindowState>();

if (state.initialized && state.lead_count) {
auto &delayed = state.delayed;
// There are no more rows; pass an empty chunk as the lookahead
auto &input = state.shifted;
input.Reset();
ExecuteDelayed(context, delayed, input, chunk, gstate_p, state_p);
}
gstate.row_number += NumericCast<int64_t>(count);

return OperatorFinalizeResultType::FINISHED;
}
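
For the rows being emitted, ExecuteFunctions passes the held-back chunk (or the fresh input, in the ExecuteDelayed case) to the lead/lag state as lookahead, so that a positive offset near the end of the current chunk can still be resolved. Below is a rough illustration of that lookup with plain vectors; LeadChunk is a hypothetical helper, missing values are modelled as std::nullopt, and LEAD's optional default argument is ignored here.

#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Compute LEAD(value, offset) for every row of `current`, reading overflow
// positions from `lookahead` (the buffered rows that follow `current` in
// stream order). Positions past both chunks become std::nullopt.
static std::vector<std::optional<int64_t>> LeadChunk(const std::vector<int64_t> &current,
                                                     const std::vector<int64_t> &lookahead, size_t offset) {
	std::vector<std::optional<int64_t>> result(current.size());
	for (size_t i = 0; i < current.size(); i++) {
		const size_t target = i + offset;
		if (target < current.size()) {
			result[i] = current[target];
		} else if (target - current.size() < lookahead.size()) {
			result[i] = lookahead[target - current.size()];
		} else {
			result[i] = std::nullopt; // ran off the end of the stream
		}
	}
	return result;
}
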
7 changes: 6 additions & 1 deletion src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp
@@ -49,7 +49,12 @@ class PhysicalStreamingWindow : public PhysicalOperator {
string ParamsToString() const override;

private:
void ExecuteAggregate();
void ExecuteFunctions(ExecutionContext &context, DataChunk &chunk, DataChunk &delayed,
GlobalOperatorState &gstate_p, OperatorState &state_p) const;
void ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const;
void ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk,
GlobalOperatorState &gstate, OperatorState &state) const;
};

} // namespace duckdb
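
Putting the pieces together, here is a self-contained toy (hypothetical code, not part of the commit) that pushes three small chunks through the same delay / flush / finalize protocol and prints value and LEAD(value, 2) for every row. The lead values are read from the fully materialised stream for brevity, so the buffering only decides when a row may be emitted; compiled as ordinary C++11 it should print leads 3, 4, 5, 6, 7, NULL, NULL for rows 0 through 6.

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
	const size_t lead_count = 2;
	std::vector<std::vector<int64_t>> chunks = {{1, 2, 3}, {4, 5, 6}, {7}};

	// Materialise the whole stream once so the expected LEAD value is easy to
	// look up; the buffering below only decides *when* a row may be emitted.
	std::vector<int64_t> stream;
	for (const auto &c : chunks) {
		stream.insert(stream.end(), c.begin(), c.end());
	}

	size_t emitted = 0;
	std::vector<int64_t> delayed;
	auto emit = [&](size_t count) {
		for (size_t i = 0; i < count; i++, emitted++) {
			const size_t target = emitted + lead_count;
			if (target < stream.size()) {
				std::printf("row %zu: value=%lld lead=%lld\n", emitted,
				            (long long)stream[emitted], (long long)stream[target]);
			} else {
				std::printf("row %zu: value=%lld lead=NULL\n", emitted, (long long)stream[emitted]);
			}
		}
	};

	for (const auto &input : chunks) {
		if (delayed.size() + input.size() <= lead_count) {
			delayed.insert(delayed.end(), input.begin(), input.end()); // NEED_MORE_INPUT
			continue;
		}
		if (!delayed.empty()) {
			emit(delayed.size()); // flush buffered rows first (the HAVE_MORE_OUTPUT path)
			delayed.clear();
		}
		// Emit all but the last lead_count rows of the input, delay the tail.
		const size_t keep = input.size() > lead_count ? input.size() - lead_count : 0;
		emit(keep);
		delayed.assign(input.begin() + keep, input.end());
	}
	emit(delayed.size()); // FinalExecute: the remaining rows get NULL leads as needed
	return 0;
}
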
