Skip to content

Commit

Permalink
Issue duckdb#7809: Parallel Partition Sorting
Browse files Browse the repository at this point in the history
Implement multiple sort tasks.
Fix the jump reset code in WindowExecutor to handle reverse-order blocks.
  • Loading branch information
Richard Wesley committed Aug 15, 2023
1 parent 18232fc commit fc67d77
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
17 changes: 13 additions & 4 deletions src/common/sort/partition_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ void PartitionLocalMergeState::Scan() {
LocalSortState local_sort;
local_sort.Initialize(global_sort, global_sort.buffer_manager);

while (group_data.Scan(chunk_state, payload_chunk)) {
TupleDataScanState local_scan;
group_data.InitializeScan(local_scan, merge_state->column_ids);
while (group_data.Scan(chunk_state, local_scan, payload_chunk)) {
sort_chunk.Reset();
executor.Execute(payload_chunk, sort_chunk);

Expand Down Expand Up @@ -343,7 +345,8 @@ void PartitionLocalSinkState::Combine() {
PartitionGlobalMergeState::PartitionGlobalMergeState(PartitionGlobalSinkState &sink, GroupDataPtr group_data_p,
hash_t hash_bin)
: sink(sink), group_data(std::move(group_data_p)), memory_per_thread(sink.memory_per_thread),
stage(PartitionSortStage::INIT), total_tasks(0), tasks_assigned(0), tasks_completed(0) {
num_threads(TaskScheduler::GetScheduler(sink.context).NumberOfThreads()), stage(PartitionSortStage::INIT),
total_tasks(0), tasks_assigned(0), tasks_completed(0) {

const auto group_idx = sink.hash_groups.size();
auto new_group = make_uniq<PartitionGlobalHashGroup>(sink.buffer_manager, sink.partitions, sink.orders,
Expand All @@ -355,7 +358,6 @@ PartitionGlobalMergeState::PartitionGlobalMergeState(PartitionGlobalSinkState &s

sink.bin_groups[hash_bin] = group_idx;

vector<column_t> column_ids;
column_ids.reserve(sink.payload_types.size());
for (column_t i = 0; i < sink.payload_types.size(); ++i) {
column_ids.emplace_back(i);
Expand All @@ -364,7 +366,6 @@ PartitionGlobalMergeState::PartitionGlobalMergeState(PartitionGlobalSinkState &s
}

void PartitionLocalMergeState::Prepare() {
Scan();
merge_state->group_data.reset();

auto &global_sort = *merge_state->global_sort;
Expand All @@ -379,6 +380,9 @@ void PartitionLocalMergeState::Merge() {

void PartitionLocalMergeState::ExecuteTask() {
switch (stage) {
case PartitionSortStage::SCAN:
Scan();
break;
case PartitionSortStage::PREPARE:
Prepare();
break;
Expand Down Expand Up @@ -425,6 +429,11 @@ bool PartitionGlobalMergeState::TryPrepareNextStage() {

switch (stage) {
case PartitionSortStage::INIT:
total_tasks = num_threads;
stage = PartitionSortStage::SCAN;
return true;

case PartitionSortStage::SCAN:
total_tasks = 1;
stage = PartitionSortStage::PREPARE;
return true;
Expand Down
11 changes: 6 additions & 5 deletions src/execution/window_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,14 @@ void WindowBoundariesState::Update(const idx_t row_idx, const WindowInputColumn

// when the partition changes, recompute the boundaries
if (!is_same_partition || is_jump) {
partition_start = row_idx;
peer_start = row_idx;

if (is_jump) {
// Go back as far as the previous partition start
idx_t n = 1;
partition_start = FindPrevStart(partition_mask, partition_start, row_idx + 1, n);
partition_start = FindPrevStart(partition_mask, 0, row_idx + 1, n);
n = 1;
peer_start = FindPrevStart(order_mask, 0, row_idx + 1, n);
} else {
partition_start = row_idx;
peer_start = row_idx;
}

// find end of partition
Expand Down
6 changes: 4 additions & 2 deletions src/include/duckdb/common/sort/partition_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class PartitionLocalSinkState {
void Combine();
};

enum class PartitionSortStage : uint8_t { INIT, PREPARE, MERGE, SORTED };
enum class PartitionSortStage : uint8_t { INIT, SCAN, PREPARE, MERGE, SORTED };

class PartitionLocalMergeState;

Expand All @@ -147,9 +147,11 @@ class PartitionGlobalMergeState {
PartitionGlobalSinkState &sink;
GroupDataPtr group_data;
PartitionGlobalHashGroup *hash_group;
TupleDataScanState chunk_state;
vector<column_t> column_ids;
TupleDataParallelScanState chunk_state;
GlobalSortState *global_sort;
const idx_t memory_per_thread;
const idx_t num_threads;

private:
mutable mutex lock;
Expand Down
1 change: 1 addition & 0 deletions test/sql/window/test_ignore_nulls.test
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ SELECT *,
ORDER BY id
) AS grade
FROM lvl
ORDER BY id
----
40000 values hashing to c302c8b0f3c10c1e5cc7211c4af7a8d6

Expand Down

0 comments on commit fc67d77

Please sign in to comment.