Skip to content

Commit

Permalink
Support RowsStreamingWindowBuild in window operator
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Aug 23, 2024
1 parent b228e09 commit bf3925c
Show file tree
Hide file tree
Showing 26 changed files with 815 additions and 65 deletions.
1 change: 1 addition & 0 deletions velox/exec/AggregateWindow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ void registerAggregateWindowFunction(const std::string& name) {
exec::registerWindowFunction(
name,
std::move(signatures),
{exec::WindowFunction::ProcessMode::kRows, true},
[name](
const std::vector<exec::WindowFunctionArg>& args,
const TypePtr& resultType,
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ velox_add_library(
OutputBufferManager.cpp
PartitionedOutput.cpp
PartitionFunction.cpp
PartitionStreamingWindowBuild.cpp
PlanNodeStats.cpp
PrefixSort.cpp
ProbeOperatorState.cpp
RowsStreamingWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand All @@ -71,7 +73,6 @@ velox_add_library(
SpillFile.cpp
Spiller.cpp
StreamingAggregation.cpp
StreamingWindowBuild.cpp
Strings.cpp
TableScan.cpp
TableWriteMerge.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
* limitations under the License.
*/

#include "velox/exec/StreamingWindowBuild.h"
#include "velox/exec/PartitionStreamingWindowBuild.h"

namespace facebook::velox::exec {

StreamingWindowBuild::StreamingWindowBuild(
PartitionStreamingWindowBuild::PartitionStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection)
: WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {}

void StreamingWindowBuild::buildNextPartition() {
void PartitionStreamingWindowBuild::buildNextPartition() {
partitionStartRows_.push_back(sortedRows_.size());
sortedRows_.insert(sortedRows_.end(), inputRows_.begin(), inputRows_.end());
inputRows_.clear();
}

void StreamingWindowBuild::addInput(RowVectorPtr input) {
void PartitionStreamingWindowBuild::addInput(RowVectorPtr input) {
for (auto i = 0; i < inputChannels_.size(); ++i) {
decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
}
Expand All @@ -53,14 +53,15 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) {
}
}

void StreamingWindowBuild::noMoreInput() {
void PartitionStreamingWindowBuild::noMoreInput() {
buildNextPartition();

// Help for last partition related calculations.
partitionStartRows_.push_back(sortedRows_.size());
}

std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition>
PartitionStreamingWindowBuild::nextPartition() {
VELOX_CHECK_GT(
partitionStartRows_.size(), 0, "No window partitions available")

Expand Down Expand Up @@ -91,11 +92,11 @@ std::unique_ptr<WindowPartition> StreamingWindowBuild::nextPartition() {
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);

return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

bool StreamingWindowBuild::hasNextPartition() {
bool PartitionStreamingWindowBuild::hasNextPartition() {
return partitionStartRows_.size() > 0 &&
currentPartition_ < int(partitionStartRows_.size() - 2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

namespace facebook::velox::exec {

/// The StreamingWindowBuild is used when the input data is already sorted by
/// {partition keys + order by keys}. The logic identifies partition changes
/// when receiving input rows and splits out WindowPartitions for the Window
/// operator to process.
class StreamingWindowBuild : public WindowBuild {
/// The PartitionStreamingWindowBuild is used when the input data is already
/// sorted by {partition keys + order by keys}. The logic identifies partition
/// changes when receiving input rows and splits out WindowPartitions for the
/// Window operator to process.
class PartitionStreamingWindowBuild : public WindowBuild {
public:
StreamingWindowBuild(
PartitionStreamingWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
Expand All @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

bool needsInput() override {
// No partitions are available or the currentPartition is the last available
Expand Down
93 changes: 93 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RowsStreamingWindowBuild.h"
#include "velox/common/testutil/TestValue.h"

namespace facebook::velox::exec {

// Constructs the build by forwarding every argument, unchanged, to the
// WindowBuild base class. The only additional work is notifying the TestValue
// hook registered under this constructor's fully qualified name, passing
// 'this' as the payload.
RowsStreamingWindowBuild::RowsStreamingWindowBuild(
    const std::shared_ptr<const core::WindowNode>& windowNode,
    velox::memory::MemoryPool* pool,
    const common::SpillConfig* spillConfig,
    tsan_atomic<bool>* nonReclaimableSection)
    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
  using velox::common::testutil::TestValue;

  // The key string must match the injection point name exactly.
  TestValue::adjust(
      "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild",
      this);
}

// Hands the rows buffered in 'inputRows_' over to the partition at index
// 'inputPartition_' and then empties the buffer. The partition is created on
// demand when it does not exist yet. Returns immediately, without touching any
// partition, when no rows are buffered.
//
// If 'finished' is true, the partition is additionally marked complete and
// 'inputPartition_' advances, so the next flush goes to a new partition.
void RowsStreamingWindowBuild::addPartitionInputs(bool finished) {
  if (inputRows_.empty()) {
    return;
  }

  // No partition object exists yet for the index currently receiving input.
  const bool partitionMissing = windowPartitions_.size() <= inputPartition_;
  if (partitionMissing) {
    windowPartitions_.push_back(std::make_shared<WindowPartition>(
        data_.get(), inversedInputChannels_, sortKeyInfo_));
  }

  const auto& target = windowPartitions_[inputPartition_];
  target->addRows(inputRows_);

  if (finished) {
    target->setComplete();
    ++inputPartition_;
  }

  inputRows_.clear();
}

// Copies every row of 'input' into 'data_' and buffers the new row pointers in
// 'inputRows_'. Before a row is buffered, the rows already buffered are
// flushed through addPartitionInputs() in two situations:
//  - compareRowsWithKeys() on the partition keys returns true for the
//    previous row and the new row: the buffered rows are flushed with
//    finished = true.
//  - the buffer already holds at least 'numRowsPerOutput_' rows: the buffered
//    rows are flushed with finished = false.
// Neither check runs for the very first row, when 'previousRow_' is null.
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) {
  for (auto channel = 0; channel < inputChannels_.size(); ++channel) {
    decodedInputVectors_[channel].decode(
        *input->childAt(inputChannels_[channel]));
  }

  const auto numRows = input->size();
  const auto numColumns = input->childrenSize();

  for (auto rowIndex = 0; rowIndex < numRows; ++rowIndex) {
    char* storedRow = data_->newRow();

    for (auto column = 0; column < numColumns; ++column) {
      data_->store(decodedInputVectors_[column], rowIndex, storedRow, column);
    }

    if (previousRow_ != nullptr) {
      // The partition-key check runs first, so a key change always flushes
      // the buffered rows as a finished partition.
      if (compareRowsWithKeys(previousRow_, storedRow, partitionKeyInfo_)) {
        addPartitionInputs(true);
      }

      // Enough rows are buffered: flush them without finishing the partition.
      if (inputRows_.size() >= numRowsPerOutput_) {
        addPartitionInputs(false);
      }
    }

    inputRows_.push_back(storedRow);
    previousRow_ = storedRow;
  }
}

// Flushes whatever rows are still buffered into the partition currently
// receiving input and marks that partition complete. Has no effect when
// nothing is buffered, because addPartitionInputs() returns early then.
void RowsStreamingWindowBuild::noMoreInput() {
  constexpr bool kPartitionFinished = true;
  addPartitionInputs(kPartitionFinished);
}

// Advances 'outputPartition_' to the next built partition and returns it.
// Fails the VELOX_CHECK when hasNextPartition() is false.
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() {
  VELOX_CHECK(hasNextPartition());

  ++outputPartition_;
  return windowPartitions_[outputPartition_];
}

// Returns true when at least one partition exists beyond the one at
// 'outputPartition_', i.e. index 'outputPartition_ + 1' is valid in
// 'windowPartitions_'.
bool RowsStreamingWindowBuild::hasNextPartition() {
  if (windowPartitions_.empty()) {
    return false;
  }
  return outputPartition_ + 2 <= windowPartitions_.size();
}

} // namespace facebook::velox::exec
82 changes: 82 additions & 0 deletions velox/exec/RowsStreamingWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Unlike PartitionStreamingWindowBuild, RowsStreamingWindowBuild is capable of
/// processing window functions as rows arrive within a single partition,
/// without the need to wait for the entire window partition to be ready. This
/// approach can significantly reduce memory usage, especially when a single
/// partition contains a large amount of data. It is particularly suited for
/// optimizing rank, dense_rank and row_number functions, as well as aggregate
/// window functions with a default frame.
class RowsStreamingWindowBuild : public WindowBuild {
 public:
  RowsStreamingWindowBuild(
      const std::shared_ptr<const core::WindowNode>& windowNode,
      velox::memory::MemoryPool* pool,
      const common::SpillConfig* spillConfig,
      tsan_atomic<bool>* nonReclaimableSection);

  /// Stores the rows of 'input' and feeds them to the window partitions.
  void addInput(RowVectorPtr input) override;

  /// Not expected to be called on this build: reaching it trips
  /// VELOX_UNREACHABLE.
  void spill() override {
    VELOX_UNREACHABLE();
  }

  /// Always returns std::nullopt; this build reports no spill statistics.
  std::optional<common::SpillStats> spilledStats() const override {
    return std::nullopt;
  }

  /// Flushes the remaining buffered rows and completes the last partition.
  void noMoreInput() override;

  /// Returns true if a partition exists after the current output partition.
  bool hasNextPartition() override;

  /// Moves on to the next partition and returns it.
  std::shared_ptr<WindowPartition> nextPartition() override;

  /// Input is needed either when no partition has been built so far, or when
  /// the current output partition is the last one that has been built.
  bool needsInput() override {
    if (windowPartitions_.empty()) {
      return true;
    }
    return outputPartition_ == windowPartitions_.size() - 1;
  }

 private:
  // Adds the buffered input rows to the current input partition, creating
  // that partition first if it does not exist. When 'finished' is true, the
  // partition is also marked complete.
  void addPartitionInputs(bool finished);

  // Input rows buffered for the current partition and not yet handed over to
  // a WindowPartition.
  std::vector<char*> inputRows_;

  // The most recently stored input row. Each new row is compared against it
  // on the partition keys. Null until the first row has been stored.
  char* previousRow_ = nullptr;

  // Index of the current output partition in 'windowPartitions_'; -1 means no
  // partition has been returned yet.
  vector_size_t outputPartition_ = -1;

  // Index of the partition in 'windowPartitions_' that receives input rows.
  vector_size_t inputPartition_ = 0;

  // Holds all the built window partitions.
  std::vector<std::shared_ptr<WindowPartition>> windowPartitions_;
};

} // namespace facebook::velox::exec
6 changes: 3 additions & 3 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() {
}
}

std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
std::shared_ptr<WindowPartition> SortWindowBuild::nextPartition() {
if (merge_ != nullptr) {
VELOX_CHECK(!sortedRows_.empty(), "No window partitions available")
auto partition = folly::Range(sortedRows_.data(), sortedRows_.size());
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand All @@ -316,7 +316,7 @@ std::unique_ptr<WindowPartition> SortWindowBuild::nextPartition() {
auto partition = folly::Range(
sortedRows_.data() + partitionStartRows_[currentPartition_],
partitionSize);
return std::make_unique<WindowPartition>(
return std::make_shared<WindowPartition>(
data_.get(), partition, inversedInputChannels_, sortKeyInfo_);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild {

bool hasNextPartition() override;

std::unique_ptr<WindowPartition> nextPartition() override;
std::shared_ptr<WindowPartition> nextPartition() override;

private:
void ensureInputFits(const RowVectorPtr& input);
Expand Down
Loading

0 comments on commit bf3925c

Please sign in to comment.