Skip to content

Commit

Permalink
Feature duckdb#1272: Segment Tree Finalize
Browse files Browse the repository at this point in the history
* Just because an aggregation was started doesn't mean it is complete
* Add a sleep in the tight wait loop to be a bit friendlier
  • Loading branch information
hawkfish committed Jul 16, 2024
1 parent ab38c51 commit 5f7c30c
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions src/execution/window_segment_tree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "duckdb/execution/window_executor.hpp"

#include <numeric>
#include <thread>
#include <utility>

namespace duckdb {
Expand Down Expand Up @@ -822,6 +823,8 @@ void WindowNaiveAggregator::Evaluate(const WindowAggregatorState &gsink, WindowA
//===--------------------------------------------------------------------===//
class WindowSegmentTreeGlobalState : public WindowAggregatorGlobalState {
public:
using AtomicCounters = vector<std::atomic<idx_t>>;

WindowSegmentTreeGlobalState(const WindowSegmentTree &aggregator, idx_t group_count);

//! The owning aggregator
Expand All @@ -832,8 +835,10 @@ class WindowSegmentTreeGlobalState : public WindowAggregatorGlobalState {
vector<idx_t> levels_flat_start;
//! The level being built (read)
std::atomic<idx_t> build_level;
//! The number of entries built so far at this level
vector<idx_t> build_complete;
//! The number of entries started so far at each level
unique_ptr<AtomicCounters> build_started;
//! The number of entries completed so far at each level
unique_ptr<AtomicCounters> build_completed;

// TREE_FANOUT needs to cleanly divide STANDARD_VECTOR_SIZE
static constexpr idx_t TREE_FANOUT = 16;
Expand Down Expand Up @@ -931,12 +936,6 @@ void WindowSegmentTree::Finalize(WindowAggregatorState &gsink, WindowAggregatorS
auto &gasink = gsink.Cast<WindowSegmentTreeGlobalState>();
auto &inputs = gasink.inputs;

// Single threaded Finalize for now
lock_guard<mutex> gestate_guard(gasink.lock);
if (gasink.finalized) {
return;
}

WindowAggregator::Finalize(gsink, lstate, stats);

if (inputs.ColumnCount() > 0) {
Expand Down Expand Up @@ -1102,7 +1101,16 @@ WindowSegmentTreeGlobalState::WindowSegmentTreeGlobalState(const WindowSegmentTr

// Start by building from the bottom level
build_level = 0;
build_complete.resize(levels_flat_start.size(), 0);

build_started = make_uniq<AtomicCounters>(levels_flat_start.size());
for (auto &counter : *build_started) {
counter = 0;
}

build_completed = make_uniq<AtomicCounters>(levels_flat_start.size());
for (auto &counter : *build_completed) {
counter = 0;
}
}

void WindowSegmentTreeState::Finalize(WindowSegmentTreeGlobalState &gstate) {
Expand Down Expand Up @@ -1131,27 +1139,27 @@ void WindowSegmentTreeState::Finalize(WindowSegmentTreeGlobalState &gstate) {
const idx_t build_count = (level_size + gstate.TREE_FANOUT - 1) / gstate.TREE_FANOUT;

// Build the next fan-in
auto build_complete = new (gstate.build_complete.data() + level_current) std::atomic<idx_t>;
const idx_t complete = (*build_complete)++;
if (complete >= build_count) {
const idx_t build_idx = (*gstate.build_started).at(level_current)++;
if (build_idx >= build_count) {
// Nothing left at this level, so wait until other threads are done.
// Since we are only building TREE_FANOUT values at a time, this will be quick.
while (level_current == gstate.build_level.load()) {
continue;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
continue;
}

// compute the aggregate for this entry in the segment tree
const idx_t pos = complete * gstate.TREE_FANOUT;
const idx_t levels_flat_offset = levels_flat_start[level_current] + complete;
const idx_t pos = build_idx * gstate.TREE_FANOUT;
const idx_t levels_flat_offset = levels_flat_start[level_current] + build_idx;
auto state_ptr = levels_flat_native.GetStatePtr(levels_flat_offset);
gtstate.WindowSegmentValue(gstate, level_current, pos, MinValue(level_size, pos + gstate.TREE_FANOUT),
state_ptr);
gtstate.FlushStates(level_current > 0);

// If that was the last one, mark the level as complete.
if (complete + 1 == build_count) {
const idx_t build_complete = ++(*gstate.build_completed).at(level_current);
if (build_complete == build_count) {
gstate.build_level++;
continue;
}
Expand Down

0 comments on commit 5f7c30c

Please sign in to comment.