Add RowsStreamingWindowBuild to avoid OOM in Window operator #9025

Closed
wants to merge 1 commit into from

Conversation

JkSelf
Collaborator

@JkSelf JkSelf commented Mar 11, 2024

Unlike StreamingWindowBuild, RowLevelStreamingWindowBuild in this PR is capable of processing window functions as rows arrive within a single partition, without the need to wait for the entire partition to be ready. This approach can significantly reduce memory usage, especially when a single partition contains a large amount of data. It is particularly suited for optimizing rank and row_number functions, as well as aggregate window functions with a default frame.

The detailed discussion is in #8975. The design doc is at https://docs.google.com/document/d/17ONSJHK8XP5Lixm8XBl01RMNl4ntpixiVFe693ahw6k/edit?usp=sharing.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Mar 11, 2024

netlify bot commented Mar 11, 2024

Deploy Preview for meta-velox ready!

Name Link
🔨 Latest commit bf3925c
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/66c92ef823947800087a0046
😎 Deploy Preview https://deploy-preview-9025--meta-velox.netlify.app

@JkSelf JkSelf force-pushed the rank-optimization branch 2 times, most recently from 196b0c8 to 28cda2f Compare March 12, 2024 05:13
@JkSelf
Collaborator Author

JkSelf commented Mar 12, 2024

@mbasmanova @aditi-pandit Can you help to review? Thanks for your help.

Collaborator

@aditi-pandit aditi-pandit left a comment

@JkSelf : Had a high level question about this PR.

My understanding is that :

  • You are trying to simulate an end of partition by artificially adding a partitionStartRow offset for each input block.
  • Each partition has an offset indicating which row position is the first one.
  • So for every input block you might naturally encounter end of partition if there is such a row. But you always reach end of partition at the end of the block.
  • The next input block is a sort of new partition but carries the offset in the partition.

That would mean that resetPartition function is called at end of each input block. That is changing the semantics of WindowFunction. That is misleading. We shouldn't do that.

Instead, we should enhance the WindowPartition structure to model a partially filled partition. The code in the Window operator that iterates in callApplyLoop would then have logic to handle partial WindowPartitions.
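
For illustration only, here is a minimal sketch of what a "partially filled partition" could look like. It is not Velox's actual WindowPartition: the class name `PartialPartition` and the methods `addRows`, `removeProcessedRows`, `setComplete`, and `startRow` are hypothetical, and rows are reduced to plain `int64_t` values. The point is that the partition keeps its identity across input batches and only tracks the offset of the first row it still holds, so no artificial end of partition is needed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for a window partition that is filled incrementally.
// This is NOT the real velox::exec::WindowPartition.
class PartialPartition {
 public:
  // Appends a batch of rows that belong to this partition.
  void addRows(const std::vector<int64_t>& rows) {
    rows_.insert(rows_.end(), rows.begin(), rows.end());
  }

  // Marks that no more rows will arrive for this partition.
  void setComplete() { complete_ = true; }
  bool complete() const { return complete_; }

  // Drops rows the window functions have already consumed and advances the
  // partition-relative offset of the first row still buffered.
  void removeProcessedRows(std::size_t count) {
    assert(count <= rows_.size());
    rows_.erase(
        rows_.begin(), rows_.begin() + static_cast<std::ptrdiff_t>(count));
    startRow_ += static_cast<int64_t>(count);
  }

  // Partition-relative index of the first row still held in memory.
  int64_t startRow() const { return startRow_; }
  std::size_t numBufferedRows() const { return rows_.size(); }

 private:
  std::deque<int64_t> rows_;
  int64_t startRow_{0};
  bool complete_{false};
};
```

With this shape, the operator loop can process whatever rows are buffered, call `removeProcessedRows`, and continue with the same partition object when the next batch arrives.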

const auto& functionName = windowNodeFunction.functionCall->name();
const auto& frame = windowNodeFunction.frame;

bool isRankLikeFunction =
Collaborator

We could add to WindowFunction API a new function supportsStreaming() that returns true/false depending on whether it supports this. So this code would remain independent of the function names.

Contributor

It would be preferable to make such metadata available from the registry without having to instantiate a function.

Collaborator Author

@aditi-pandit @mbasmanova I moved this metadata into the registry to avoid instantiating the function.
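
As a rough sketch of the registry-based approach discussed in this thread, the metadata can be stored next to the function name at registration time and queried without creating a function instance. Everything here is hypothetical (`WindowFunctionMetadata`, `WindowFunctionRegistry`, `registerFunction`, and the `supportsStreaming` lookup are illustrative names, not the Velox API).

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical per-function metadata recorded at registration time.
struct WindowFunctionMetadata {
  // True if the function can produce results before the whole partition
  // has been received.
  bool supportsStreaming;
};

// Hypothetical registry keyed by function name. Not the Velox registry.
class WindowFunctionRegistry {
 public:
  void registerFunction(
      const std::string& name,
      WindowFunctionMetadata metadata) {
    entries_[name] = metadata;
  }

  // Looks up the metadata without constructing a function instance.
  // Unknown functions are conservatively treated as non-streaming.
  bool supportsStreaming(const std::string& name) const {
    auto it = entries_.find(name);
    return it != entries_.end() && it->second.supportsStreaming;
  }

 private:
  std::unordered_map<std::string, WindowFunctionMetadata> entries_;
};
```

The caller then asks the registry by name, so the operator code no longer needs to compare function names such as "rank" or "row_number" directly.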

@JkSelf JkSelf force-pushed the rank-optimization branch 7 times, most recently from 39d0150 to 194ebc3 Compare March 19, 2024 07:13
@JkSelf
Collaborator Author

JkSelf commented Mar 19, 2024

> @JkSelf : Had a high level question about this PR.
>
> My understanding is that :
>
>   • You are trying to simulate an end of partition by artificially adding a partitionStartRow offset for each input block.
>   • Each partition has an offset indicating which row position is the first one.
>   • So for every input block you might naturally encounter end of partition if there is such a row. But you always reach end of partition at the end of the block.
>   • The next input block is a sort of new partition but carries the offset in the partition.
>
> That would mean that resetPartition function is called at end of each input block. That is changing the semantics of WindowFunction. That is misleading. We shouldn't do that.
>
> Instead, we should enhance the WindowPartition structure to model a partially filled partition. The code in the Window operator that iterates in callApplyLoop would then have logic to handle partial WindowPartitions.

@aditi-pandit Makes sense to me. I updated the PR based on your suggestions so that WindowPartition can represent a partially filled partition. Could you help review it again? Thanks.

@JkSelf JkSelf force-pushed the rank-optimization branch 4 times, most recently from 56f2161 to 160e96c Compare March 21, 2024 07:41
@mbasmanova
Contributor

@JkSelf Would you update the PR description to provide some context on the changes here? Specifically, please describe the overall design you implemented.

@@ -187,6 +195,28 @@ void Window::createWindowFunctions() {
}
}

// The supportRankWindowBuild is designed to support 'rank' and
Contributor

This optimization can be applied more broadly. With default frame, all aggregate functions can be processed in streaming manner without holding all rows of a partition in memory.

See code around analyzeFrameValues in AggregateWindow.cpp for some context.

Also, note that row_number and rank functions ignore frames.
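
To make the streaming idea concrete, here is a small sketch (not Velox code) of computing `row_number` and a running sum batch by batch while carrying only a few counters across batches. The names `StreamingState`, `BatchResult`, `processBatch`, and `startNewPartition` are hypothetical. The running sum is computed row by row, which matches a ROWS-style frame from the start of the partition to the current row. With the SQL default RANGE frame, rows that tie on the ORDER BY key are peers and must all receive the same value, so this sketch assumes there are no ties.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// State carried across the input batches of one partition, so results can be
// emitted per batch instead of after the whole partition is buffered.
struct StreamingState {
  int64_t rowNumber{0};   // Last row_number assigned.
  int64_t runningSum{0};  // Sum over all rows seen so far.
};

struct BatchResult {
  std::vector<int64_t> rowNumbers;
  std::vector<int64_t> sums;
};

// Processes one batch of a partition. Assumes rows arrive in ORDER BY order
// and that no two rows tie on the ordering key (see the note above).
BatchResult processBatch(
    StreamingState& state,
    const std::vector<int64_t>& batch) {
  BatchResult result;
  result.rowNumbers.reserve(batch.size());
  result.sums.reserve(batch.size());
  for (int64_t value : batch) {
    state.rowNumber += 1;
    state.runningSum += value;
    result.rowNumbers.push_back(state.rowNumber);
    result.sums.push_back(state.runningSum);
  }
  return result;
}

// Resets the carried state when a new partition begins.
void startNewPartition(StreamingState& state) {
  state = StreamingState{};
}
```

Memory use is bounded by the batch size rather than the partition size, which is the property this PR is after.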

@mbasmanova
Contributor

@rui-mo @PHILO-HE Folks, would you help review this PR?

Collaborator

@rui-mo rui-mo left a comment

Thanks!

@aditi-pandit
Collaborator

aditi-pandit commented Mar 21, 2024

Thanks @JkSelf for incorporating previous suggestions. Sorry for the delay in my review. I got swamped with many things the last 2 days

Agree with others about having a design doc. Putting a doc together will give an opportunity to think about this more clearly from first principles.

Had a suggestion for you about this PR as well:
i) Split this PR into multiple parts. The first would add the streaming metadata to the function. As @mbasmanova pointed out, even aggregate functions are streaming for the default window frame, so it would be better to add an API to WindowFunction that reports whether the function implementation is streaming. The parameters of this method would include the window frame.
ii) The second would enhance WindowPartition with partial data. Ensure this part works with both Streaming and SortWindowBuild.
iii) I feel that a separate RankLikeWindowBuild is not needed. We could enhance StreamingWindowBuild to accommodate the changes directly. Maybe give that a thought as well.

@JkSelf
Collaborator Author

JkSelf commented Mar 22, 2024

@mbasmanova @aditi-pandit @rui-mo Thanks for your review and suggestions.

I have written a design document about streaming processing for rank and row_number here. Please correct me if anything is wrong. Thanks.

> Had a suggestion for you about this PR as well:
> i) Split this PR into multiple parts. The first would add the streaming metadata to the function. As @mbasmanova pointed out, even aggregate functions are streaming for the default window frame, so it would be better to add an API to WindowFunction that reports whether the function implementation is streaming. The parameters of this method would include the window frame.

Sure. I will split this into three PRs: one to add the streaming metadata to WindowFunction, another to handle streaming processing for rank and row_number, and a last one to handle streaming processing for aggregate window functions with default window frames.

> ii) The second would enhance WindowPartition with partial data. Ensure this part works with both Streaming and SortWindowBuild.

The patch already supports WindowPartition with partial input, and it passes the window tests with both StreamingWindowBuild and SortWindowBuild. The failing unit tests are not related to this PR.

> iii) I feel that a separate RankLikeWindowBuild is not needed. We could enhance StreamingWindowBuild to accommodate the changes directly. Maybe give that a thought as well.

It is feasible to fold the current RankLikeWindowBuild into StreamingWindowBuild, but that would increase the complexity of the StreamingWindowBuild code. Personally, I feel that keeping them separate would be clearer.

/// and row_number functions. RankWindowBuild adopts a streaming method to
/// construct WindowPartition, which can reduce the occurrence of Out Of Memory
/// (OOM).
class RankLikeWindowBuild : public WindowBuild {
Contributor

RankLike name doesn't work very well. dense_rank function is like rank, but it doesn't support this specific functionality. On the other hand, sum is not like rank, but it does support this functionality. Let's come up with a name that better reflects the functionality.

Collaborator Author

@JkSelf JkSelf Mar 25, 2024

@mbasmanova Yes. How about RowLevelStreamingWindowBuild and also changing the existing StreamingWindowBuild to PartitionLevelStreamingWindowBuild?

Contributor

@xiaoxmeng xiaoxmeng left a comment

@JkSelf LGTM. Thanks for the iterations!

@facebook-github-bot
Contributor

@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@JkSelf JkSelf force-pushed the rank-optimization branch 2 times, most recently from d338045 to df8f3e3 Compare August 20, 2024 12:23
Contributor

@xiaoxmeng xiaoxmeng left a comment

@JkSelf few more comments. thanks!

@facebook-github-bot
Contributor

@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@kagamiori
Contributor

Hi @JkSelf, thank you for adding RowsStreamingWindowBuild. Could you also run the window fuzzer that compares Velox results against Presto for 2+ hours? To run that, you need to first start a Presto server following steps 1--10 here (but change the version 0.284 to 0.288): #8111. Then, you can start velox_window_fuzzer_test with command-line arguments --enable_window_reference_verification --presto_url=http://127.0.0.1:8080 --logtostderr=1 --minloglevel=0 --duration_sec=7200 .

Please let me know if you have any questions with running the fuzzer test. Thanks!

@JkSelf
Collaborator Author

JkSelf commented Aug 22, 2024

> Hi @JkSelf, thank you for adding RowsStreamingWindowBuild. Could you also run the window fuzzer that compares Velox results against Presto for 2+ hours? To run that, you need to first start a Presto server following steps 1--10 here (but change the version 0.284 to 0.288): #8111. Then, you can start velox_window_fuzzer_test with command-line arguments --enable_window_reference_verification --presto_url=http://127.0.0.1:8080 --logtostderr=1 --minloglevel=0 --duration_sec=7200 .
>
> Please let me know if you have any questions with running the fuzzer test. Thanks!

@kagamiori Thanks for your review. I will try to execute the window fuzzer test locally following the steps you provided.

@JkSelf
Collaborator Author

JkSelf commented Aug 22, 2024

@kagamiori @xiaoxmeng

I encountered the following error during the execution process, but this error doesn't seem to be related to my PR. I tried running it without my PR, and the same error occurred.

I0822 23:45:06.386909 2118850 WindowFuzzer.cpp:277] ==============================> Done with iteration 228
I0822 23:45:06.387009 2118850 WindowFuzzer.cpp:217] ==============================> Started iteration 229 (seed: 2266604503)
I0822 23:45:06.413928 2118850 AggregationFuzzerBase.cpp:427] Executing query plan:
-- Window[1][partition by [p0, p1] order by [s0 ASC NULLS FIRST, s1 DESC NULLS FIRST, row_number ASC NULLS LAST] w0 := approx_percentile(ROW["c0"],ROW["c1"],ROW["c2"]) ROWS between k0 FOLLOWING and -8268856359831993385 FOLLOWING] -> c0:DOUBLE, c1:BIGINT, c2:DOUBLE, s0:ROW<f0:BOOLEAN,f1:INTERVAL DAY TO SECOND,f2:BOOLEAN,f3:VARCHAR,f4:INTEGER>, s1:SMALLINT, p0:ARRAY<DOUBLE>, p1:ARRAY<DOUBLE>, k0:INTEGER, row_number:BIGINT, w0:DOUBLE
  -- Values[0][1000 rows in 10 vectors] -> c0:DOUBLE, c1:BIGINT, c2:DOUBLE, s0:ROW<f0:BOOLEAN,f1:INTERVAL DAY TO SECOND,f2:BOOLEAN,f3:VARCHAR,f4:INTEGER>, s1:SMALLINT, p0:ARRAY<DOUBLE>, p1:ARRAY<DOUBLE>, k0:INTEGER, row_number:BIGINT
I0822 23:45:06.414655 2121801 Task.cpp:1920] Terminating task test_cursor 1170 with state Failed after running for 0ms
I0822 23:45:06.414741 2121801 Task.cpp:1155] All drivers (1) finished for task test_cursor 1170 after running for 0ms
E0822 23:45:06.414860 2121801 MemoryPool.cpp:448] [MEM] Memory leak (Used memory): Memory Pool[op.1.0.0.Window LEAF root[query.TaskCursorQuery_1169.1619] parent[node.1] MALLOC track-usage thread-safe]<unlimited max capacity unlimited capacity used 128B available 1023.88KB reservation [used 128B, reserved 1.00MB, min 0B] counters [allocs 9, frees 8, reserves 0, releases 1, collisions 0])>
E0822 23:45:06.414930 2121801 Exceptions.h:67] Line: /mnt/DP_disk3/jk/projects/fb-velox/velox/velox/common/memory/MemoryArbitrator.cpp:100, Function:removePool, Expression: pool->reservedBytes() == 0 (1048576 vs. 0), Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
  what():  Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: (1048576 vs. 0)
Retriable: False
Expression: pool->reservedBytes() == 0
Function: removePool
File: /mnt/DP_disk3/jk/projects/fb-velox/velox/velox/common/memory/MemoryArbitrator.cpp
Line: 100
Stack trace:
# 0  facebook::velox::VeloxException::VeloxException(char const*, unsigned long, char const*, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, bool, facebook::velox::VeloxException::Type, std::basic_string_view<char, std::char_traits<char> >)
# 1  void facebook::velox::detail::veloxCheckFail<facebook::velox::VeloxRuntimeError, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&>(facebook::velox::detail::VeloxCheckFailArgs const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)
# 2  facebook::velox::memory::(anonymous namespace)::NoopArbitrator::removePool(facebook::velox::memory::MemoryPool*)
# 3  facebook::velox::memory::MemoryManager::dropPool(facebook::velox::memory::MemoryPool*)
# 4  facebook::velox::memory::MemoryPoolImpl::~MemoryPoolImpl()
# 5  std::_Sp_counted_ptr<facebook::velox::core::QueryCtx*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
# 6  std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() [clone .part.0]
# 7  facebook::velox::exec::Task::~Task()
# 8  std::_Sp_counted_ptr<facebook::velox::exec::Task*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
# 9  std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() [clone .part.0]
# 10 std::_Sp_counted_ptr<facebook::velox::exec::Driver*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
# 11 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() [clone .part.0]
# 12 unsigned long folly::detail::function::DispatchSmall::exec<facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::Driver>)::{lambda()#1}>(folly::detail::function::Op, folly::detail::function::Data*, folly::detail::function::Data)
# 13 folly::ThreadPoolExecutor::runTask(std::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&)
# 14 folly::CPUThreadPoolExecutor::threadRun(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)
# 15 void folly::detail::function::call_<std::_Bind<void (folly::ThreadPoolExecutor::*(folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)>, true, false, void>(, folly::detail::function::Data&)
# 16 0x00000000000d6df3
# 17 start_thread
# 18 clone

*** Aborted at 1724341506 (Unix time, try 'date -d @1724341506') ***
*** Signal 6 (SIGABRT) (0x2054c2) received by PID 2118850 (pthread TID 0x7f7e077fe700) (linux TID 2121801) (maybe from PID 2118850, UID 0) (code: -6), stack trace: ***
    @ 0000000007449637 folly::symbolizer::(anonymous namespace)::signalHandler(int, siginfo_t*, void*)
                       /mnt/DP_disk3/jk/projects/gluten/ep/build-velox/build/velox_ep/folly/folly/experimental/symbolizer/SignalHandler.cpp:453
    @ 000000000001441f (unknown)
    @ 000000000004300b gsignal
    @ 0000000000022858 abort
    @ 000000000009e8d0 (unknown)
    @ 00000000000aa37b (unknown)
    @ 00000000000a9358 (unknown)
    @ 00000000000a9d10 __gxx_personality_v0
    @ 0000000000003f88 __libunwind_Unwind_Resume
    @ 00000000023e7d28 facebook::velox::memory::MemoryManager::dropPool(facebook::velox::memory::MemoryPool*) [clone .cold]
    @ 0000000007395017 facebook::velox::memory::MemoryPoolImpl::~MemoryPoolImpl()
    @ 00000000071d6a98 std::_Sp_counted_ptr<facebook::velox::core::QueryCtx*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
    @ 00000000024332d9 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() [clone .part.0]
    @ 0000000006c75269 facebook::velox::exec::Task::~Task()
    @ 0000000006c92975 std::_Sp_counted_ptr<facebook::velox::exec::Task*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
    @ 00000000024332d9 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() [clone .part.0]
    @ 0000000006d0fdd3 std::_Sp_counted_ptr<facebook::velox::exec::Driver*, (__gnu_cxx::_Lock_policy)2>::_M_dispose()
    @ 00000000024332d9 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() [clone .part.0]
    @ 0000000006ce647f unsigned long folly::detail::function::DispatchSmall::exec<facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::Driver>)::{lambda()#1}>(folly::detail::function::Op, folly::detail::function::Data*, folly::detail::function::Data)
    @ 000000000743f420 folly::ThreadPoolExecutor::runTask(std::shared_ptr<folly::ThreadPoolExecutor::Thread> const&, folly::ThreadPoolExecutor::Task&&)
                       /mnt/DP_disk3/jk/projects/gluten/ep/build-velox/build/velox_ep/folly/folly/Function.h:639
                       -> /mnt/DP_disk3/jk/projects/gluten/ep/build-velox/build/velox_ep/folly/folly/executors/ThreadPoolExecutor.cpp
    @ 0000000007427d25 folly::CPUThreadPoolExecutor::threadRun(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)
                       /mnt/DP_disk3/jk/projects/gluten/ep/build-velox/build/velox_ep/folly/folly/executors/CPUThreadPoolExecutor.cpp:350
    @ 0000000007445d2d void folly::detail::function::call_<std::_Bind<void (folly::ThreadPoolExecutor::*(folly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::shared_ptr<folly::ThreadPoolExecutor::Thread>)>, true, false, void>(, folly::detail::function::Data&)
                       /usr/include/c++/9/bits/invoke.h:73
                       -> /mnt/DP_disk3/jk/projects/gluten/ep/build-velox/build/velox_ep/folly/folly/executors/ThreadPoolExecutor.cpp
    @ 00000000000d6df3 (unknown)
    @ 0000000000008608 start_thread
    @ 000000000011f352 clone
Aborted (core dumped)

@xiaoxmeng
Contributor

> enable_window_reference_verification

This seems to be a memory leak somewhere, and I didn't see it in the Meta internal run against DuckDB. I'm not sure whether it relates to Presto verification. I will try with enableWindowVerification set with DuckDB.

@JkSelf
Collaborator Author

JkSelf commented Aug 23, 2024

> enable_window_reference_verification
>
> This seems to be a memory leak somewhere, and I didn't see it in the Meta internal run against DuckDB. I'm not sure whether it relates to Presto verification. I will try with enableWindowVerification set with DuckDB.

@xiaoxmeng @kagamiori
I have discovered that the memory leak occurs during the creation of the frame.

Detected total of 1 leaked allocations:
======== Leaked memory from 1 total allocations of 96B total size ========
# 0  facebook::velox::memory::MemoryPoolImpl::recordAllocDbg(void const*, unsigned
# 1  facebook::velox::memory::MemoryPoolImpl::allocate(long)
# 2  boost::intrusive_ptr<facebook::velox::Buffer> facebook::velox::AlignedBuffer::e<int>(unsigned long, facebook::velox::memory::MemoryPool*, std::optional<int> cons
# 3  std::shared_ptr<facebook::velox::BaseVector> facebook::velox::createEmpty<(facvelox::TypeKind)3>(int, facebook::velox::memory::MemoryPool*, std::shared_ptr<faceblox::Type const> const&)
# 4  facebook::velox::BaseVector::createInternal(std::shared_ptr<facebook::velox::Tst> const&, int, facebook::velox::memory::MemoryPool*)::{lambda()#1}::operator()() {lambda()#1}::operator()() const
# 5  facebook::velox::BaseVector::createInternal(std::shared_ptr<facebook::velox::Tst> const&, int, facebook::velox::memory::MemoryPool*)::{lambda()#1}::operator()()
# 6  facebook::velox::BaseVector::createInternal(std::shared_ptr<facebook::velox::Tst> const&, int, facebook::velox::memory::MemoryPool*)
# 7  std::shared_ptr<facebook::velox::BaseVector> facebook::velox::BaseVector::creabook::velox::BaseVector>(std::shared_ptr<facebook::velox::Type const> const&, int, k::velox::memory::MemoryPool*)
# 8  facebook::velox::exec::Window::createWindowFrame(std::shared_ptr<facebook::vele::WindowNode const> const&, facebook::velox::core::WindowNode::Frame const&, std::ptr<facebook::velox::RowType const> const&)::{lambda(std::shared_ptr<facebook::velo::ITypedExpr const> const&)#1}::operator()(std::shared_ptr<facebook::velox::core::Ipr const> const&) const
# 9  facebook::velox::exec::Window::createWindowFrame(std::shared_ptr<facebook::vele::WindowNode const> const&, facebook::velox::core::WindowNode::Frame const&, std::ptr<facebook::velox::RowType const> const&)
# 10 facebook::velox::exec::Window::createWindowFunctions()
# 11 facebook::velox::exec::Window::initialize()
# 12 facebook::velox::exec::Driver::initializeOperators()
# 13 facebook::velox::exec::Driver::runInternal(std::shared_ptr<facebook::velox::exver>&, std::shared_ptr<facebook::velox::exec::BlockingState>&, std::shared_ptr<faceelox::RowVector>&)
# 14 facebook::velox::exec::Driver::run(std::shared_ptr<facebook::velox::exec::Driv
# 15 facebook::velox::exec::Driver::enqueue(std::shared_ptr<facebook::velox::exec::)::{lambda()#1}::operator()() const
# 16 void folly::detail::function::call_<facebook::velox::exec::Driver::enqueue(stdd_ptr<facebook::velox::exec::Driver>)::{lambda()#1}, true, false, void>(, folly::deunction::Data&)
# 17 folly::ThreadPoolExecutor::runTask(std::shared_ptr<folly::ThreadPoolExecutor:: const&, folly::ThreadPoolExecutor::Task&&)
# 18 folly::CPUThreadPoolExecutor::threadRun(std::shared_ptr<folly::ThreadPoolExecuread>)
# 19 void folly::detail::function::call_<std::_Bind<void (folly::ThreadPoolExecutorly::ThreadPoolExecutor*, std::shared_ptr<folly::ThreadPoolExecutor::Thread>))(std::ptr<folly::ThreadPoolExecutor::Thread>)>, true, false, void>(, folly::detail::functta&)
# 20 folly::detail::function::FunctionTraits<void ()>::operator()()
# 21 folly::NamedThreadFactory::newThread(folly::Function<void ()>&&)::{lambda()#1}tor()()
# 22 void std::__invoke_impl<void, folly::NamedThreadFactory::newThread(folly::Funcid ()>&&)::{lambda()#1}>(std::__invoke_other, folly::NamedThreadFactory::newThread(Function<void ()>&&)::{lambda()#1}&&)
# 23 std::__invoke_result<folly::NamedThreadFactory::newThread(folly::Function<void::{lambda()#1}>::type std::__invoke<folly::NamedThreadFactory::newThread(folly::Funoid ()>&&)::{lambda()#1}>(std::__invoke_result&&, (folly::NamedThreadFactory::newThlly::Function<void ()>&&)::{lambda()#1}&&)...)
# 24 void std::thread::_Invoker<std::tuple<folly::NamedThreadFactory::newThread(folction<void ()>&&)::{lambda()#1}> >::_M_invoke<0ul>(std::_Index_tuple<0ul>)
# 25 std::thread::_Invoker<std::tuple<folly::NamedThreadFactory::newThread(folly::F<void ()>&&)::{lambda()#1}> >::operator()()
# 26 std::thread::_State_impl<std::thread::_Invoker<std::tuple<folly::NamedThreadFanewThread(folly::Function<void ()>&&)::{lambda()#1}> > >::_M_run()
# 27 0x00000000000d6df3
# 28 start_thread
# 29 clone

By that point the frame start has already been created and its buffer allocated. When the frame end is then created, its offset is negative (-8268856359831993385 in the failing plan above), which triggers an error. The buffer allocated for the frame start is not released before the error propagates, which produces the reported memory leak. The issue can be fixed by converting the negative offset to a positive one in WindowFuzzer.cpp.
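
One possible shape for that fuzzer-side conversion is sketched below. The helper name `toNonNegativeOffset` is hypothetical and this is not the change that was actually made in WindowFuzzer.cpp. It only illustrates mapping a randomly generated constant to a non-negative frame offset while avoiding the signed overflow that negating the minimum `int64_t` value would cause.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical helper: maps an arbitrary generated value to a non-negative
// frame offset. Not taken from WindowFuzzer.cpp.
int64_t toNonNegativeOffset(int64_t value) {
  if (value >= 0) {
    return value;
  }
  // Negating the minimum value overflows, so clamp that single case.
  if (value == std::numeric_limits<int64_t>::min()) {
    return std::numeric_limits<int64_t>::max();
  }
  return -value;
}
```

Applied to the offset from the failing plan, this would yield 8268856359831993385, so frame creation no longer reaches the negative-offset check.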

@facebook-github-bot
Contributor

@xiaoxmeng merged this pull request in d33cdb2.


Conbench analyzed the 1 benchmark run on commit d33cdb25.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

ccat3z pushed a commit to ccat3z/velox that referenced this pull request Sep 18, 2024
…kincubator#9025)

Summary:
Unlike `StreamingWindowBuild`, `RowLevelStreamingWindowBuild` in this PR is capable of processing window functions as rows arrive within a single partition, without the need to wait for the entire partition to be ready. This approach can significantly reduce memory usage, especially when a single partition contains a large amount of data. It is particularly suited for optimizing `rank` and `row_number` functions, as well as aggregate window functions with a default frame.

The detailed discussion is [here](facebookincubator#8975). The design doc is [here](https://docs.google.com/document/d/17ONSJHK8XP5Lixm8XBl01RMNl4ntpixiVFe693ahw6k/edit?usp=sharing).

Pull Request resolved: facebookincubator#9025

Test Plan: Run through 10hrs fuzzer testing

Reviewed By: kagamiori

Differential Revision: D61473798

Pulled By: xiaoxmeng

fbshipit-source-id: 569a752770395330c48a3521bd5421eb89f5623d
weiting-chen pushed a commit to oap-project/velox that referenced this pull request Sep 20, 2024
 to fix the result mismatch in RowsStreamingWindowBuild (#499)

* Revert "Add RowsStreamingWindowBuild to avoid OOM in Window operator (9025)"

This reverts commit f34c9b1.

* Add RowsStreamingWindowBuild to avoid OOM in Window operator (facebookincubator#9025)

Summary:
Unlike `StreamingWindowBuild`, `RowLevelStreamingWindowBuild` in this PR is capable of processing window functions as rows arrive within a single partition, without the need to wait for the entire partition to be ready. This approach can significantly reduce memory usage, especially when a single partition contains a large amount of data. It is particularly suited for optimizing `rank` and `row_number` functions, as well as aggregate window functions with a default frame.

The detailed discussion is [here](facebookincubator#8975). The design doc is [here](https://docs.google.com/document/d/17ONSJHK8XP5Lixm8XBl01RMNl4ntpixiVFe693ahw6k/edit?usp=sharing).

Pull Request resolved: facebookincubator#9025

Test Plan: Run through 10hrs fuzzer testing

Reviewed By: kagamiori

Differential Revision: D61473798

Pulled By: xiaoxmeng

fbshipit-source-id: 569a752770395330c48a3521bd5421eb89f5623d

* Fix error message

* Fix the result mismatch in RowsStreamingWindowBuild (facebookincubator#10979)

Summary:
For a Range frame, it is necessary to ensure that the peer is ready before commencing the window function computation

Pull Request resolved: facebookincubator#10979

Reviewed By: kagamiori

Differential Revision: D62622816

Pulled By: xiaoxmeng

fbshipit-source-id: 1a9911da416c867c9e295242a05d0f33fbc2e22d

---------

Co-authored-by: Jia Ke <[email protected]>
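
The Range-frame follow-up referenced above (facebookincubator#10979) says that a row's peers must be ready before the window function is computed. A minimal sketch of that readiness check follows. It is an illustration under simplifying assumptions rather than the actual fix: `numReadyRows` is a hypothetical name, and the ORDER BY key is reduced to a single `int64_t` per buffered row.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns how many leading buffered rows can be processed safely under a
// RANGE frame. A row is ready only once its whole peer group (rows with an
// equal ORDER BY key) has arrived, i.e. a later row with a different key is
// buffered or the partition is complete.
std::size_t numReadyRows(
    const std::vector<int64_t>& orderKeys,
    bool partitionComplete) {
  if (partitionComplete || orderKeys.empty()) {
    return orderKeys.size();
  }
  // The trailing peer group may still grow with the next batch, so hold it
  // back until a different key or the end of the partition is seen.
  std::size_t ready = orderKeys.size();
  const int64_t lastKey = orderKeys.back();
  while (ready > 0 && orderKeys[ready - 1] == lastKey) {
    --ready;
  }
  return ready;
}
```

For buffered keys 1, 1, 2, 2 in an incomplete partition, only the first two rows are ready, because more rows with key 2 may still arrive in the next batch.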