-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add RowsStreamingWindowBuild to avoid OOM in Window operator (#9025)
Summary: Unlike `StreamingWindowBuild`, `RowsStreamingWindowBuild` in this PR is capable of processing window functions as rows arrive within a single partition, without the need to wait for the entire partition to be ready. This approach can significantly reduce memory usage, especially when a single partition contains a large amount of data. It is particularly suited for optimizing `rank` and `row_number` functions, as well as aggregate window functions with a default frame. The detailed discussion is [here](#8975). The design doc is [here](https://docs.google.com/document/d/17ONSJHK8XP5Lixm8XBl01RMNl4ntpixiVFe693ahw6k/edit?usp=sharing). Pull Request resolved: #9025 Test Plan: Run through 10hrs fuzzer testing Reviewed By: kagamiori Differential Revision: D61473798 Pulled By: xiaoxmeng fbshipit-source-id: 569a752770395330c48a3521bd5421eb89f5623d
- Loading branch information
1 parent
3d10ccf
commit d33cdb2
Showing
26 changed files
with
815 additions
and
65 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "velox/exec/RowsStreamingWindowBuild.h" | ||
#include "velox/common/testutil/TestValue.h" | ||
|
||
namespace facebook::velox::exec { | ||
|
||
// Constructs the build by forwarding every argument, unchanged, to the | ||
// WindowBuild base class. | ||
RowsStreamingWindowBuild::RowsStreamingWindowBuild( | ||
    const std::shared_ptr<const core::WindowNode>& windowNode, | ||
    velox::memory::MemoryPool* pool, | ||
    const common::SpillConfig* spillConfig, | ||
    tsan_atomic<bool>* nonReclaimableSection) | ||
    : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) { | ||
  // TestValue injection point keyed by this constructor's qualified name; | ||
  // 'this' is handed to the hook as its data argument. | ||
  using velox::common::testutil::TestValue; | ||
  TestValue::adjust( | ||
      "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", | ||
      this); | ||
} | ||
|
||
void RowsStreamingWindowBuild::addPartitionInputs(bool finished) { | ||
if (inputRows_.empty()) { | ||
return; | ||
} | ||
|
||
if (windowPartitions_.size() <= inputPartition_) { | ||
windowPartitions_.push_back(std::make_shared<WindowPartition>( | ||
data_.get(), inversedInputChannels_, sortKeyInfo_)); | ||
} | ||
|
||
windowPartitions_[inputPartition_]->addRows(inputRows_); | ||
|
||
if (finished) { | ||
windowPartitions_[inputPartition_]->setComplete(); | ||
++inputPartition_; | ||
} | ||
|
||
inputRows_.clear(); | ||
} | ||
|
||
// Copies every row of 'input' into data_ and buffers the new row pointers in | ||
// inputRows_. Buffered rows are flushed to the current partition when the | ||
// partition keys change (marking that partition finished) or when at least | ||
// numRowsPerOutput_ rows have been buffered (leaving it open). | ||
void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { | ||
  for (auto channel = 0; channel < inputChannels_.size(); ++channel) { | ||
    decodedInputVectors_[channel].decode( | ||
        *input->childAt(inputChannels_[channel])); | ||
  } | ||
|
||
  const auto numRows = input->size(); | ||
  const auto numColumns = input->childrenSize(); | ||
  for (auto rowIndex = 0; rowIndex < numRows; ++rowIndex) { | ||
    char* storedRow = data_->newRow(); | ||
    for (auto column = 0; column < numColumns; ++column) { | ||
      data_->store(decodedInputVectors_[column], rowIndex, storedRow, column); | ||
    } | ||
|
||
    // Flushing only applies once there is a previous row to compare against. | ||
    if (previousRow_ != nullptr) { | ||
      // A change in partition keys closes the current partition. | ||
      if (compareRowsWithKeys(previousRow_, storedRow, partitionKeyInfo_)) { | ||
        addPartitionInputs(true); | ||
      } | ||
|
||
      // Enough rows are buffered: pass them on but keep the partition open. | ||
      if (inputRows_.size() >= numRowsPerOutput_) { | ||
        addPartitionInputs(false); | ||
      } | ||
    } | ||
|
||
    inputRows_.push_back(storedRow); | ||
    previousRow_ = storedRow; | ||
  } | ||
} | ||
|
||
void RowsStreamingWindowBuild::noMoreInput() { | ||
addPartitionInputs(true); | ||
} | ||
|
||
std::shared_ptr<WindowPartition> RowsStreamingWindowBuild::nextPartition() { | ||
VELOX_CHECK(hasNextPartition()); | ||
return windowPartitions_[++outputPartition_]; | ||
} | ||
|
||
bool RowsStreamingWindowBuild::hasNextPartition() { | ||
return !windowPartitions_.empty() && | ||
outputPartition_ + 2 <= windowPartitions_.size(); | ||
} | ||
|
||
} // namespace facebook::velox::exec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#pragma once | ||
|
||
#include "velox/exec/WindowBuild.h" | ||
|
||
namespace facebook::velox::exec { | ||
|
||
/// Unlike PartitionStreamingWindowBuild, RowsStreamingWindowBuild is capable of | ||
/// processing window functions as rows arrive within a single partition, | ||
/// without the need to wait for the entire window partition to be ready. This | ||
/// approach can significantly reduce memory usage, especially when a single | ||
/// partition contains a large amount of data. It is particularly suited for | ||
/// optimizing rank, dense_rank and row_number functions, as well as aggregate | ||
/// window functions with a default frame. | ||
class RowsStreamingWindowBuild : public WindowBuild { | ||
public: | ||
RowsStreamingWindowBuild( | ||
const std::shared_ptr<const core::WindowNode>& windowNode, | ||
velox::memory::MemoryPool* pool, | ||
const common::SpillConfig* spillConfig, | ||
tsan_atomic<bool>* nonReclaimableSection); | ||
|
||
/// Stores the rows of 'input' and buffers them for the current partition. | ||
/// Buffered rows are passed to the partition when the partition keys change | ||
/// or once enough rows for one output batch have accumulated. | ||
void addInput(RowVectorPtr input) override; | ||
|
||
/// Not expected to be called on this build; reaching it is treated as | ||
/// unreachable code. | ||
void spill() override { | ||
VELOX_UNREACHABLE(); | ||
} | ||
|
||
/// Always returns std::nullopt. | ||
std::optional<common::SpillStats> spilledStats() const override { | ||
return std::nullopt; | ||
} | ||
|
||
/// Flushes any still-buffered rows and marks their partition as complete. | ||
void noMoreInput() override; | ||
|
||
/// Returns true if a built partition exists after the current output | ||
/// partition. | ||
bool hasNextPartition() override; | ||
|
||
/// Advances to the next built partition and returns it. Checks | ||
/// hasNextPartition() first. | ||
std::shared_ptr<WindowPartition> nextPartition() override; | ||
|
||
/// Returns true if more input rows can be consumed. | ||
bool needsInput() override { | ||
// No partitions are available or the currentPartition is the last available | ||
// one, so can consume input rows. | ||
return windowPartitions_.empty() || | ||
outputPartition_ == windowPartitions_.size() - 1; | ||
} | ||
|
||
private: | ||
// Adds input rows to the current partition, or creates a new partition if it | ||
// does not exist. If 'finished' is true, the partition is also marked | ||
// complete and subsequent rows go to the next partition. | ||
void addPartitionInputs(bool finished); | ||
|
||
// Points to the input rows buffered for the current partition that have not | ||
// been added to it yet. Cleared each time the rows are added. | ||
std::vector<char*> inputRows_; | ||
|
||
// Most recently stored input row. Used to compare rows based on | ||
// partitionKeys; nullptr until the first row has been added. | ||
char* previousRow_ = nullptr; | ||
|
||
// Points to the current output partition if not -1. | ||
vector_size_t outputPartition_ = -1; | ||
|
||
// Current input partition that receives inputs. | ||
vector_size_t inputPartition_ = 0; | ||
|
||
// Holds all the built window partitions. | ||
std::vector<std::shared_ptr<WindowPartition>> windowPartitions_; | ||
}; | ||
|
||
} // namespace facebook::velox::exec |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.