From 10d6bfc1fa24512c44800c2dc45ea65fefcf2ea4 Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Fri, 28 Jun 2024 11:52:06 -0700 Subject: [PATCH] Issue #12600: Streaming Positive LEAD PR Feedback: * Avoid copying fixes: duckdb#12600 fixes: duckdblabs/duckdb-internal#2342 --- .../aggregate/physical_streaming_window.cpp | 191 +++++++----------- .../aggregate/physical_streaming_window.hpp | 7 +- 2 files changed, 79 insertions(+), 119 deletions(-) diff --git a/src/execution/operator/aggregate/physical_streaming_window.cpp b/src/execution/operator/aggregate/physical_streaming_window.cpp index df99bd135e2..bcfd6cfc931 100644 --- a/src/execution/operator/aggregate/physical_streaming_window.cpp +++ b/src/execution/operator/aggregate/physical_streaming_window.cpp @@ -470,77 +470,16 @@ void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, Da } } -OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, - GlobalOperatorState &gstate_p, OperatorState &state_p) const { +void PhysicalStreamingWindow::ExecuteFunctions(ExecutionContext &context, DataChunk &chunk, DataChunk &delayed, + GlobalOperatorState &gstate_p, OperatorState &state_p) const { auto &gstate = gstate_p.Cast(); auto &state = state_p.Cast(); - if (!state.initialized) { - state.Initialize(context.client, input, select_list); - } - // Put payload columns in place - for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) { - chunk.data[col_idx].Reference(input.data[col_idx]); - } - idx_t count = input.size(); - - // Handle LEAD - if (state.lead_count > 0) { - // Break the combined delayed + input into chunk + delayed - auto &delayed = state.delayed; - const idx_t available = delayed.size() + count; - // If we don't have enough to produce a single row, - // then just delay more rows and return nothing - if (available <= state.lead_count) { - delayed.Append(input); - chunk.SetCardinality(0); - return 
OperatorResultType::NEED_MORE_INPUT; - } - - const idx_t prefixed = MinValue(delayed.size(), count); - if (prefixed) { - // Copy delayed[:prefixed] => chunk[:prefixed] - chunk.Reset(); - for (idx_t col_idx = 0; col_idx < delayed.data.size(); col_idx++) { - auto &src = delayed.data[col_idx]; - auto &dst = chunk.data[col_idx]; - VectorOperations::Copy(src, dst, prefixed, 0, 0); - } - const idx_t ragged = delayed.size() - prefixed; - if (ragged) { - // Corner case: didn't consume all the delayed values - // Example: input has 1 row but delay is 2 rows. - // Copy delayed[ragged:] => delayed[0:] - auto &shifted = state.shifted; - shifted.Reset(); - delayed.Copy(shifted, ragged); - delayed.Reset(); - shifted.Copy(delayed, 0); - // Copy input[:] => delayed[ragged:] - delayed.Append(input); - } else { - // Copy input[:count - prefixed] => chunk[prefixed:count] - const idx_t src_count = count - prefixed; - for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) { - auto &src = input.data[col_idx]; - auto &dst = chunk.data[col_idx]; - VectorOperations::Copy(src, dst, src_count, 0, prefixed); - } - // Copy input[count - prefixed:] => delayed - delayed.Reset(); - input.Copy(delayed, src_count); - } - } else { - // Nothing delayed yet, so just truncate and copy the delayed values - count -= state.lead_count; - input.Copy(delayed, count); - } - } - chunk.SetCardinality(count); - // Compute window functions - for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) { - idx_t col_idx = input.data.size() + expr_idx; + const idx_t count = chunk.size(); + const column_t input_width = children[0]->GetTypes().size(); + for (column_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) { + column_t col_idx = input_width + expr_idx; auto &expr = *select_list[expr_idx]; auto &result = chunk.data[col_idx]; switch (expr.GetExpressionType()) { @@ -566,76 +505,92 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D } case 
ExpressionType::WINDOW_LAG: case ExpressionType::WINDOW_LEAD: - state.lead_lag_states[expr_idx]->Execute(context, chunk, state.delayed, result); + state.lead_lag_states[expr_idx]->Execute(context, chunk, delayed, result); break; default: throw NotImplementedException("%s for StreamingWindow", ExpressionTypeToString(expr.GetExpressionType())); } } gstate.row_number += NumericCast(count); - return OperatorResultType::NEED_MORE_INPUT; } -OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContext &context, DataChunk &chunk, - GlobalOperatorState &gstate_p, - OperatorState &state_p) const { - - auto &gstate = gstate_p.Cast(); +void PhysicalStreamingWindow::ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input, + DataChunk &chunk, GlobalOperatorState &gstate_p, + OperatorState &state_p) const { auto &state = state_p.Cast(); - if (!state.initialized || !state.lead_count) { - return OperatorFinalizeResultType::FINISHED; - } - - // The delayed input is the input - auto &input = state.delayed; - // Put payload columns in place for (idx_t col_idx = 0; col_idx < input.data.size(); col_idx++) { chunk.data[col_idx].Reference(input.data[col_idx]); } idx_t count = input.size(); + + // Handle LEAD + if (state.lead_count > 0) { + // Nothing delayed yet, so just truncate and copy the delayed values + count -= state.lead_count; + input.Copy(delayed, count); + } chunk.SetCardinality(count); - // There is no more delayed data - auto &delayed = state.shifted; - delayed.Reset(); + ExecuteFunctions(context, chunk, state.delayed, gstate_p, state_p); +} - // Compute window functions - for (idx_t expr_idx = 0; expr_idx < select_list.size(); expr_idx++) { - idx_t col_idx = input.data.size() + expr_idx; - auto &expr = *select_list[expr_idx]; - auto &result = chunk.data[col_idx]; - switch (expr.GetExpressionType()) { - case ExpressionType::WINDOW_AGGREGATE: - state.aggregate_states[expr_idx]->Execute(context, chunk, result); - break; - case 
ExpressionType::WINDOW_FIRST_VALUE: - case ExpressionType::WINDOW_PERCENT_RANK: - case ExpressionType::WINDOW_RANK: - case ExpressionType::WINDOW_RANK_DENSE: { - // Reference constant vector - chunk.data[col_idx].Reference(*state.const_vectors[expr_idx]); - break; - } - case ExpressionType::WINDOW_ROW_NUMBER: { - // Set row numbers - int64_t start_row = gstate.row_number; - auto rdata = FlatVector::GetData(chunk.data[col_idx]); - for (idx_t i = 0; i < count; i++) { - rdata[i] = NumericCast(start_row + NumericCast(i)); - } - break; - } - case ExpressionType::WINDOW_LAG: - case ExpressionType::WINDOW_LEAD: - state.lead_lag_states[expr_idx]->Execute(context, chunk, delayed, result); - break; - default: - throw NotImplementedException("%s for StreamingWindow", ExpressionTypeToString(expr.GetExpressionType())); - } +void PhysicalStreamingWindow::ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk &input, + DataChunk &chunk, GlobalOperatorState &gstate_p, + OperatorState &state_p) const { + + // Put payload columns in place + for (idx_t col_idx = 0; col_idx < delayed.data.size(); col_idx++) { + chunk.data[col_idx].Reference(delayed.data[col_idx]); + } + idx_t count = delayed.size(); + chunk.SetCardinality(count); + + ExecuteFunctions(context, chunk, input, gstate_p, state_p); +} + +OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk, + GlobalOperatorState &gstate_p, OperatorState &state_p) const { + auto &state = state_p.Cast(); + if (!state.initialized) { + state.Initialize(context.client, input, select_list); + } + + auto &delayed = state.delayed; + const idx_t available = delayed.size() + input.size(); + if (available <= state.lead_count) { + // If we don't have enough to produce a single row, + // then just delay more rows, return nothing + // and ask for more data. 
+ delayed.Append(input); + chunk.SetCardinality(0); + return OperatorResultType::NEED_MORE_INPUT; + } else if (delayed.size()) { + // We have enough delayed rows so flush them + ExecuteDelayed(context, delayed, input, chunk, gstate_p, state_p); + delayed.Reset(); + // Come back to process the input + return OperatorResultType::HAVE_MORE_OUTPUT; + } else { + // No delayed rows, so emit what we can and delay the rest. + ExecuteInput(context, delayed, input, chunk, gstate_p, state_p); + return OperatorResultType::NEED_MORE_INPUT; + } +} + +OperatorFinalizeResultType PhysicalStreamingWindow::FinalExecute(ExecutionContext &context, DataChunk &chunk, + GlobalOperatorState &gstate_p, + OperatorState &state_p) const { + auto &state = state_p.Cast(); + + if (state.initialized && state.lead_count) { + auto &delayed = state.delayed; + // There are no more rows + auto &input = state.shifted; + input.Reset(); + ExecuteDelayed(context, delayed, input, chunk, gstate_p, state_p); } - gstate.row_number += NumericCast(count); return OperatorFinalizeResultType::FINISHED; } diff --git a/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp b/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp index c50e8f1b581..13288980515 100644 --- a/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp +++ b/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp @@ -49,7 +49,12 @@ class PhysicalStreamingWindow : public PhysicalOperator { string ParamsToString() const override; private: - void ExecuteAggregate(); + void ExecuteFunctions(ExecutionContext &context, DataChunk &chunk, DataChunk &delayed, + GlobalOperatorState &gstate_p, OperatorState &state_p) const; + void ExecuteInput(ExecutionContext &context, DataChunk &delayed, DataChunk &input, DataChunk &chunk, + GlobalOperatorState &gstate, OperatorState &state) const; + void ExecuteDelayed(ExecutionContext &context, DataChunk &delayed, DataChunk
&input, DataChunk &chunk, + GlobalOperatorState &gstate, OperatorState &state) const; }; } // namespace duckdb