Skip to content

Commit

Permalink
Internal duckdb#3273: Shared Window Expressions
Browse files Browse the repository at this point in the history
PR Feedback:
* Handle different IGNORE NULLS settings for the same column
* Handle volatile expressions
  • Loading branch information
hawkfish committed Oct 21, 2024
1 parent 414596a commit c897456
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/execution/operator/aggregate/physical_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,9 @@ WindowHashGroup::WindowHashGroup(WindowGlobalSinkState &gstate, const idx_t hash
}

// Set up the collection for any fully materialised data
const auto &shared = gstate.shared;
const auto &shared = WindowSharedExpressions::GetSortedExpressions(gstate.shared.coll_shared);
vector<LogicalType> types;
for (auto &expr : shared.coll_exprs) {
for (auto &expr : shared) {
types.emplace_back(expr->return_type);
}
auto &buffer_manager = BufferManager::GetBufferManager(gstate.context);
Expand Down
45 changes: 36 additions & 9 deletions src/execution/window_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,24 +912,50 @@ void ExclusionFilter::ResetMask(idx_t row_idx, idx_t offset) {
}
}

column_t WindowSharedExpressions::RegisterExpr(const unique_ptr<Expression> &expr, Expressions &shared) {
column_t WindowSharedExpressions::RegisterExpr(const unique_ptr<Expression> &expr, Shared &shared) {
auto pexpr = expr.get();
if (!pexpr) {
return DConstants::INVALID_INDEX;
}

column_t result = shared.size();
for (column_t i = 0; i < shared.size(); ++i) {
if (pexpr->Equals(*shared[i])) {
return i;
}
// We need to make separate columns for volatile arguments
const auto is_volatile = expr->IsVolatile();
auto i = shared.columns.find(*pexpr);
if (i != shared.columns.end() && !is_volatile) {
return i->second.front();
}

shared.emplace_back(pexpr);
// New column, find maximum column number
column_t result = shared.size++;
shared.columns[*pexpr].emplace_back(result);

return result;
}

vector<const Expression *> WindowSharedExpressions::GetSortedExpressions(Shared &shared) {
vector<const Expression *> sorted(shared.size, nullptr);
for (auto &col : shared.columns) {
auto &expr = col.first.get();
for (auto col_idx : col.second) {
sorted[col_idx] = &expr;
}
}

return sorted;
}
void WindowSharedExpressions::PrepareExecutors(Shared &shared, ExpressionExecutor &exec, DataChunk &chunk) {
const auto sorted = GetSortedExpressions(shared);
vector<LogicalType> types;
for (auto expr : sorted) {
exec.AddExpression(*expr);
types.emplace_back(expr->return_type);
}

if (!types.empty()) {
chunk.Initialize(exec.GetAllocator(), types);
}
}

//===--------------------------------------------------------------------===//
// WindowExecutor
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -1488,11 +1514,12 @@ class WindowValueGlobalState : public WindowExecutorGlobalState {
using WindowCollectionPtr = unique_ptr<WindowCollection>;
WindowValueGlobalState(const WindowValueExecutor &executor, const idx_t payload_count,
const ValidityMask &partition_mask, const ValidityMask &order_mask)
: WindowExecutorGlobalState(executor, payload_count, partition_mask, order_mask),
: WindowExecutorGlobalState(executor, payload_count, partition_mask, order_mask), ignore_nulls(&all_valid),
child_idx(executor.child_idx) {
}

// IGNORE NULLS
ValidityMask all_valid;
optional_ptr<ValidityMask> ignore_nulls;

const column_t child_idx;
Expand Down Expand Up @@ -1577,7 +1604,7 @@ unique_ptr<WindowExecutorGlobalState> WindowValueExecutor::GetGlobalState(const

void WindowValueExecutor::Finalize(WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate,
CollectionPtr collection) const {
if (child_idx != DConstants::INVALID_INDEX) {
if (child_idx != DConstants::INVALID_INDEX && wexpr.ignore_nulls) {
auto &gvstate = gstate.Cast<WindowValueGlobalState>();
gvstate.ignore_nulls = &collection->validities[child_idx];
}
Expand Down
39 changes: 18 additions & 21 deletions src/include/duckdb/execution/window_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,61 +149,58 @@ enum WindowBounds : uint8_t { PARTITION_BEGIN, PARTITION_END, PEER_BEGIN, PEER_E

//! A shared set of expressions
struct WindowSharedExpressions {
using Expressions = vector<const Expression *>;
struct Shared {
column_t size = 0;
expression_map_t<vector<column_t>> columns;
};

//! Register a shared expression in a shared set
static column_t RegisterExpr(const unique_ptr<Expression> &expr, Expressions &shared);
static column_t RegisterExpr(const unique_ptr<Expression> &expr, Shared &shared);

//! Register a shared collection expression
column_t RegisterCollection(const unique_ptr<Expression> &expr, bool build_validity) {
auto result = RegisterExpr(expr, coll_exprs);
auto result = RegisterExpr(expr, coll_shared);
if (build_validity) {
coll_validity.insert(result);
}
return result;
}
//! Register a shared collection expression
inline column_t RegisterSink(const unique_ptr<Expression> &expr) {
return RegisterExpr(expr, sink_exprs);
return RegisterExpr(expr, sink_shared);
}
//! Register a shared evaluation expression
inline column_t RegisterEvaluate(const unique_ptr<Expression> &expr) {
return RegisterExpr(expr, eval_exprs);
return RegisterExpr(expr, eval_shared);
}

//! Expression layout
static vector<const Expression *> GetSortedExpressions(Shared &shared);

//! Expression execution utility
static void PrepareExecutors(Expressions &exprs, ExpressionExecutor &exec, DataChunk &chunk) {
vector<LogicalType> types;
for (auto &expr : exprs) {
exec.AddExpression(*expr);
types.emplace_back(expr->return_type);
}
if (!types.empty()) {
chunk.Initialize(exec.GetAllocator(), types);
}
}
static void PrepareExecutors(Shared &shared, ExpressionExecutor &exec, DataChunk &chunk);

//! Prepare collection expressions
inline void PrepareCollection(ExpressionExecutor &exec, DataChunk &chunk) {
PrepareExecutors(coll_exprs, exec, chunk);
PrepareExecutors(coll_shared, exec, chunk);
}

//! Prepare collection expressions
inline void PrepareSink(ExpressionExecutor &exec, DataChunk &chunk) {
PrepareExecutors(sink_exprs, exec, chunk);
PrepareExecutors(sink_shared, exec, chunk);
}

//! Prepare collection expressions
inline void PrepareEvaluate(ExpressionExecutor &exec, DataChunk &chunk) {
PrepareExecutors(eval_exprs, exec, chunk);
PrepareExecutors(eval_shared, exec, chunk);
}

//! Fully materialised shared expressions
Expressions coll_exprs;
Shared coll_shared;
//! Sink shared expressions
Expressions sink_exprs;
Shared sink_shared;
//! Evaluate shared expressions
Expressions eval_exprs;
Shared eval_shared;
//! Requested collection validity masks
unordered_set<column_t> coll_validity;
};
Expand Down
22 changes: 22 additions & 0 deletions test/sql/window/test_ignore_nulls.test
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,28 @@ ORDER BY id
----
40000 values hashing to c302c8b0f3c10c1e5cc7211c4af7a8d6

# Independent INGORE/RESPECT NULLS with shared input.
query III
SELECT
v,
lead(v) OVER (ORDER BY id),
lead(v IGNORE NULLS) OVER (ORDER BY id)
FROM (VALUES
(1, 1),
(2, NULL),
(3, 2),
(4, NULL),
(5, 3),
(6, NULL)
) tbl(id, v);
----
1 NULL 2
NULL 2 2
2 NULL 3
NULL 3 3
3 NULL NULL
NULL NULL NULL

#
# Unsupported
#
Expand Down
16 changes: 16 additions & 0 deletions test/sql/window/test_volatile_independence.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# name: test/sql/window/test_volatile_independence.test
# description: Window Sharing should distiguish identical volatile expressions
# group: [window]

statement ok
SELECT SETSEED(0.8675309);

query II
SELECT
list(random()) OVER (ORDER BY id),
max(random()) OVER (ORDER BY id)
FROM range(3) t(id);
----
[0.23450047366512405] 0.772592377754351
[0.23450047366512405, 0.4512114749041355] 0.772592377754351
[0.23450047366512405, 0.4512114749041355, 0.5199990716061366] 0.9123614504583814

0 comments on commit c897456

Please sign in to comment.