diff --git a/benchmark/micro/window/streaming_lag.benchmark b/benchmark/micro/window/streaming_lag.benchmark
new file mode 100644
index 00000000000..5e546bdbd88
--- /dev/null
+++ b/benchmark/micro/window/streaming_lag.benchmark
@@ -0,0 +1,27 @@
+# name: benchmark/micro/window/streaming_lag.benchmark
+# description: Verify performance of streaming LAG
+# group: [window]
+
+load
+SELECT SETSEED(0.8675309);
+CREATE OR REPLACE TABLE df AS
+	SELECT
+		RANDOM() AS a,
+		RANDOM() AS b,
+		RANDOM() AS c,
+	FROM range(10_000_000);
+
+run
+SELECT sum(a_1 + a_2 + b_1 + b_2)
+FROM (
+	SELECT
+		LAG(a, 1) OVER () AS a_1,
+		LAG(a, 2) OVER () AS a_2,
+		LAG(b, 1) OVER () AS b_1,
+		LAG(b, 2) OVER () AS b_2
+	FROM df
+) t
+;
+
+result I
+20000902.549240764
diff --git a/src/execution/operator/aggregate/physical_streaming_window.cpp b/src/execution/operator/aggregate/physical_streaming_window.cpp
index 4473917873b..8e2d328a04e 100644
--- a/src/execution/operator/aggregate/physical_streaming_window.cpp
+++ b/src/execution/operator/aggregate/physical_streaming_window.cpp
@@ -9,28 +9,6 @@
 
 namespace duckdb {
 
-bool PhysicalStreamingWindow::IsStreamingFunction(unique_ptr<Expression> &expr) {
-	auto &wexpr = expr->Cast<BoundWindowExpression>();
-	if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls ||
-	    wexpr.exclude_clause != WindowExcludeMode::NO_OTHER) {
-		return false;
-	}
-	switch (wexpr.type) {
-	// TODO: add more expression types here?
-	case ExpressionType::WINDOW_AGGREGATE:
-		// We can stream aggregates if they are "running totals"
-		return wexpr.start == WindowBoundary::UNBOUNDED_PRECEDING && wexpr.end == WindowBoundary::CURRENT_ROW_ROWS;
-	case ExpressionType::WINDOW_FIRST_VALUE:
-	case ExpressionType::WINDOW_PERCENT_RANK:
-	case ExpressionType::WINDOW_RANK:
-	case ExpressionType::WINDOW_RANK_DENSE:
-	case ExpressionType::WINDOW_ROW_NUMBER:
-		return true;
-	default:
-		return false;
-	}
-}
-
 PhysicalStreamingWindow::PhysicalStreamingWindow(vector<LogicalType> types, vector<unique_ptr<Expression>> select_list,
                                                  idx_t estimated_cardinality, PhysicalOperatorType type)
     : PhysicalOperator(type, std::move(types), estimated_cardinality), select_list(std::move(select_list)) {
@@ -49,8 +27,9 @@ class StreamingWindowState : public OperatorState {
 public:
 	struct AggregateState {
 		AggregateState(ClientContext &client, BoundWindowExpression &wexpr, Allocator &allocator)
-		    : arena_allocator(Allocator::DefaultAllocator()), statev(LogicalType::POINTER, data_ptr_cast(&state_ptr)),
-		      hashes(LogicalType::HASH), addresses(LogicalType::POINTER) {
+		    : wexpr(wexpr), arena_allocator(Allocator::DefaultAllocator()),
+		      statev(LogicalType::POINTER, data_ptr_cast(&state_ptr)), hashes(LogicalType::HASH),
+		      addresses(LogicalType::POINTER) {
 			D_ASSERT(wexpr.GetExpressionType() == ExpressionType::WINDOW_AGGREGATE);
 			auto &aggregate = *wexpr.aggregate;
 			bind_data = wexpr.bind_info.get();
@@ -83,6 +62,10 @@ class StreamingWindowState : public OperatorState {
 			}
 		}
 
+		void Execute(ExecutionContext &context, DataChunk &input, Vector &result);
+
+		//! The aggregate expression
+		BoundWindowExpression &wexpr;
 		//! The allocator to use for aggregate data structures
 		ArenaAllocator arena_allocator;
 		//! The single aggregate state we update row-by-row
@@ -118,6 +101,98 @@ class StreamingWindowState : public OperatorState {
 		Vector addresses;
 	};
 
+	struct LeadLagState {
+		static constexpr idx_t MAX_BUFFER = STANDARD_VECTOR_SIZE;
+
+		static bool ComputeOffset(ClientContext &context, BoundWindowExpression &wexpr, int64_t &offset) {
+			offset = 1;
+			if (wexpr.offset_expr) {
+				if (wexpr.offset_expr->HasParameter() || !wexpr.offset_expr->IsFoldable()) {
+					return false;
+				}
+				auto offset_value = ExpressionExecutor::EvaluateScalar(context, *wexpr.offset_expr);
+				if (offset_value.IsNull()) {
+					return false;
+				}
+				Value bigint_value;
+				if (!offset_value.DefaultTryCastAs(LogicalType::BIGINT, bigint_value, nullptr, false)) {
+					return false;
+				}
+				offset = bigint_value.GetValue<int64_t>();
+			}
+
+			// We can only support negative LEAD values and positive LAG values
+			if (wexpr.type == ExpressionType::WINDOW_LEAD) {
+				offset = -offset;
+			}
+			return 0 <= offset && idx_t(offset) < MAX_BUFFER;
+		}
+
+		static bool ComputeDefault(ClientContext &context, BoundWindowExpression &wexpr, Value &result) {
+			if (!wexpr.default_expr) {
+				result = Value(wexpr.return_type);
+				return true;
+			}
+
+			if (wexpr.default_expr && (wexpr.default_expr->HasParameter() || !wexpr.default_expr->IsFoldable())) {
+				return false;
+			}
+			auto dflt_value = ExpressionExecutor::EvaluateScalar(context, *wexpr.default_expr);
+			return dflt_value.DefaultTryCastAs(wexpr.return_type, result, nullptr, false);
+		}
+
+		LeadLagState(ClientContext &context, BoundWindowExpression &wexpr)
+		    : wexpr(wexpr), curr(wexpr.return_type), prev(wexpr.return_type, MAX_BUFFER),
+		      temp(wexpr.return_type, MAX_BUFFER) {
+			ComputeOffset(context, wexpr, offset);
+			ComputeDefault(context, wexpr, dflt);
+			prev.Reference(dflt);
+			prev.Flatten(MAX_BUFFER);
+		}
+
+		void Execute(ExecutionContext &context, DataChunk &input, Vector &result) {
+			ExpressionExecutor executor(context.client, *wexpr.children[0]);
+			executor.ExecuteExpression(input, curr);
+
+			// Copy prev[MAX_BUFFER-offset, MAX_BUFFER] => result[0, offset]
+			idx_t source_count = MAX_BUFFER;
+			idx_t source_offset = source_count - idx_t(offset);
+			idx_t target_offset = 0;
+			VectorOperations::Copy(prev, result, source_count, source_offset, target_offset);
+			// Copy curr[0, count-offset] => result[offset, count]
+			target_offset = idx_t(offset);
+			source_count = input.size() - target_offset;
+			source_offset = 0;
+			VectorOperations::Copy(curr, result, source_count, source_offset, target_offset);
+			// Copy curr[0, count] => prev[prev.count - count, prev.count]
+			source_count = input.size();
+			source_offset = 0;
+			target_offset = MAX_BUFFER - source_count;
+			if (target_offset) {
+				// Shift down incomplete buffers
+				VectorOperations::Copy(prev, temp, MAX_BUFFER, source_count, 0);
+				VectorOperations::Copy(temp, prev, target_offset, 0, 0);
+				VectorOperations::Copy(curr, prev, source_count, source_offset, target_offset);
+			} else {
+				// Overwrite
+				VectorOperations::Copy(curr, prev, source_count, source_offset, target_offset);
+			}
+		}
+
+		//! The LEAD/LAG expression
+		BoundWindowExpression &wexpr;
+		//! The constant offset
+		int64_t offset;
+		//! The constant default value
+		Value dflt;
+		//! The current set of values
+		Vector curr;
+		//! The previous set of values
+		Vector prev;
+		//! The copy buffer
+		Vector temp;
+	};
+
 	explicit StreamingWindowState(ClientContext &client)
 	    : initialized(false), allocator(Allocator::Get(client)) {
 	}
@@ -127,6 +202,7 @@ class StreamingWindowState : public OperatorState {
 	void Initialize(ClientContext &context, DataChunk &input, const vector<unique_ptr<Expression>> &expressions) {
 		const_vectors.resize(expressions.size());
 		aggregate_states.resize(expressions.size());
+		lead_lag_states.resize(expressions.size());
 
 		for (idx_t expr_idx = 0; expr_idx < expressions.size(); expr_idx++) {
 			auto &expr = *expressions[expr_idx];
@@ -155,6 +231,10 @@ class StreamingWindowState : public OperatorState {
 				const_vectors[expr_idx] = make_uniq<Vector>(Value((int64_t)1));
 				break;
 			}
+			case ExpressionType::WINDOW_LAG:
+			case ExpressionType::WINDOW_LEAD:
+				lead_lag_states[expr_idx] = make_uniq<LeadLagState>(context, wexpr);
+				break;
 			default:
 				break;
 			}
@@ -169,8 +249,41 @@
 	// Aggregation
 	vector<unique_ptr<AggregateState>> aggregate_states;
 	Allocator &allocator;
+
+	// Lead/Lag
+	vector<unique_ptr<LeadLagState>> lead_lag_states;
 };
 
+bool PhysicalStreamingWindow::IsStreamingFunction(ClientContext &context, unique_ptr<Expression> &expr) {
+	auto &wexpr = expr->Cast<BoundWindowExpression>();
+	if (!wexpr.partitions.empty() || !wexpr.orders.empty() || wexpr.ignore_nulls ||
+	    wexpr.exclude_clause != WindowExcludeMode::NO_OTHER) {
+		return false;
+	}
+	switch (wexpr.type) {
+	// TODO: add more expression types here?
+	case ExpressionType::WINDOW_AGGREGATE:
+		// We can stream aggregates if they are "running totals"
+		return wexpr.start == WindowBoundary::UNBOUNDED_PRECEDING && wexpr.end == WindowBoundary::CURRENT_ROW_ROWS;
+	case ExpressionType::WINDOW_FIRST_VALUE:
+	case ExpressionType::WINDOW_PERCENT_RANK:
+	case ExpressionType::WINDOW_RANK:
+	case ExpressionType::WINDOW_RANK_DENSE:
+	case ExpressionType::WINDOW_ROW_NUMBER:
+		return true;
+	case ExpressionType::WINDOW_LAG:
+	case ExpressionType::WINDOW_LEAD: {
+		// We can stream LEAD/LAG if the arguments are constant and the delta is less than a block behind
+		Value dflt;
+		int64_t offset;
+		return StreamingWindowState::LeadLagState::ComputeDefault(context, wexpr, dflt) &&
+		       StreamingWindowState::LeadLagState::ComputeOffset(context, wexpr, offset);
+	}
+	default:
+		return false;
+	}
+}
+
 unique_ptr<GlobalOperatorState> PhysicalStreamingWindow::GetGlobalOperatorState(ClientContext &context) const {
 	return make_uniq<StreamingWindowGlobalState>();
 }
@@ -179,6 +292,113 @@ unique_ptr<OperatorState> PhysicalStreamingWindow::GetOperatorState(ExecutionCon
 	return make_uniq<StreamingWindowState>(context.client);
 }
 
+void StreamingWindowState::AggregateState::Execute(ExecutionContext &context, DataChunk &input, Vector &result) {
+	// Establish the aggregation environment
+	const idx_t count = input.size();
+	auto &aggregate = *wexpr.aggregate;
+	auto &aggr_state = *this;
+	auto &statev = aggr_state.statev;
+
+	// Compute the FILTER mask (if any)
+	ValidityMask filter_mask;
+	auto filtered = count;
+	auto &filter_sel = aggr_state.filter_sel;
+	if (wexpr.filter_expr) {
+		ExpressionExecutor filter_executor(context.client, *wexpr.filter_expr);
+		filtered = filter_executor.SelectExpression(input, filter_sel);
+		if (filtered < count) {
+			filter_mask.Initialize(count);
+			filter_mask.SetAllInvalid(count);
+			for (idx_t f = 0; f < filtered; ++f) {
+				filter_mask.SetValid(filter_sel.get_index(f));
+			}
+		}
+	}
+
+	// Check for COUNT(*)
+	if (wexpr.children.empty()) {
+		D_ASSERT(GetTypeIdSize(result.GetType().InternalType()) == sizeof(int64_t));
+		auto data = FlatVector::GetData<int64_t>(result);
+		auto &unfiltered = aggr_state.unfiltered;
+		for (idx_t i = 0; i < count; ++i) {
+			unfiltered += int64_t(filter_mask.RowIsValid(i));
+			data[i] = unfiltered;
+		}
+		return;
+	}
+
+	// Compute the arguments
+	ExpressionExecutor executor(context.client);
+	for (auto &child : wexpr.children) {
+		executor.AddExpression(*child);
+	}
+	auto &arg_chunk = aggr_state.arg_chunk;
+	executor.Execute(input, arg_chunk);
+	arg_chunk.Flatten();
+
+	// Update the distinct hash table
+	ValidityMask distinct_mask;
+	if (aggr_state.distinct) {
+		auto &distinct_args = aggr_state.distinct_args;
+		distinct_args.Reference(arg_chunk);
+		if (wexpr.filter_expr) {
+			distinct_args.Slice(filter_sel, filtered);
+		}
+		idx_t distinct = 0;
+		auto &distinct_sel = aggr_state.distinct_sel;
+		if (filtered) {
+			// FindOrCreateGroups assumes non-empty input
+			auto &hashes = aggr_state.hashes;
+			distinct_args.Hash(hashes);
+
+			auto &addresses = aggr_state.addresses;
+			distinct = aggr_state.distinct->FindOrCreateGroups(distinct_args, hashes, addresses, distinct_sel);
+		}
+
+		// Translate the distinct selection from filtered row numbers
+		// back to input row numbers. We need to produce output for all input rows,
+		// so we filter out
+		if (distinct < filtered) {
+			distinct_mask.Initialize(count);
+			distinct_mask.SetAllInvalid(count);
+			for (idx_t d = 0; d < distinct; ++d) {
+				const auto f = distinct_sel.get_index(d);
+				distinct_mask.SetValid(filter_sel.get_index(f));
+			}
+		}
+	}
+
+	// Iterate through them using a single SV
+	sel_t s = 0;
+	SelectionVector sel(&s);
+	auto &arg_cursor = aggr_state.arg_cursor;
+	arg_cursor.Reset();
+	arg_cursor.Slice(sel, 1);
+	// This doesn't work for STRUCTs because the SV
+	// is not copied to the children when you slice
+	vector<column_t> structs;
+	for (column_t col_idx = 0; col_idx < arg_chunk.ColumnCount(); ++col_idx) {
+		auto &col_vec = arg_cursor.data[col_idx];
+		DictionaryVector::Child(col_vec).Reference(arg_chunk.data[col_idx]);
+		if (col_vec.GetType().InternalType() == PhysicalType::STRUCT) {
+			structs.emplace_back(col_idx);
+		}
+	}
+
+	// Update the state and finalize it one row at a time.
+	AggregateInputData aggr_input_data(wexpr.bind_info.get(), aggr_state.arena_allocator);
+	for (idx_t i = 0; i < count; ++i) {
+		sel.set_index(0, i);
+		for (const auto struct_idx : structs) {
+			arg_cursor.data[struct_idx].Slice(arg_chunk.data[struct_idx], sel, 1);
+		}
+		if (filter_mask.RowIsValid(i) && distinct_mask.RowIsValid(i)) {
+			aggregate.update(arg_cursor.data.data(), aggr_input_data, arg_cursor.ColumnCount(), statev, 1);
+		}
+		aggregate.finalize(statev, aggr_input_data, result, 1, i);
+	}
+}
+
 OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
                                                     GlobalOperatorState &gstate_p, OperatorState &state_p) const {
 	auto &gstate = gstate_p.Cast<StreamingWindowGlobalState>();
@@ -198,113 +418,9 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D
 		auto &expr = *select_list[expr_idx];
 		auto &result = chunk.data[col_idx];
 		switch (expr.GetExpressionType()) {
-		case ExpressionType::WINDOW_AGGREGATE: {
-			// Establish the aggregation environment
-			auto &wexpr = expr.Cast<BoundWindowExpression>();
-			auto &aggregate = *wexpr.aggregate;
-			auto &aggr_state = *state.aggregate_states[expr_idx];
-			auto &statev = aggr_state.statev;
-
-			// Compute the FILTER mask (if any)
-			ValidityMask filter_mask;
-			auto filtered = count;
-			auto &filter_sel = aggr_state.filter_sel;
-			if (wexpr.filter_expr) {
-				ExpressionExecutor filter_executor(context.client, *wexpr.filter_expr);
-				filtered = filter_executor.SelectExpression(input, filter_sel);
-				if (filtered < count) {
-					filter_mask.Initialize(count);
-					filter_mask.SetAllInvalid(count);
-					for (idx_t f = 0; f < filtered; ++f) {
-						filter_mask.SetValid(filter_sel.get_index(f));
-					}
-				}
-			}
-
-			// Check for COUNT(*)
-			if (wexpr.children.empty()) {
-				D_ASSERT(GetTypeIdSize(result.GetType().InternalType()) == sizeof(int64_t));
-				auto data = FlatVector::GetData<int64_t>(result);
-				auto &unfiltered = aggr_state.unfiltered;
-				for (idx_t i = 0; i < count; ++i) {
-					unfiltered += int64_t(filter_mask.RowIsValid(i));
-					data[i] = unfiltered;
-				}
-				break;
-			}
-
-			// Compute the arguments
-			ExpressionExecutor executor(context.client);
-			for (auto &child : wexpr.children) {
-				executor.AddExpression(*child);
-			}
-			auto &arg_chunk = aggr_state.arg_chunk;
-			executor.Execute(input, arg_chunk);
-			arg_chunk.Flatten();
-
-			// Update the distinct hash table
-			ValidityMask distinct_mask;
-			if (aggr_state.distinct) {
-				auto &distinct_args = aggr_state.distinct_args;
-				distinct_args.Reference(arg_chunk);
-				if (wexpr.filter_expr) {
-					distinct_args.Slice(filter_sel, filtered);
-				}
-				idx_t distinct = 0;
-				auto &distinct_sel = aggr_state.distinct_sel;
-				if (filtered) {
-					// FindOrCreateGroups assumes non-empty input
-					auto &hashes = aggr_state.hashes;
-					distinct_args.Hash(hashes);
-
-					auto &addresses = aggr_state.addresses;
-					distinct = aggr_state.distinct->FindOrCreateGroups(distinct_args, hashes, addresses, distinct_sel);
-				}
-
-				// Translate the distinct selection from filtered row numbers
-				// back to input row numbers. We need to produce output for all input rows,
-				// so we filter out
-				if (distinct < filtered) {
-					distinct_mask.Initialize(count);
-					distinct_mask.SetAllInvalid(count);
-					for (idx_t d = 0; d < distinct; ++d) {
-						const auto f = distinct_sel.get_index(d);
-						distinct_mask.SetValid(filter_sel.get_index(f));
-					}
-				}
-			}
-
-			// Iterate through them using a single SV
-			sel_t s = 0;
-			SelectionVector sel(&s);
-			auto &arg_cursor = aggr_state.arg_cursor;
-			arg_cursor.Reset();
-			arg_cursor.Slice(sel, 1);
-			// This doesn't work for STRUCTs because the SV
-			// is not copied to the children when you slice
-			vector<column_t> structs;
-			for (column_t col_idx = 0; col_idx < arg_chunk.ColumnCount(); ++col_idx) {
-				auto &col_vec = arg_cursor.data[col_idx];
-				DictionaryVector::Child(col_vec).Reference(arg_chunk.data[col_idx]);
-				if (col_vec.GetType().InternalType() == PhysicalType::STRUCT) {
-					structs.emplace_back(col_idx);
-				}
-			}
-
-			// Update the state and finalize it one row at a time.
-			AggregateInputData aggr_input_data(wexpr.bind_info.get(), aggr_state.arena_allocator);
-			for (idx_t i = 0; i < count; ++i) {
-				sel.set_index(0, i);
-				for (const auto struct_idx : structs) {
-					arg_cursor.data[struct_idx].Slice(arg_chunk.data[struct_idx], sel, 1);
-				}
-				if (filter_mask.RowIsValid(i) && distinct_mask.RowIsValid(i)) {
-					aggregate.update(arg_cursor.data.data(), aggr_input_data, arg_cursor.ColumnCount(), statev, 1);
-				}
-				aggregate.finalize(statev, aggr_input_data, result, 1, i);
-			}
+		case ExpressionType::WINDOW_AGGREGATE:
+			state.aggregate_states[expr_idx]->Execute(context, input, result);
 			break;
-		}
 		case ExpressionType::WINDOW_FIRST_VALUE:
 		case ExpressionType::WINDOW_PERCENT_RANK:
 		case ExpressionType::WINDOW_RANK:
@@ -322,6 +438,10 @@ OperatorResultType PhysicalStreamingWindow::Execute(ExecutionContext &context, D
 			}
 			break;
 		}
+		case ExpressionType::WINDOW_LAG:
+		case ExpressionType::WINDOW_LEAD:
+			state.lead_lag_states[expr_idx]->Execute(context, input, result);
+			break;
 		default:
 			throw NotImplementedException("%s for StreamingWindow", ExpressionTypeToString(expr.GetExpressionType()));
 		}
diff --git a/src/execution/physical_plan/plan_window.cpp b/src/execution/physical_plan/plan_window.cpp
index 9bc4d1600ca..f294d9324fb 100644
--- a/src/execution/physical_plan/plan_window.cpp
+++ b/src/execution/physical_plan/plan_window.cpp
@@ -33,7 +33,7 @@ unique_ptr<PhysicalOperator> PhysicalPlanGenerator::CreatePlan(LogicalWindow &op
 	vector<idx_t> blocking_windows;
 	vector<idx_t> streaming_windows;
 	for (idx_t expr_idx = 0; expr_idx < op.expressions.size(); expr_idx++) {
-		if (enable_optimizer && PhysicalStreamingWindow::IsStreamingFunction(op.expressions[expr_idx])) {
+		if (enable_optimizer && PhysicalStreamingWindow::IsStreamingFunction(context, op.expressions[expr_idx])) {
 			streaming_windows.push_back(expr_idx);
 		} else {
 			blocking_windows.push_back(expr_idx);
diff --git a/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp b/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp
index 938ffaf762a..6b108380628 100644
--- a/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp
+++ b/src/include/duckdb/execution/operator/aggregate/physical_streaming_window.hpp
@@ -18,7 +18,7 @@ class PhysicalStreamingWindow : public PhysicalOperator {
 public:
 	static constexpr const PhysicalOperatorType TYPE = PhysicalOperatorType::STREAMING_WINDOW;
 
-	static bool IsStreamingFunction(unique_ptr<Expression> &expr);
+	static bool IsStreamingFunction(ClientContext &context, unique_ptr<Expression> &expr);
 
 public:
 	PhysicalStreamingWindow(vector<LogicalType> types, vector<unique_ptr<Expression>> select_list,
@@ -40,6 +40,9 @@ class PhysicalStreamingWindow : public PhysicalOperator {
 	}
 
 	string ParamsToString() const override;
+
+private:
+	void ExecuteAggregate();
 };
 
 } // namespace duckdb
diff --git a/test/sql/window/test_streaming_lead_lag.test b/test/sql/window/test_streaming_lead_lag.test
new file mode 100644
index 00000000000..f63de50b21f
--- /dev/null
+++ b/test/sql/window/test_streaming_lead_lag.test
@@ -0,0 +1,84 @@
+# name: test/sql/window/test_streaming_lead_lag.test
+# description: Streaming LEAD/LAG window functions
+# group: [window]
+
+statement ok
+PRAGMA enable_verification
+
+statement ok
+PRAGMA explain_output = PHYSICAL_ONLY;
+
+query TT
+EXPLAIN
+SELECT i, LAG(i, 1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<REGEX>:.*STREAMING_WINDOW.*
+
+query II
+SELECT i, LAG(i, 1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+0	NULL
+1	0
+2	1
+3	2
+4	3
+5	4
+6	5
+7	6
+8	7
+9	8
+
+query TT
+EXPLAIN
+SELECT i, LAG(i, -1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<!REGEX>:.*STREAMING_WINDOW.*
+
+query TT
+EXPLAIN
+SELECT i, LEAD(i, -1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<REGEX>:.*STREAMING_WINDOW.*
+
+query II
+SELECT i, LEAD(i, -1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+0	NULL
+1	0
+2	1
+3	2
+4	3
+5	4
+6	5
+7	6
+8	7
+9	8
+
+query TT
+EXPLAIN
+SELECT i, LEAD(i, 1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<!REGEX>:.*STREAMING_WINDOW.*
+
+# Test shift down
+query TT
+EXPLAIN
+SELECT i, LAG(i, 1) OVER() AS i1
+FROM range(3000) tbl(i)
+WHERE i % 2 = 0
+QUALIFY i1 <> i - 2
+----
+physical_plan	<REGEX>:.*STREAMING_WINDOW.*
+
+query TT
+SELECT i, LAG(i, 1) OVER() AS i1
+FROM range(3000) tbl(i)
+WHERE i % 2 = 0
+QUALIFY i1 <> i - 2
+----
diff --git a/test/sql/window/test_streaming_window.test b/test/sql/window/test_streaming_window.test
index 9aa79fe122b..78458005868 100644
--- a/test/sql/window/test_streaming_window.test
+++ b/test/sql/window/test_streaming_window.test
@@ -103,6 +103,64 @@ select percent_rank() over (), i, j from integers
 0.0	1	2
 0.0	1	NULL
 
+query TT
+EXPLAIN
+SELECT i, LAG(i, 1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<REGEX>:.*STREAMING_WINDOW.*
+
+query II
+SELECT i, LAG(i, 1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+0	NULL
+1	0
+2	1
+3	2
+4	3
+5	4
+6	5
+7	6
+8	7
+9	8
+
+query TT
+EXPLAIN
+SELECT i, LAG(i, -1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<!REGEX>:.*STREAMING_WINDOW.*
+
+query TT
+EXPLAIN
+SELECT i, LEAD(i, -1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<REGEX>:.*STREAMING_WINDOW.*
+
+query II
+SELECT i, LEAD(i, -1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+0	NULL
+1	0
+2	1
+3	2
+4	3
+5	4
+6	5
+7	6
+8	7
+9	8
+
+query TT
+EXPLAIN
+SELECT i, LEAD(i, 1) OVER() AS i1
+FROM range(10) tbl(i);
+----
+physical_plan	<!REGEX>:.*STREAMING_WINDOW.*
+
 # Test running aggregates
 query TT
 EXPLAIN