Skip to content

Commit

Permalink
Add RankLikeWindowBuild to optimize rank functions
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Mar 12, 2024
1 parent 02ca9b0 commit 28cda2f
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 42 deletions.
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ add_library(
OutputBufferManager.cpp
PlanNodeStats.cpp
ProbeOperatorState.cpp
RankLikeWindowBuild.cpp
RowContainer.cpp
RowNumber.cpp
SortBuffer.cpp
Expand Down
89 changes: 89 additions & 0 deletions velox/exec/RankLikeWindowBuild.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/RankLikeWindowBuild.h"

#include <utility>

namespace facebook::velox::exec {

// Forwards all arguments to the WindowBuild base class and seeds
// partitionOffsets_ with a single 0 entry. That entry describes the very first
// chunk and is the stop marker for the backwards walk over partitionOffsets_
// performed in nextPartition().
RankLikeWindowBuild::RankLikeWindowBuild(
    const std::shared_ptr<const core::WindowNode>& windowNode,
    velox::memory::MemoryPool* pool,
    const common::SpillConfig* spillConfig,
    tsan_atomic<bool>* nonReclaimableSection)
    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {
  partitionOffsets_.emplace_back(0);
}

// Copies the rows of 'input' into the data_ RowContainer and splits them into
// chunks that nextPartition() later hands out as (possibly partial)
// WindowPartitions.
//
// A chunk is closed whenever the partition keys change between two consecutive
// rows and, unconditionally, at the end of every batch. For every closed chunk
// one entry is appended to partitionOffsets_ describing the chunk that
// follows it: 0 when the next chunk starts a new partition, otherwise the
// number of rows in the chunk just closed.
void RankLikeWindowBuild::addInput(RowVectorPtr input) {
  // An empty batch must not close a chunk. It would append a 0 to
  // partitionOffsets_, which nextPartition() interprets as the start of a new
  // partition, so rows that continue the current partition in a later batch
  // would be reported with an offset that restarts from zero.
  if (input->size() == 0) {
    return;
  }

  for (auto i = 0; i < inputChannels_.size(); ++i) {
    decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i]));
  }

  for (auto row = 0; row < input->size(); ++row) {
    char* newRow = data_->newRow();

    for (auto col = 0; col < input->childrenSize(); ++col) {
      data_->store(decodedInputVectors_[col], row, newRow, col);
    }

    // Partition keys differ from the previous row: close the chunk collected
    // so far and mark the next chunk as the start of a new partition.
    if (previousRow_ != nullptr &&
        compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) {
      // Move instead of copy: inputRows_ is reset right afterwards.
      sortedRows_.push_back(std::move(inputRows_));
      partitionOffsets_.push_back(0);
      // Brings the moved-from vector back to a well-defined empty state.
      inputRows_.clear();
    }

    inputRows_.push_back(newRow);
    previousRow_ = newRow;
  }

  // Close the trailing chunk of this batch. Its row count is recorded (before
  // the rows are moved out) so that a following chunk of the same partition
  // knows how many rows precede it.
  partitionOffsets_.push_back(inputRows_.size());
  sortedRows_.push_back(std::move(inputRows_));
  inputRows_.clear();
}

// Marks the build as finished, after which needsInput() reports false, and
// empties the scratch list of rows of the chunk under construction.
void RankLikeWindowBuild::noMoreInput() {
  inputRows_.clear();
  isFinished_ = true;
}

// Advances to the next chunk in sortedRows_ and wraps it in a WindowPartition.
// The rows of the chunk returned by the previous call are erased from data_
// first. The partition is created with the chunk's offset within its
// partition, obtained by summing partitionOffsets_ entries backwards from the
// current chunk until a 0 entry is reached.
std::unique_ptr<WindowPartition> RankLikeWindowBuild::nextPartition() {
  ++currentPartition_;

  if (currentPartition_ > 0) {
    // The previously returned chunk has been consumed: drop its rows from the
    // row container and forget the row pointers.
    auto& consumedRows = sortedRows_[currentPartition_ - 1];
    data_->eraseRows(
        folly::Range<char**>(consumedRows.data(), consumedRows.size()));
    consumedRows.clear();
  }

  auto& currentRows = sortedRows_[currentPartition_];
  auto partition = folly::Range(currentRows.data(), currentRows.size());

  // Accumulate the sizes of the preceding chunks of the same partition.
  auto offset = 0;
  auto index = currentPartition_;
  while (partitionOffsets_[index] != 0) {
    offset += partitionOffsets_[index];
    index--;
  }

  return std::make_unique<WindowPartition>(
      data_.get(), partition, inputColumns_, sortKeyInfo_, offset);
}

// Returns true while sortedRows_ holds a chunk beyond the one most recently
// returned by nextPartition().
bool RankLikeWindowBuild::hasNextPartition() {
  if (sortedRows_.empty()) {
    return false;
  }
  return currentPartition_ != sortedRows_.size() - 1;
}

} // namespace facebook::velox::exec
80 changes: 80 additions & 0 deletions velox/exec/RankLikeWindowBuild.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/exec/WindowBuild.h"

namespace facebook::velox::exec {

/// Window build for input that is already sorted on the partition keys and
/// sort keys, intended for rank-like functions (rank, dense_rank and
/// row_number). For these functions there is no need to hold all the data of
/// a partition in memory: RankLikeWindowBuild streams the input, turning the
/// rows of every addInput() call into one or more (possibly partial)
/// WindowPartitions, which can reduce the occurrence of Out Of Memory (OOM).
class RankLikeWindowBuild : public WindowBuild {
public:
RankLikeWindowBuild(
const std::shared_ptr<const core::WindowNode>& windowNode,
velox::memory::MemoryPool* pool,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection);

/// Stores the rows of 'input' and splits them into chunks, closing a chunk
/// at every partition-key change and at the end of the batch.
void addInput(RowVectorPtr input) override;

/// Not expected to be called for this build; fails via VELOX_UNREACHABLE.
void spill() override {
VELOX_UNREACHABLE();
}

/// Always empty, as this build never spills.
std::optional<common::SpillStats> spilledStats() const override {
return std::nullopt;
}

/// Marks the build as finished; needsInput() returns false afterwards.
void noMoreInput() override;

/// Returns true if there is a chunk after the one most recently returned by
/// nextPartition().
bool hasNextPartition() override;

/// Erases the rows of the previously returned chunk and returns the next
/// chunk as a WindowPartition carrying its offset within the partition.
std::unique_ptr<WindowPartition> nextPartition() override;

/// Returns true until noMoreInput() has been called.
bool needsInput() override {
return !isFinished_;
}

private:
// One entry per chunk produced by addInput(). Each entry holds pointers to
// the chunk's rows in the data_ RowContainer. The rows of a chunk are erased
// from data_ (and the entry is cleared) when the following chunk is
// requested through nextPartition().
std::vector<std::vector<char*>> sortedRows_;

// Rows of the chunk currently being collected in addInput(). Appended to
// sortedRows_ and cleared whenever a chunk is closed.
std::vector<char*> inputRows_;

// Entry i describes chunk i of sortedRows_: 0 when the chunk starts a new
// partition, otherwise the number of rows in chunk i - 1, which chunk i
// continues. nextPartition() sums the entries backwards until it reaches a 0
// to compute a chunk's offset within its partition. The constructor seeds
// the vector with a single 0 for the first chunk.
std::vector<vector_size_t> partitionOffsets_;

// The most recently added row. Its partition keys are compared with those of
// each new row to detect partition boundaries.
char* previousRow_ = nullptr;

// Index into sortedRows_ of the chunk most recently returned by
// nextPartition(); -1 until the first call.
vector_size_t currentPartition_ = -1;

// Set by noMoreInput().
bool isFinished_ = false;
};

} // namespace facebook::velox::exec
100 changes: 64 additions & 36 deletions velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,13 @@
*/
#include "velox/exec/Window.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/RankLikeWindowBuild.h"
#include "velox/exec/SortWindowBuild.h"
#include "velox/exec/StreamingWindowBuild.h"
#include "velox/exec/Task.h"

namespace facebook::velox::exec {

Window::Window(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::WindowNode>& windowNode)
: Operator(
driverCtx,
windowNode->outputType(),
operatorId,
windowNode->id(),
"Window",
windowNode->canSpill(driverCtx->queryConfig())
? driverCtx->makeSpillConfig(operatorId)
: std::nullopt),
numInputColumns_(windowNode->inputType()->size()),
windowNode_(windowNode),
currentPartition_(nullptr),
stringAllocator_(pool()) {
auto* spillConfig =
spillConfig_.has_value() ? &spillConfig_.value() : nullptr;
if (windowNode->inputsSorted()) {
windowBuild_ = std::make_unique<StreamingWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
} else {
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_);
}
}

void Window::initialize() {
Operator::initialize();
VELOX_CHECK_NOT_NULL(windowNode_);
createWindowFunctions();
createPeerAndFrameBuffers();
windowNode_.reset();
}

namespace {
void checkRowFrameBounds(const core::WindowNode::Frame& frame) {
auto frameBoundCheck = [&](const core::TypedExprPtr& frameValue) -> void {
Expand Down Expand Up @@ -106,8 +71,71 @@ void checkKRangeFrameBounds(
frameBoundCheck(frame.endValue);
}

// Returns true if every window function of 'windowNode' can be evaluated with
// RankLikeWindowBuild, i.e. each one is 'rank', 'dense_rank' or 'row_number'
// and uses a frame that starts at UNBOUNDED PRECEDING and ends at CURRENT ROW.
bool checkRankLikeWindowBuild(
    const std::shared_ptr<const core::WindowNode>& windowNode) {
  using BoundType = core::WindowNode::BoundType;

  for (const auto& function : windowNode->windowFunctions()) {
    const auto& name = function.functionCall->name();
    if (name != "rank" && name != "dense_rank" && name != "row_number") {
      return false;
    }

    const auto& frame = function.frame;
    if (frame.startType != BoundType::kUnboundedPreceding ||
        frame.endType != BoundType::kCurrentRow) {
      return false;
    }
  }
  return true;
}

} // namespace

// Creates the Window operator and selects the WindowBuild implementation:
//  - SortWindowBuild when the input is not sorted;
//  - RankLikeWindowBuild when the input is sorted and all window functions
//    pass checkRankLikeWindowBuild();
//  - StreamingWindowBuild for any other sorted input.
Window::Window(
    int32_t operatorId,
    DriverCtx* driverCtx,
    const std::shared_ptr<const core::WindowNode>& windowNode)
    : Operator(
          driverCtx,
          windowNode->outputType(),
          operatorId,
          windowNode->id(),
          "Window",
          windowNode->canSpill(driverCtx->queryConfig())
              ? driverCtx->makeSpillConfig(operatorId)
              : std::nullopt),
      numInputColumns_(windowNode->inputType()->size()),
      windowNode_(windowNode),
      currentPartition_(nullptr),
      stringAllocator_(pool()) {
  // Null when no spill config was created for this operator.
  auto* spillConfig =
      spillConfig_.has_value() ? &spillConfig_.value() : nullptr;

  if (!windowNode->inputsSorted()) {
    windowBuild_ = std::make_unique<SortWindowBuild>(
        windowNode, pool(), spillConfig, &nonReclaimableSection_);
  } else if (checkRankLikeWindowBuild(windowNode)) {
    windowBuild_ = std::make_unique<RankLikeWindowBuild>(
        windowNode, pool(), spillConfig, &nonReclaimableSection_);
  } else {
    windowBuild_ = std::make_unique<StreamingWindowBuild>(
        windowNode, pool(), spillConfig, &nonReclaimableSection_);
  }
}

// Runs the base Operator initialization, then creates the window functions
// and the peer and frame buffers. windowNode_ must still be set on entry; it
// is reset as the last step.
void Window::initialize() {
Operator::initialize();
VELOX_CHECK_NOT_NULL(windowNode_);
createWindowFunctions();
createPeerAndFrameBuffers();
// Drop this operator's reference to the plan node.
// NOTE(review): presumably it is not needed after initialization - confirm.
windowNode_.reset();
}

Window::WindowFrame Window::createWindowFrame(
const std::shared_ptr<const core::WindowNode>& windowNode,
const core::WindowNode::Frame& frame,
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/WindowPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ WindowPartition::WindowPartition(
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<exec::RowColumn>& columns,
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo)
const std::vector<std::pair<column_index_t, core::SortOrder>>& sortKeyInfo,
vector_size_t offsetInPartition)
: data_(data),
partition_(rows),
columns_(columns),
sortKeyInfo_(sortKeyInfo) {}
sortKeyInfo_(sortKeyInfo),
offsetInPartition_(offsetInPartition) {}

void WindowPartition::extractColumn(
int32_t columnIndex,
Expand Down
13 changes: 12 additions & 1 deletion velox/exec/WindowPartition.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,27 @@ class WindowPartition {
/// 'columns' : Input rows of 'data' used for accessing column data from it.
/// 'sortKeyInfo' : Order by columns used by the Window operator. Used to
/// get peer rows from the input partition.
/// 'offsetInPartition' : In RankLikeWindowBuild, record the current offset of
/// the partial WindowPartition within the entire Partition.
WindowPartition(
RowContainer* data,
const folly::Range<char**>& rows,
const std::vector<exec::RowColumn>& columns,
const std::vector<std::pair<column_index_t, core::SortOrder>>&
sortKeyInfo);
sortKeyInfo,
vector_size_t offsetInPartition = 0);

/// Returns the number of rows in the current WindowPartition.
vector_size_t numRows() const {
return partition_.size();
}

/// Returns the current offset of the partial WindowPartition within the
/// entire Partition.
vector_size_t offsetInPartition() const {
return offsetInPartition_;
}

/// Copies the values at 'columnIndex' into 'result' (starting at
/// 'resultOffset') for the rows at positions in the 'rowNumbers'
/// array from the partition input data.
Expand Down Expand Up @@ -181,5 +190,7 @@ class WindowPartition {

// ORDER BY column info for this partition.
const std::vector<std::pair<column_index_t, core::SortOrder>> sortKeyInfo_;

vector_size_t offsetInPartition_ = 0;
};
} // namespace facebook::velox::exec
39 changes: 39 additions & 0 deletions velox/exec/tests/WindowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,45 @@ TEST_F(WindowTest, spill) {
ASSERT_GT(stats.spilledPartitions, 0);
}

// Runs rank(), dense_rank() and row_number() through a streaming window over
// input that has been sorted on the partition and sorting keys, and compares
// the output with DuckDB.
TEST_F(WindowTest, rankLikeOptimization) {
  const vector_size_t numRows = 1'000;

  // Payload.
  auto payload =
      makeFlatVector<int64_t>(numRows, [](auto row) { return row; });
  // Partition key.
  auto partitionKey =
      makeFlatVector<int16_t>(numRows, [](auto row) { return row % 11; });
  // Sorting key.
  auto sortingKey =
      makeFlatVector<int32_t>(numRows, [](auto row) { return row; });

  auto data =
      makeRowVector({"d", "p", "s"}, {payload, partitionKey, sortingKey});

  createDuckDbTable({data});

  const std::vector<std::string> kClauses = {
      "rank() over (partition by p order by s)",
      "dense_rank() over (partition by p order by s)",
      "row_number() over (partition by p order by s)",
  };

  // The input is fed in 10 batches and sorted before it reaches the window.
  core::PlanNodeId windowId;
  auto plan = PlanBuilder()
                  .values({split(data, 10)})
                  .orderBy({"p", "s"}, false)
                  .streamingWindow(kClauses)
                  .capturePlanNodeId(windowId)
                  .planNode();

  auto spillDirectory = TempDirectoryPath::create();

  AssertQueryBuilder queryBuilder(plan, duckDbQueryRunner_);
  queryBuilder.config(core::QueryConfig::kPreferredOutputBatchBytes, "1024")
      .config(core::QueryConfig::kSpillEnabled, "true")
      .config(core::QueryConfig::kWindowSpillEnabled, "true")
      .spillDirectory(spillDirectory->path);

  auto task = queryBuilder.assertResults(
      "SELECT *, rank() over (partition by p order by s), dense_rank() over (partition by p order by s), row_number() over (partition by p order by s) FROM tmp");
}

TEST_F(WindowTest, missingFunctionSignature) {
auto input = {makeRowVector({
makeFlatVector<int64_t>({1, 2, 3}),
Expand Down
Loading

0 comments on commit 28cda2f

Please sign in to comment.