From d33cdb2512052b1bc131908be0a7d26ad240d89d Mon Sep 17 00:00:00 2001 From: Jia Ke Date: Fri, 23 Aug 2024 22:40:39 -0700 Subject: [PATCH] Add RowsStreamingWindowBuild to avoid OOM in Window operator (#9025) Summary: Unlike `StreamingWindowBuild`, the `RowsStreamingWindowBuild` added in this PR is capable of processing window functions as rows arrive within a single partition, without the need to wait for the entire partition to be ready. This approach can significantly reduce memory usage, especially when a single partition contains a large amount of data. It is particularly suited for optimizing the `rank` and `row_number` functions, as well as aggregate window functions with a default frame. The detailed discussion is [here](https://github.com/facebookincubator/velox/discussions/8975). The design doc is [here](https://docs.google.com/document/d/17ONSJHK8XP5Lixm8XBl01RMNl4ntpixiVFe693ahw6k/edit?usp=sharing). Pull Request resolved: https://github.com/facebookincubator/velox/pull/9025 Test Plan: Run through 10 hours of fuzzer testing Reviewed By: kagamiori Differential Revision: D61473798 Pulled By: xiaoxmeng fbshipit-source-id: 569a752770395330c48a3521bd5421eb89f5623d --- velox/exec/AggregateWindow.cpp | 1 + velox/exec/CMakeLists.txt | 3 +- ....cpp => PartitionStreamingWindowBuild.cpp} | 17 +- ...uild.h => PartitionStreamingWindowBuild.h} | 14 +- velox/exec/RowsStreamingWindowBuild.cpp | 93 +++++++ velox/exec/RowsStreamingWindowBuild.h | 82 ++++++ velox/exec/SortWindowBuild.cpp | 6 +- velox/exec/SortWindowBuild.h | 2 +- velox/exec/Window.cpp | 74 ++++- velox/exec/Window.h | 7 +- velox/exec/WindowBuild.h | 9 +- velox/exec/WindowFunction.cpp | 13 +- velox/exec/WindowFunction.h | 27 ++ velox/exec/WindowPartition.cpp | 135 +++++++-- velox/exec/WindowPartition.h | 90 +++++- velox/exec/tests/PlanBuilderTest.cpp | 6 +- velox/exec/tests/PlanNodeToStringTest.cpp | 6 +- .../exec/tests/WindowFunctionRegistryTest.cpp | 6 +- velox/exec/tests/WindowTest.cpp | 258 ++++++++++++++++++ 
velox/functions/lib/window/NthValue.cpp | 1 + velox/functions/lib/window/Ntile.cpp | 1 + velox/functions/lib/window/Rank.cpp | 24 +- velox/functions/lib/window/RowNumber.cpp | 1 + velox/functions/prestosql/window/CumeDist.cpp | 1 + .../prestosql/window/FirstLastValue.cpp | 1 + velox/functions/prestosql/window/LeadLag.cpp | 2 + 26 files changed, 815 insertions(+), 65 deletions(-) rename velox/exec/{StreamingWindowBuild.cpp => PartitionStreamingWindowBuild.cpp} (86%) rename velox/exec/{StreamingWindowBuild.h => PartitionStreamingWindowBuild.h} (84%) create mode 100644 velox/exec/RowsStreamingWindowBuild.cpp create mode 100644 velox/exec/RowsStreamingWindowBuild.h diff --git a/velox/exec/AggregateWindow.cpp b/velox/exec/AggregateWindow.cpp index cb32bd0779c3..2bdad5342c6e 100644 --- a/velox/exec/AggregateWindow.cpp +++ b/velox/exec/AggregateWindow.cpp @@ -410,6 +410,7 @@ void registerAggregateWindowFunction(const std::string& name) { exec::registerWindowFunction( name, std::move(signatures), + {exec::WindowFunction::ProcessMode::kRows, true}, [name]( const std::vector& args, const TypePtr& resultType, diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index c68a9af02ea5..48b63d0872df 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -59,9 +59,11 @@ velox_add_library( OutputBufferManager.cpp PartitionedOutput.cpp PartitionFunction.cpp + PartitionStreamingWindowBuild.cpp PlanNodeStats.cpp PrefixSort.cpp ProbeOperatorState.cpp + RowsStreamingWindowBuild.cpp RowContainer.cpp RowNumber.cpp SortBuffer.cpp @@ -71,7 +73,6 @@ velox_add_library( SpillFile.cpp Spiller.cpp StreamingAggregation.cpp - StreamingWindowBuild.cpp Strings.cpp TableScan.cpp TableWriteMerge.cpp diff --git a/velox/exec/StreamingWindowBuild.cpp b/velox/exec/PartitionStreamingWindowBuild.cpp similarity index 86% rename from velox/exec/StreamingWindowBuild.cpp rename to velox/exec/PartitionStreamingWindowBuild.cpp index 2d855867ebd0..f8deecf99194 100644 --- 
a/velox/exec/StreamingWindowBuild.cpp +++ b/velox/exec/PartitionStreamingWindowBuild.cpp @@ -14,24 +14,24 @@ * limitations under the License. */ -#include "velox/exec/StreamingWindowBuild.h" +#include "velox/exec/PartitionStreamingWindowBuild.h" namespace facebook::velox::exec { -StreamingWindowBuild::StreamingWindowBuild( +PartitionStreamingWindowBuild::PartitionStreamingWindowBuild( const std::shared_ptr& windowNode, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, tsan_atomic* nonReclaimableSection) : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) {} -void StreamingWindowBuild::buildNextPartition() { +void PartitionStreamingWindowBuild::buildNextPartition() { partitionStartRows_.push_back(sortedRows_.size()); sortedRows_.insert(sortedRows_.end(), inputRows_.begin(), inputRows_.end()); inputRows_.clear(); } -void StreamingWindowBuild::addInput(RowVectorPtr input) { +void PartitionStreamingWindowBuild::addInput(RowVectorPtr input) { for (auto i = 0; i < inputChannels_.size(); ++i) { decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); } @@ -53,14 +53,15 @@ void StreamingWindowBuild::addInput(RowVectorPtr input) { } } -void StreamingWindowBuild::noMoreInput() { +void PartitionStreamingWindowBuild::noMoreInput() { buildNextPartition(); // Help for last partition related calculations. 
partitionStartRows_.push_back(sortedRows_.size()); } -std::unique_ptr StreamingWindowBuild::nextPartition() { +std::shared_ptr +PartitionStreamingWindowBuild::nextPartition() { VELOX_CHECK_GT( partitionStartRows_.size(), 0, "No window partitions available") @@ -91,11 +92,11 @@ std::unique_ptr StreamingWindowBuild::nextPartition() { sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } -bool StreamingWindowBuild::hasNextPartition() { +bool PartitionStreamingWindowBuild::hasNextPartition() { return partitionStartRows_.size() > 0 && currentPartition_ < int(partitionStartRows_.size() - 2); } diff --git a/velox/exec/StreamingWindowBuild.h b/velox/exec/PartitionStreamingWindowBuild.h similarity index 84% rename from velox/exec/StreamingWindowBuild.h rename to velox/exec/PartitionStreamingWindowBuild.h index a9c2e2abf473..bb5cb352d24f 100644 --- a/velox/exec/StreamingWindowBuild.h +++ b/velox/exec/PartitionStreamingWindowBuild.h @@ -20,13 +20,13 @@ namespace facebook::velox::exec { -/// The StreamingWindowBuild is used when the input data is already sorted by -/// {partition keys + order by keys}. The logic identifies partition changes -/// when receiving input rows and splits out WindowPartitions for the Window -/// operator to process. -class StreamingWindowBuild : public WindowBuild { +/// The PartitionStreamingWindowBuild is used when the input data is already +/// sorted by {partition keys + order by keys}. The logic identifies partition +/// changes when receiving input rows and splits out WindowPartitions for the +/// Window operator to process. 
+class PartitionStreamingWindowBuild : public WindowBuild { public: - StreamingWindowBuild( + PartitionStreamingWindowBuild( const std::shared_ptr& windowNode, velox::memory::MemoryPool* pool, const common::SpillConfig* spillConfig, @@ -46,7 +46,7 @@ class StreamingWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; bool needsInput() override { // No partitions are available or the currentPartition is the last available diff --git a/velox/exec/RowsStreamingWindowBuild.cpp b/velox/exec/RowsStreamingWindowBuild.cpp new file mode 100644 index 000000000000..81d4a4f8d00a --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.cpp @@ -0,0 +1,93 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/exec/RowsStreamingWindowBuild.h" +#include "velox/common/testutil/TestValue.h" + +namespace facebook::velox::exec { + +RowsStreamingWindowBuild::RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection) + : WindowBuild(windowNode, pool, spillConfig, nonReclaimableSection) { + velox::common::testutil::TestValue::adjust( + "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + this); +} + +void RowsStreamingWindowBuild::addPartitionInputs(bool finished) { + if (inputRows_.empty()) { + return; + } + + if (windowPartitions_.size() <= inputPartition_) { + windowPartitions_.push_back(std::make_shared( + data_.get(), inversedInputChannels_, sortKeyInfo_)); + } + + windowPartitions_[inputPartition_]->addRows(inputRows_); + + if (finished) { + windowPartitions_[inputPartition_]->setComplete(); + ++inputPartition_; + } + + inputRows_.clear(); +} + +void RowsStreamingWindowBuild::addInput(RowVectorPtr input) { + for (auto i = 0; i < inputChannels_.size(); ++i) { + decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); + } + + for (auto row = 0; row < input->size(); ++row) { + char* newRow = data_->newRow(); + + for (auto col = 0; col < input->childrenSize(); ++col) { + data_->store(decodedInputVectors_[col], row, newRow, col); + } + + if (previousRow_ != nullptr && + compareRowsWithKeys(previousRow_, newRow, partitionKeyInfo_)) { + addPartitionInputs(true); + } + + if (previousRow_ != nullptr && inputRows_.size() >= numRowsPerOutput_) { + addPartitionInputs(false); + } + + inputRows_.push_back(newRow); + previousRow_ = newRow; + } +} + +void RowsStreamingWindowBuild::noMoreInput() { + addPartitionInputs(true); +} + +std::shared_ptr RowsStreamingWindowBuild::nextPartition() { + VELOX_CHECK(hasNextPartition()); + return windowPartitions_[++outputPartition_]; +} + +bool 
RowsStreamingWindowBuild::hasNextPartition() { + return !windowPartitions_.empty() && + outputPartition_ + 2 <= windowPartitions_.size(); +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/RowsStreamingWindowBuild.h b/velox/exec/RowsStreamingWindowBuild.h new file mode 100644 index 000000000000..c003f1e6a3f9 --- /dev/null +++ b/velox/exec/RowsStreamingWindowBuild.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/WindowBuild.h" + +namespace facebook::velox::exec { + +/// Unlike PartitionStreamingWindowBuild, RowsStreamingWindowBuild is capable of +/// processing window functions as rows arrive within a single partition, +/// without the need to wait for the entirewindow partition to be ready. This +/// approach can significantly reduce memory usage, especially when a single +/// partition contains a large amount of data. It is particularly suited for +/// optimizing rank, dense_rank and row_number functions, as well as aggregate +/// window functions with a default frame. 
+class RowsStreamingWindowBuild : public WindowBuild { + public: + RowsStreamingWindowBuild( + const std::shared_ptr& windowNode, + velox::memory::MemoryPool* pool, + const common::SpillConfig* spillConfig, + tsan_atomic* nonReclaimableSection); + + void addInput(RowVectorPtr input) override; + + void spill() override { + VELOX_UNREACHABLE(); + } + + std::optional spilledStats() const override { + return std::nullopt; + } + + void noMoreInput() override; + + bool hasNextPartition() override; + + std::shared_ptr nextPartition() override; + + bool needsInput() override { + // No partitions are available or the currentPartition is the last available + // one, so can consume input rows. + return windowPartitions_.empty() || + outputPartition_ == windowPartitions_.size() - 1; + } + + private: + // Adds input rows to the current partition, or creates a new partition if it + // does not exist. + void addPartitionInputs(bool finished); + + // Points to the input rows in the current partition. + std::vector inputRows_; + + // Used to compare rows based on partitionKeys. + char* previousRow_ = nullptr; + + // Point to the current output partition if not -1. + vector_size_t outputPartition_ = -1; + + // Current input partition that receives inputs. + vector_size_t inputPartition_ = 0; + + // Holds all the built window partitions. 
+ std::vector> windowPartitions_; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index c65009012fdd..400b1edbe636 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -294,11 +294,11 @@ void SortWindowBuild::loadNextPartitionFromSpill() { } } -std::unique_ptr SortWindowBuild::nextPartition() { +std::shared_ptr SortWindowBuild::nextPartition() { if (merge_ != nullptr) { VELOX_CHECK(!sortedRows_.empty(), "No window partitions available") auto partition = folly::Range(sortedRows_.data(), sortedRows_.size()); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } @@ -316,7 +316,7 @@ std::unique_ptr SortWindowBuild::nextPartition() { auto partition = folly::Range( sortedRows_.data() + partitionStartRows_[currentPartition_], partitionSize); - return std::make_unique( + return std::make_shared( data_.get(), partition, inversedInputChannels_, sortKeyInfo_); } diff --git a/velox/exec/SortWindowBuild.h b/velox/exec/SortWindowBuild.h index 645949ddb7e0..0caecfe6a5c3 100644 --- a/velox/exec/SortWindowBuild.h +++ b/velox/exec/SortWindowBuild.h @@ -53,7 +53,7 @@ class SortWindowBuild : public WindowBuild { bool hasNextPartition() override; - std::unique_ptr nextPartition() override; + std::shared_ptr nextPartition() override; private: void ensureInputFits(const RowVectorPtr& input); diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index 4577388006fa..e4d36d994591 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -15,8 +15,9 @@ */ #include "velox/exec/Window.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/PartitionStreamingWindowBuild.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/SortWindowBuild.h" -#include "velox/exec/StreamingWindowBuild.h" #include "velox/exec/Task.h" namespace facebook::velox::exec { @@ -41,8 +42,13 @@ Window::Window( auto* 
spillConfig = spillConfig_.has_value() ? &spillConfig_.value() : nullptr; if (windowNode->inputsSorted()) { - windowBuild_ = std::make_unique( - windowNode, pool(), spillConfig, &nonReclaimableSection_); + if (supportRowsStreaming()) { + windowBuild_ = std::make_unique( + windowNode_, pool(), spillConfig, &nonReclaimableSection_); + } else { + windowBuild_ = std::make_unique( + windowNode, pool(), spillConfig, &nonReclaimableSection_); + } } else { windowBuild_ = std::make_unique( windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_); @@ -54,6 +60,7 @@ void Window::initialize() { VELOX_CHECK_NOT_NULL(windowNode_); createWindowFunctions(); createPeerAndFrameBuffers(); + windowBuild_->setNumRowsPerOutput(numRowsPerOutput_); windowNode_.reset(); } @@ -188,6 +195,31 @@ void Window::createWindowFunctions() { } } +bool Window::supportRowsStreaming() { + for (const auto& windowFunction : windowNode_->windowFunctions()) { + const auto& functionName = windowFunction.functionCall->name(); + const auto windowFunctionMetadata = + exec::getWindowFunctionMetadata(functionName); + + if (windowFunctionMetadata.processMode != + exec::WindowFunction::ProcessMode::kRows) { + return false; + } + + const auto& frame = windowFunction.frame; + // The default frame spans from the start of the partition to current row. 
+ const bool isDefaultFrame = + (frame.startType == core::WindowNode::BoundType::kUnboundedPreceding && + frame.endType == core::WindowNode::BoundType::kCurrentRow); + + if (windowFunctionMetadata.isAggregate && !isDefaultFrame) { + return false; + } + } + + return true; +} + void Window::addInput(RowVectorPtr input) { windowBuild_->addInput(input); numRows_ += input->size(); @@ -542,9 +574,13 @@ void Window::callApplyForPartitionRows( vector_size_t endRow, vector_size_t resultOffset, const RowVectorPtr& result) { - getInputColumns(startRow, endRow, resultOffset, result); - + // NOTE: for a partial window partition, the last row of the previously + // processed rows (used for peer group comparison) will be deleted by + // computePeerAndFrameBuffers after peer group comparison. Hence we need to + // call getInputColumns after computePeerAndFrameBuffers. computePeerAndFrameBuffers(startRow, endRow); + + getInputColumns(startRow, endRow, resultOffset, result); vector_size_t numFuncs = windowFunctions_.size(); for (auto i = 0; i < numFuncs; ++i) { windowFunctions_[i]->apply( @@ -560,6 +596,10 @@ void Window::callApplyForPartitionRows( const vector_size_t numRows = endRow - startRow; numProcessedRows_ += numRows; partitionOffset_ += numRows; + + if (currentPartition_->partial()) { + currentPartition_->removeProcessedRows(numRows); + } } vector_size_t Window::callApplyLoop( @@ -573,18 +613,25 @@ vector_size_t Window::callApplyLoop( // This function requires that the currentPartition_ is available for output. VELOX_DCHECK_NOT_NULL(currentPartition_); while (numOutputRowsLeft > 0) { - const auto rowsForCurrentPartition = - currentPartition_->numRows() - partitionOffset_; - if (rowsForCurrentPartition <= numOutputRowsLeft) { + const auto numPartitionRows = + currentPartition_->numRowsForProcessing(partitionOffset_); + if (numPartitionRows <= numOutputRowsLeft) { // Current partition can fit completely in the output buffer. // So output all its rows. 
callApplyForPartitionRows( partitionOffset_, - partitionOffset_ + rowsForCurrentPartition, + partitionOffset_ + numPartitionRows, resultIndex, result); - resultIndex += rowsForCurrentPartition; - numOutputRowsLeft -= rowsForCurrentPartition; + resultIndex += numPartitionRows; + numOutputRowsLeft -= numPartitionRows; + + if (!currentPartition_->complete()) { + // There are more data need to process for a partial partition. + VELOX_CHECK(currentPartition_->partial()); + break; + } + callResetPartition(); if (currentPartition_ == nullptr) { // The WindowBuild doesn't have any more partitions to process right @@ -627,6 +674,11 @@ RowVectorPtr Window::getOutput() { } } + if (!currentPartition_->complete() && + (currentPartition_->numRowsForProcessing(partitionOffset_) == 0)) { + return nullptr; + } + const auto numOutputRows = std::min(numRowsPerOutput_, numRowsLeft); auto result = BaseVector::create( outputType_, numOutputRows, operatorCtx_->pool()); diff --git a/velox/exec/Window.h b/velox/exec/Window.h index 393bcc364acc..eb3389369a23 100644 --- a/velox/exec/Window.h +++ b/velox/exec/Window.h @@ -88,6 +88,11 @@ class Window : public Operator { const std::optional end; }; + // Returns if a window operator support rows-wise streaming processing or not. + // Currently we supports 'rank', 'dense_rank' and 'row_number' functions with + // any frame type. Also supports the agg window function with default frame. + bool supportRowsStreaming(); + // Creates WindowFunction and frame objects for this operator. void createWindowFunctions(); @@ -165,7 +170,7 @@ class Window : public Operator { // Used to access window partition rows and columns by the window // operator and functions. This structure is owned by the WindowBuild. - std::unique_ptr currentPartition_; + std::shared_ptr currentPartition_; // HashStringAllocator required by functions that allocate out of line // buffers. 
diff --git a/velox/exec/WindowBuild.h b/velox/exec/WindowBuild.h index 1f9207c4fbd5..01c470803ed7 100644 --- a/velox/exec/WindowBuild.h +++ b/velox/exec/WindowBuild.h @@ -66,7 +66,7 @@ class WindowBuild { /// access the underlying columns of Window partition data. Check /// hasNextPartition() before invoking this function. This function fails if /// called when no partition is available. - virtual std::unique_ptr nextPartition() = 0; + virtual std::shared_ptr nextPartition() = 0; /// Returns the average size of input rows in bytes stored in the data /// container of the WindowBuild. @@ -74,6 +74,10 @@ class WindowBuild { return data_->estimateRowSize(); } + void setNumRowsPerOutput(vector_size_t numRowsPerOutput) { + numRowsPerOutput_ = numRowsPerOutput; + } + protected: bool compareRowsWithKeys( const char* lhs, @@ -111,6 +115,9 @@ class WindowBuild { /// Number of input rows. vector_size_t numRows_ = 0; + + // The maximum number of rows that can fit into an output block. + vector_size_t numRowsPerOutput_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/WindowFunction.cpp b/velox/exec/WindowFunction.cpp index b093024a1dbe..6eea3160b21e 100644 --- a/velox/exec/WindowFunction.cpp +++ b/velox/exec/WindowFunction.cpp @@ -41,13 +41,24 @@ std::optional getWindowFunctionEntry( bool registerWindowFunction( const std::string& name, std::vector signatures, + WindowFunction::Metadata metadata, WindowFunctionFactory factory) { auto sanitizedName = sanitizeName(name); windowFunctions()[sanitizedName] = { - std::move(signatures), std::move(factory)}; + std::move(signatures), std::move(factory), std::move(metadata)}; return true; } +WindowFunction::Metadata getWindowFunctionMetadata(const std::string& name) { + const auto sanitizedName = sanitizeName(name); + if (auto func = getWindowFunctionEntry(sanitizedName)) { + return func.value()->metadata; + } else { + VELOX_USER_FAIL( + "Window function metadata not found for function: {}", name); + } +} + std::optional> 
getWindowFunctionSignatures( const std::string& name) { auto sanitizedName = sanitizeName(name); diff --git a/velox/exec/WindowFunction.h b/velox/exec/WindowFunction.h index ee0ef26869c1..e9bea92ee2c2 100644 --- a/velox/exec/WindowFunction.h +++ b/velox/exec/WindowFunction.h @@ -33,6 +33,28 @@ struct WindowFunctionArg { class WindowFunction { public: + /// The data process mode for calculating the window function. + enum class ProcessMode { + /// Process can only start after all the rows from a partition become + /// available. + kPartition, + /// Process can start as soon as rows are available within a partition, + /// without waiting for all the rows in the partition to be ready. + kRows, + }; + + /// Indicates whether this is an aggregate window function and its process + /// unit. + struct Metadata { + ProcessMode processMode; + bool isAggregate; + + static Metadata defaultMetadata() { + static Metadata defaultValue{ProcessMode::kPartition, false}; + return defaultValue; + } + }; + explicit WindowFunction( TypePtr resultType, memory::MemoryPool* pool, @@ -149,6 +171,7 @@ using WindowFunctionFactory = std::function( bool registerWindowFunction( const std::string& name, std::vector signatures, + WindowFunction::Metadata metadata, WindowFunctionFactory factory); /// Returns signatures of the window function with the specified name. @@ -159,8 +182,12 @@ std::optional> getWindowFunctionSignatures( struct WindowFunctionEntry { std::vector signatures; WindowFunctionFactory factory; + WindowFunction::Metadata metadata; }; +/// Returns window function metadata. +WindowFunction::Metadata getWindowFunctionMetadata(const std::string& name); + using WindowFunctionMap = std::unordered_map; /// Returns a map of all window function names to their registrations. 
diff --git a/velox/exec/WindowPartition.cpp b/velox/exec/WindowPartition.cpp index 2783b6b7b2f3..f57077364c34 100644 --- a/velox/exec/WindowPartition.cpp +++ b/velox/exec/WindowPartition.cpp @@ -21,13 +21,70 @@ WindowPartition::WindowPartition( RowContainer* data, const folly::Range& rows, const std::vector& inputMapping, - const std::vector>& sortKeyInfo) - : data_(data), + const std::vector>& sortKeyInfo, + bool partial, + bool complete) + : partial_(partial), + data_(data), partition_(rows), + complete_(complete), inputMapping_(inputMapping), sortKeyInfo_(sortKeyInfo) { - for (int i = 0; i < inputMapping_.size(); i++) { - columns_.emplace_back(data_->columnAt(inputMapping_[i])); + VELOX_CHECK_NE(partial_, complete_); + VELOX_CHECK_NE(complete_, partition_.empty()); + + for (auto index : inputMapping_) { + columns_.emplace_back(data_->columnAt(index)); + } +} + +WindowPartition::WindowPartition( + RowContainer* data, + const folly::Range& rows, + const std::vector& inputMapping, + const std::vector>& sortKeyInfo) + : WindowPartition(data, rows, inputMapping, sortKeyInfo, false, true) {} + +WindowPartition::WindowPartition( + RowContainer* data, + const std::vector& inputMapping, + const std::vector>& sortKeyInfo) + : WindowPartition(data, {}, inputMapping, sortKeyInfo, true, false) {} + +void WindowPartition::addRows(const std::vector& rows) { + checkPartial(); + rows_.insert(rows_.end(), rows.begin(), rows.end()); + partition_ = folly::Range(rows_.data(), rows_.size()); +} + +void WindowPartition::eraseRows(vector_size_t numRows) { + checkPartial(); + VELOX_CHECK_GE(data_->numRows(), numRows); + data_->eraseRows(folly::Range(rows_.data(), numRows)); +} + +void WindowPartition::removeProcessedRows(vector_size_t numRows) { + checkPartial(); + + VELOX_CHECK_NULL(previousRow_); + if (complete_ && rows_.size() == numRows) { + eraseRows(numRows); + } else { + eraseRows(numRows - 1); + previousRow_ = rows_[numRows - 1]; + } + + rows_.erase(rows_.begin(), rows_.begin() 
+ numRows); + partition_ = folly::Range(rows_.data(), rows_.size()); + startRow_ += numRows; +} + +vector_size_t WindowPartition::numRowsForProcessing( + vector_size_t partitionOffset) const { + if (partial_) { + return partition_.size(); + } else { + return partition_.size() - partitionOffset; } } @@ -50,8 +107,9 @@ void WindowPartition::extractColumn( vector_size_t numRows, vector_size_t resultOffset, const VectorPtr& result) const { + VELOX_CHECK_GE(partitionOffset, startRow_); RowContainer::extractColumn( - partition_.data() + partitionOffset, + partition_.data() + partitionOffset - startRow_, numRows, columns_[columnIndex], resultOffset, @@ -130,23 +188,65 @@ bool WindowPartition::compareRowsWithSortKeys(const char* lhs, const char* rhs) return false; } +vector_size_t WindowPartition::findPeerRowEndIndex( + vector_size_t startRow, + vector_size_t lastRow, + const std::function& peerCompare) { + auto peerEnd = startRow; + while (peerEnd <= lastRow) { + if (peerCompare( + partition_[startRow - startRow_], + partition_[peerEnd - startRow_])) { + break; + } + ++peerEnd; + } + return peerEnd; +} + +void WindowPartition::removePreviousRow() { + VELOX_CHECK_NOT_NULL(previousRow_); + data_->eraseRows(folly::Range(&previousRow_, 1)); + previousRow_ = nullptr; +} + std::pair WindowPartition::computePeerBuffers( vector_size_t start, vector_size_t end, vector_size_t prevPeerStart, vector_size_t prevPeerEnd, vector_size_t* rawPeerStarts, - vector_size_t* rawPeerEnds) const { + vector_size_t* rawPeerEnds) { const auto peerCompare = [&](const char* lhs, const char* rhs) -> bool { return compareRowsWithSortKeys(lhs, rhs); }; - VELOX_CHECK_LE(end, numRows()); + VELOX_CHECK_LE(end, numRows() + startRow_); - const auto lastPartitionRow = numRows() - 1; + auto lastPartitionRow = numRows() + startRow_ - 1; auto peerStart = prevPeerStart; auto peerEnd = prevPeerEnd; - for (auto i = start, j = 0; i < end; ++i, ++j) { + + size_t next = start; + size_t index{0}; + if (partial_ && 
start > 0) { + const auto peerGroup = peerCompare(previousRow_, partition_[0]); + + // The first row is the last row in previous batch so delete it after used + // for the first peer group detection. + removePreviousRow(); + + if (!peerGroup) { + peerEnd = findPeerRowEndIndex(start, lastPartitionRow, peerCompare); + + for (; next < std::min(end, peerEnd); ++next, ++index) { + rawPeerStarts[index] = peerStart; + rawPeerEnds[index] = peerEnd - 1; + } + } + } + + for (; next < end; ++next, ++index) { // When traversing input partition rows, the peers are the rows with the // same values for the ORDER BY clause. These rows are equal in some ways // and affect the results of ranking functions. This logic exploits the fact @@ -155,22 +255,17 @@ std::pair WindowPartition::computePeerBuffers( // across the rows in that peer interval. Note: peerStart and peerEnd can be // maintained across getOutput calls. Hence, they are returned to the // caller. - if (i == 0 || i >= peerEnd) { + if (next == 0 || next >= peerEnd) { // Compute peerStart and peerEnd rows for the first row of the partition // or when past the previous peerGroup. - peerStart = i; - peerEnd = i; - while (peerEnd <= lastPartitionRow) { - if (peerCompare(partition_[peerStart], partition_[peerEnd])) { - break; - } - ++peerEnd; - } + peerStart = next; + peerEnd = findPeerRowEndIndex(peerStart, lastPartitionRow, peerCompare); } - rawPeerStarts[j] = peerStart; - rawPeerEnds[j] = peerEnd - 1; + rawPeerStarts[index] = peerStart; + rawPeerEnds[index] = peerEnd - 1; } + VELOX_CHECK_EQ(index, end - start); return {peerStart, peerEnd}; } diff --git a/velox/exec/WindowPartition.h b/velox/exec/WindowPartition.h index 7073af3a4238..7948611a9829 100644 --- a/velox/exec/WindowPartition.h +++ b/velox/exec/WindowPartition.h @@ -20,9 +20,14 @@ /// Simple WindowPartition that builds over the RowContainer used for storing /// the input rows in the Window Operator. This works completely in-memory. 
+/// WindowPartition supports partial window partitioning to facilitate +/// RowsStreamingWindowBuild which can start data processing with a portion set +/// of rows without having to wait until an entire partition of rows are ready. + /// TODO: This implementation will be revised for Spill to disk semantics. namespace facebook::velox::exec { + class WindowPartition { public: /// The WindowPartition is used by the Window operator and WindowFunction @@ -42,11 +47,46 @@ class WindowPartition { const std::vector>& sortKeyInfo); + /// The WindowPartition is used for RowStreamingWindowBuild which allows to + /// start data processing with a subset of partition rows. 'partial_' flag is + /// set for the constructed window partition. + WindowPartition( + RowContainer* data, + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo); + + /// Adds remaining input 'rows' for a partial window partition. + void addRows(const std::vector& rows); + + /// Removes the first 'numRows' in 'rows_' from a partial window partition + /// after been processed. + void removeProcessedRows(vector_size_t numRows); + /// Returns the number of rows in the current WindowPartition. vector_size_t numRows() const { return partition_.size(); } + /// Returns the number of rows in a window partition remaining for data + /// processing. + vector_size_t numRowsForProcessing(vector_size_t partitionOffset) const; + + bool complete() const { + return complete_; + } + + bool partial() const { + return partial_; + } + + void setComplete() { + VELOX_CHECK(!complete_); + checkPartial(); + + complete_ = true; + } + /// Copies the values at 'columnIndex' into 'result' (starting at /// 'resultOffset') for the rows at positions in the 'rowNumbers' /// array from the partition input data. 
@@ -107,7 +147,7 @@ class WindowPartition { vector_size_t prevPeerStart, vector_size_t prevPeerEnd, vector_size_t* rawPeerStarts, - vector_size_t* rawPeerEnds) const; + vector_size_t* rawPeerEnds); /// Sets in 'rawFrameBounds' the frame boundary for the k range /// preceding/following frame. @@ -128,8 +168,33 @@ class WindowPartition { vector_size_t* rawFrameBounds) const; private: + WindowPartition( + RowContainer* data, + const folly::Range& rows, + const std::vector& inputMapping, + const std::vector>& + sortKeyInfo, + bool partial, + bool complete); + bool compareRowsWithSortKeys(const char* lhs, const char* rhs) const; + // Finds the index of the last peer row in range of ['startRow', 'lastRow']. + vector_size_t findPeerRowEndIndex( + vector_size_t startRow, + vector_size_t lastRow, + const std::function& peerCompare); + + // Removes 'numRows' from 'data_' and 'rows_'. + void eraseRows(vector_size_t numRows); + + void checkPartial() const { + VELOX_CHECK(partial_, "WindowPartition should be partial"); + } + + // Removes the previous row from 'data_'. + void removePreviousRow(); + // Searches for 'currentRow[frameColumn]' in 'orderByColumn' of rows between // 'start' and 'end' in the partition. 'firstMatch' specifies if first or last // row is matched. @@ -162,9 +227,16 @@ class WindowPartition { const vector_size_t* rawPeerBounds, vector_size_t* rawFrameBounds) const; + // Indicates if this is a partial partition for RowsStreamingWindowBuild + // processing. + const bool partial_; + // The RowContainer associated with the partition. // It is owned by the WindowBuild that creates the partition. - RowContainer* data_; + RowContainer* const data_; + + // Points to the input rows for a partial partition. + std::vector rows_; // folly::Range is for the partition rows iterator provided by the // Window operator. The pointers are to rows from a RowContainer owned @@ -172,6 +244,10 @@ class WindowPartition { // of WindowPartition.
folly::Range partition_; + // Indicates if a partial partition has received all the input rows. For a + // non-partial partition, this is always true. + bool complete_ = true; + // Mapping from window input column -> index in data_. This is required // because the WindowBuild reorders data_ to place partition and sort keys // before other columns in data_. But the Window Operator and Function code @@ -189,5 +265,15 @@ class WindowPartition { // corresponding indexes of their input arguments into this vector. // They will request for column vector values at the respective index. std::vector columns_; + + // The partition offset of the first row in 'rows_'. It is updated for + // partial partition during the data processing but always zero for + // non-partial partition. + vector_size_t startRow_{0}; + + // Points to the last row from the previous processed peer group if not null. + // This is only set for a partial window partition and always null for a + // non-partial one. + char* previousRow_{nullptr}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/PlanBuilderTest.cpp b/velox/exec/tests/PlanBuilderTest.cpp index 5204e9274403..c4b22f3678b8 100644 --- a/velox/exec/tests/PlanBuilderTest.cpp +++ b/velox/exec/tests/PlanBuilderTest.cpp @@ -98,7 +98,11 @@ void registerWindowFunction() { .returnType("BIGINT") .build(), }; - exec::registerWindowFunction("window1", std::move(signatures), nullptr); + exec::registerWindowFunction( + "window1", + std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), + nullptr); } } // namespace diff --git a/velox/exec/tests/PlanNodeToStringTest.cpp b/velox/exec/tests/PlanNodeToStringTest.cpp index b40463e4d55c..59485b89478f 100644 --- a/velox/exec/tests/PlanNodeToStringTest.cpp +++ b/velox/exec/tests/PlanNodeToStringTest.cpp @@ -729,7 +729,11 @@ TEST_F(PlanNodeToStringTest, window) { .returnType("BIGINT") .build(), }; - exec::registerWindowFunction("window1", std::move(signatures), nullptr); + 
exec::registerWindowFunction( + "window1", + std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), + nullptr); auto plan = PlanBuilder() diff --git a/velox/exec/tests/WindowFunctionRegistryTest.cpp b/velox/exec/tests/WindowFunctionRegistryTest.cpp index b0aee416caf7..f78b18a05024 100644 --- a/velox/exec/tests/WindowFunctionRegistryTest.cpp +++ b/velox/exec/tests/WindowFunctionRegistryTest.cpp @@ -37,7 +37,11 @@ void registerWindowFunction(const std::string& name) { .build(), exec::FunctionSignatureBuilder().returnType("date").build(), }; - exec::registerWindowFunction(name, std::move(signatures), nullptr); + exec::registerWindowFunction( + name, + std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), + nullptr); } } // namespace diff --git a/velox/exec/tests/WindowTest.cpp b/velox/exec/tests/WindowTest.cpp index febdcd743d30..a55b6f4598ab 100644 --- a/velox/exec/tests/WindowTest.cpp +++ b/velox/exec/tests/WindowTest.cpp @@ -13,9 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/PlanNodeStats.h" +#include "velox/exec/RowsStreamingWindowBuild.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/OperatorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -79,6 +81,262 @@ TEST_F(WindowTest, spill) { ASSERT_GT(stats.spilledPartitions, 0); } +TEST_F(WindowTest, rowBasedStreamingWindowOOM) { + const vector_size_t size = 1'000'000; + auto data = makeRowVector( + {"d", "p", "s"}, + { + // Payload. + makeFlatVector(size, [](auto row) { return row; }), + // Partition key. + makeFlatVector(size, [](auto row) { return row; }), + // Sorting key. 
+ makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + // Abstract the common values vector split. + auto valuesSplit = split(data, 10); + + auto planNodeIdGenerator = std::make_shared(); + CursorParameters params; + auto queryCtx = core::QueryCtx::create(executor_.get()); + queryCtx->testingOverrideMemoryPool(memory::memoryManager()->addRootPool( + queryCtx->queryId(), + 8'388'608 /* 8MB */, + exec::MemoryReclaimer::create())); + + params.queryCtx = queryCtx; + + auto testWindowBuild = [&](bool useStreamingWindow) { + if (useStreamingWindow) { + params.planNode = + PlanBuilder(planNodeIdGenerator) + .values(valuesSplit) + .streamingWindow( + {"row_number() over (partition by p order by s)"}) + .project({"d"}) + .singleAggregation({}, {"sum(d)"}) + .planNode(); + + readCursor(params, [](Task*) {}); + } else { + params.planNode = + PlanBuilder(planNodeIdGenerator) + .values(valuesSplit) + .window({"row_number() over (partition by p order by s)"}) + .project({"d"}) + .singleAggregation({}, {"sum(d)"}) + .planNode(); + + VELOX_ASSERT_THROW( + readCursor(params, [](Task*) {}), + "Exceeded memory pool capacity after attempt to grow capacity through arbitration."); + } + }; + // RowStreamingWindow will not OOM. + testWindowBuild(true); + // SortBasedWindow will OOM. + testWindowBuild(false); +} + +TEST_F(WindowTest, rowBasedStreamingWindowMemoryUsage) { + auto memoryUsage = [&](bool useStreamingWindow, vector_size_t size) { + auto data = makeRowVector( + {"d", "p", "s"}, + { + // Payload. + makeFlatVector(size, [](auto row) { return row; }), + // Partition key. + makeFlatVector(size, [](auto row) { return row % 11; }), + // Sorting key. + makeFlatVector(size, [](auto row) { return row; }), + }); + + createDuckDbTable({data}); + + // Abstract the common values vector split. 
+ auto valuesSplit = split(data, 10); + core::PlanNodeId windowId; + auto builder = PlanBuilder().values(valuesSplit); + if (useStreamingWindow) { + builder.orderBy({"p", "s"}, false) + .streamingWindow({"row_number() over (partition by p order by s)"}); + } else { + builder.window({"row_number() over (partition by p order by s)"}); + } + auto plan = builder.capturePlanNodeId(windowId).planNode(); + auto task = + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .assertResults( + "SELECT *, row_number() over (partition by p order by s) FROM tmp"); + + return exec::toPlanStats(task->taskStats()).at(windowId).peakMemoryBytes; + }; + + const vector_size_t smallSize = 100'000; + const vector_size_t largeSize = 1'000'000; + // As the volume of data increases, the peak memory usage of the sort-based + // window will increase (2418624 vs 17098688). Since the peak memory usage of + // the RowBased Window represents the one batch data in a single partition, + // the peak memory usage will not increase as the volume of data grows. 
+ auto sortWindowSmallUsage = memoryUsage(false, smallSize); + auto sortWindowLargeUsage = memoryUsage(false, largeSize); + ASSERT_GT(sortWindowLargeUsage, sortWindowSmallUsage); + + auto rowWindowSmallUsage = memoryUsage(true, smallSize); + auto rowWindowLargeUsage = memoryUsage(true, largeSize); + ASSERT_EQ(rowWindowSmallUsage, rowWindowLargeUsage); +} + +DEBUG_ONLY_TEST_F(WindowTest, rankRowStreamingWindowBuild) { + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (order by c1 rows unbounded preceding)"}; + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .planNode(); + + std::atomic_bool isStreamCreated{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + std::function( + [&](RowsStreamingWindowBuild* windowBuild) { + isStreamCreated.store(true); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults( + "SELECT *, rank() over (order by c1 rows unbounded preceding) FROM tmp"); + + ASSERT_TRUE(isStreamCreated.load()); +} + +DEBUG_ONLY_TEST_F(WindowTest, valuesRowsStreamingWindowBuild) { + const vector_size_t size = 1'00; + + auto data = makeRowVector( + {makeFlatVector(size, [](auto row) { return row % 5; }), + makeFlatVector(size, [](auto row) { return row % 50; }), + makeFlatVector( + size, [](auto row) { return row % 3 + 1; }, nullEvery(5)), + makeFlatVector(size, [](auto row) { return row % 40; }), + makeFlatVector(size, [](auto row) { return row; })}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "rank() over (partition by c0, c2 order by c1, c3)", + "dense_rank() over (partition by c0, c2 order by c1, 
c3)", + "row_number() over (partition by c0, c2 order by c1, c3)", + "sum(c4) over (partition by c0, c2 order by c1, c3)"}; + + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"c0", "c2", "c1", "c3"}, false) + .streamingWindow(kClauses) + .planNode(); + + std::atomic_bool isStreamCreated{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + std::function( + [&](RowsStreamingWindowBuild* windowBuild) { + isStreamCreated.store(true); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .assertResults( + "SELECT *, rank() over (partition by c0, c2 order by c1, c3), dense_rank() over (partition by c0, c2 order by c1, c3), row_number() over (partition by c0, c2 order by c1, c3), sum(c4) over (partition by c0, c2 order by c1, c3) FROM tmp"); + ASSERT_TRUE(isStreamCreated.load()); +} + +DEBUG_ONLY_TEST_F(WindowTest, aggregationWithNonDefaultFrame) { + const vector_size_t size = 1'00; + + auto data = makeRowVector( + {makeFlatVector(size, [](auto row) { return row % 5; }), + makeFlatVector(size, [](auto row) { return row % 50; }), + makeFlatVector( + size, [](auto row) { return row % 3 + 1; }, nullEvery(5)), + makeFlatVector(size, [](auto row) { return row % 40; }), + makeFlatVector(size, [](auto row) { return row; })}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "sum(c4) over (partition by c0, c2 order by c1, c3 range between unbounded preceding and unbounded following)"}; + + auto plan = PlanBuilder() + .values({split(data, 10)}) + .orderBy({"c0", "c2", "c1", "c3"}, false) + .streamingWindow(kClauses) + .planNode(); + + std::atomic_bool isStreamCreated{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + std::function( + [&](RowsStreamingWindowBuild* windowBuild) { + isStreamCreated.store(true); + })); + + AssertQueryBuilder(plan, 
duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .assertResults( + "SELECT *, sum(c4) over (partition by c0, c2 order by c1, c3 range between unbounded preceding and unbounded following) FROM tmp"); + + ASSERT_FALSE(isStreamCreated.load()); +} + +DEBUG_ONLY_TEST_F(WindowTest, nonRowsStreamingWindow) { + auto data = makeRowVector( + {"c1"}, + {makeFlatVector(std::vector{1, 1, 1, 1, 1, 2, 2})}); + + createDuckDbTable({data}); + + const std::vector kClauses = { + "first_value(c1) over (order by c1 rows unbounded preceding)", + "nth_value(c1, 1) over (order by c1 rows unbounded preceding)"}; + + auto plan = PlanBuilder() + .values({data}) + .orderBy({"c1"}, false) + .streamingWindow(kClauses) + .planNode(); + + std::atomic_bool isStreamCreated{false}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::RowsStreamingWindowBuild::RowsStreamingWindowBuild", + std::function( + [&](RowsStreamingWindowBuild* windowBuild) { + isStreamCreated.store(true); + })); + + AssertQueryBuilder(plan, duckDbQueryRunner_) + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") + .config(core::QueryConfig::kPreferredOutputBatchRows, "2") + .config(core::QueryConfig::kMaxOutputBatchRows, "2") + .assertResults( + "SELECT *, first_value(c1) over (order by c1 rows unbounded preceding), nth_value(c1, 1) over (order by c1 rows unbounded preceding) FROM tmp"); + ASSERT_FALSE(isStreamCreated.load()); +} + TEST_F(WindowTest, missingFunctionSignature) { auto input = {makeRowVector({ makeFlatVector({1, 2, 3}), diff --git a/velox/functions/lib/window/NthValue.cpp b/velox/functions/lib/window/NthValue.cpp index 8fd456c620cc..5ef33fc303cd 100644 --- a/velox/functions/lib/window/NthValue.cpp +++ b/velox/functions/lib/window/NthValue.cpp @@ -319,6 +319,7 @@ void registerNthValue(const std::string& name, TypeKind offsetTypeKind) { exec::registerWindowFunction( name, std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), [name]( const 
std::vector& args, const TypePtr& resultType, diff --git a/velox/functions/lib/window/Ntile.cpp b/velox/functions/lib/window/Ntile.cpp index eacd562d2744..8b4460b90b42 100644 --- a/velox/functions/lib/window/Ntile.cpp +++ b/velox/functions/lib/window/Ntile.cpp @@ -241,6 +241,7 @@ void registerNtile(const std::string& name, const std::string& type) { exec::registerWindowFunction( name, std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), [name]( const std::vector& args, const TypePtr& resultType, diff --git a/velox/functions/lib/window/Rank.cpp b/velox/functions/lib/window/Rank.cpp index 646557a2e30c..625d2bd3b977 100644 --- a/velox/functions/lib/window/Rank.cpp +++ b/velox/functions/lib/window/Rank.cpp @@ -97,9 +97,7 @@ void registerRankInternal( exec::FunctionSignatureBuilder().returnType(returnType).build(), }; - exec::registerWindowFunction( - name, - std::move(signatures), + auto windowFunctionFactory = [name]( const std::vector& /*args*/, const TypePtr& resultType, @@ -107,9 +105,23 @@ void registerRankInternal( velox::memory::MemoryPool* /*pool*/, HashStringAllocator* /*stringAllocator*/, const core::QueryConfig& /*queryConfig*/) - -> std::unique_ptr { - return std::make_unique>(resultType); - }); + -> std::unique_ptr { + return std::make_unique>(resultType); + }; + + if constexpr (TRank == RankType::kRank || TRank == RankType::kDenseRank) { + exec::registerWindowFunction( + name, + std::move(signatures), + {exec::WindowFunction::ProcessMode::kRows, false}, + std::move(windowFunctionFactory)); + } else { + exec::registerWindowFunction( + name, + std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), + std::move(windowFunctionFactory)); + } } void registerRankBigint(const std::string& name) { diff --git a/velox/functions/lib/window/RowNumber.cpp b/velox/functions/lib/window/RowNumber.cpp index 16b7feb0a543..81fc7e5e0c6d 100644 --- a/velox/functions/lib/window/RowNumber.cpp +++ 
b/velox/functions/lib/window/RowNumber.cpp @@ -75,6 +75,7 @@ void registerRowNumber(const std::string& name, TypeKind resultTypeKind) { exec::registerWindowFunction( name, std::move(signatures), + {exec::WindowFunction::ProcessMode::kRows, false}, [name]( const std::vector& /*args*/, const TypePtr& resultType, diff --git a/velox/functions/prestosql/window/CumeDist.cpp b/velox/functions/prestosql/window/CumeDist.cpp index 98b264ec9ca9..439693e7ccc7 100644 --- a/velox/functions/prestosql/window/CumeDist.cpp +++ b/velox/functions/prestosql/window/CumeDist.cpp @@ -74,6 +74,7 @@ void registerCumeDist(const std::string& name) { exec::registerWindowFunction( name, std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), [name]( const std::vector& /*args*/, const TypePtr& /*resultType*/, diff --git a/velox/functions/prestosql/window/FirstLastValue.cpp b/velox/functions/prestosql/window/FirstLastValue.cpp index eaec4db98703..8aff5e26d2a2 100644 --- a/velox/functions/prestosql/window/FirstLastValue.cpp +++ b/velox/functions/prestosql/window/FirstLastValue.cpp @@ -175,6 +175,7 @@ void registerFirstLastInternal(const std::string& name) { exec::registerWindowFunction( name, std::move(signatures), + exec::WindowFunction::Metadata::defaultMetadata(), [](const std::vector& args, const TypePtr& resultType, bool ignoreNulls, diff --git a/velox/functions/prestosql/window/LeadLag.cpp b/velox/functions/prestosql/window/LeadLag.cpp index 8e4c598c4d63..9d65351c76d5 100644 --- a/velox/functions/prestosql/window/LeadLag.cpp +++ b/velox/functions/prestosql/window/LeadLag.cpp @@ -424,6 +424,7 @@ void registerLag(const std::string& name) { exec::registerWindowFunction( name, signatures(), + exec::WindowFunction::Metadata::defaultMetadata(), [name]( const std::vector& args, const TypePtr& resultType, @@ -441,6 +442,7 @@ void registerLead(const std::string& name) { exec::registerWindowFunction( name, signatures(), + exec::WindowFunction::Metadata::defaultMetadata(), [name]( 
const std::vector& args, const TypePtr& resultType,