GH-37796: [C++][Acero] Fix race condition caused by straggling input in the as-of-join node #37839
Conversation
// It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
// Since acero's executor won't terminate if any one node is paused, we need to
// force resume the node before stopping production.
backpressure_control_->Resume();
Perhaps one thing to clarify is whether `ResumeInput` behaves idempotently? I.e., is it OK to always call resume, even though only some inputs hit this `PauseInput` race condition?

My perusal of `source_node.cc` tells me this is OK, but LMK if this is a poor assumption to make.
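For illustration, here is a minimal, self-contained sketch (invented names, not Arrow's actual source-node implementation) of why an extra resume is harmless when the pause state is tracked as a simple flag: resuming an input that is not paused is a no-op.

```cpp
#include <atomic>
#include <cstdio>

// Hypothetical backpressure flag, loosely modeled on how a source might track
// pause/resume requests. Not Arrow's actual API.
class ToyBackpressure {
 public:
  void Pause() { paused_.store(true); }
  // Resume is idempotent: calling it when not paused simply leaves the
  // source in the running state.
  void Resume() { paused_.store(false); }
  bool paused() const { return paused_.load(); }

 private:
  std::atomic<bool> paused_{false};
};

int main() {
  ToyBackpressure bp;
  bp.Resume();   // never paused: harmless no-op
  bp.Pause();
  bp.Resume();   // normal pairing
  bp.Resume();   // duplicate resume: still fine
  std::printf("paused=%d\n", bp.paused() ? 1 : 0);
  return 0;
}
```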
@@ -19,6 +19,7 @@
#include <atomic>
#include <condition_variable>
#include <iostream>
Oops... Will remove
This seems like a good idea to me.
@icexelloss @rtpsw do either of you want to take a look?
src_decls.emplace_back("source",
                       SourceNodeOptions(config.schema, GetGen(config.batches)));
if (config.is_delayed) {
I assume this new option triggers the deadlock on the unfixed code?
Correct
// Since acero's executor won't terminate if any one node is paused, we need to
// force resume the node before stopping production.
backpressure_control_->Resume();
return input_->StopProducing();
So if I understand correctly, this means we will call `StopProducing` on all right hand side nodes once:
- The left hand side has finished
- The right hand side has caught up

If so, then I agree this is a valid thing to do.
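To pin down what "finished and caught up" means here, a tiny illustrative sketch with invented state structs (not the node's real internals):

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// Invented, simplified state; the real asof-join node tracks much more.
struct LeftState {
  bool finished = false;   // left input has delivered all of its batches
  int64_t last_time = 0;   // timestamp of the last left-hand row
};

struct RightState {
  int64_t front_time = 0;  // timestamp this right input has advanced to
};

// Once the left side is done and every right side has advanced to (or past)
// the last left timestamp, no further right-hand batch can affect the output,
// so it is safe to stop the right-hand producers.
bool CanStopRightHandSides(const LeftState& left,
                           const std::vector<RightState>& rights) {
  if (!left.finished) return false;
  for (const auto& r : rights) {
    if (r.front_time < left.last_time) return false;  // not caught up yet
  }
  return true;
}

int main() {
  LeftState left{true, 100};
  std::vector<RightState> rights{{100}, {150}};
  std::printf("can stop: %d\n", CanStopRightHandSides(left, rights) ? 1 : 0);
  return 0;
}
```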
Yep.

As an aside, I feel like a more invasive change could fix this issue in the general case. If a node (in this example the asof join) has:
- called `output_->InputFinished()`, AND
- called `output_->InputReceived()` for however many record batches it advertised in `InputFinished`,

then we should be able to shut down execution, even if the node's inputs:
- are paused, or
- are not done streaming, or
- haven't called `InputFinished`.

But I think this is a more invasive change to `exec_plan.h` and might have some hairy issues that I'm not thinking of.
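Sketching the invariant described above with invented bookkeeping (this is not the `exec_plan.h` API): once a node has both announced its total batch count and delivered that many batches downstream, the plan could treat it as complete regardless of the state of its inputs.

```cpp
#include <cstdint>
#include <cstdio>
#include <optional>

// Invented bookkeeping, not Arrow's ExecNode interface.
struct NodeOutputProgress {
  std::optional<int64_t> advertised_total;  // set once InputFinished was sent
  int64_t batches_delivered = 0;            // InputReceived calls made downstream
};

// A node whose advertised output has been fully delivered cannot produce
// anything that still matters, so the plan could consider it complete even if
// its own inputs are paused, still streaming, or haven't called InputFinished.
bool OutputFullyDelivered(const NodeOutputProgress& p) {
  return p.advertised_total.has_value() &&
         p.batches_delivered >= *p.advertised_total;
}

int main() {
  NodeOutputProgress progress{/*advertised_total=*/3, /*batches_delivered=*/3};
  std::printf("fully delivered: %d\n", OutputFullyDelivered(progress) ? 1 : 0);
  return 0;
}
```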
This looks reasonable to me. Feel free to merge.
// InputReceived may be called after execution was finished. Pushing it to the
// InputState may cause the BackPressureController to pause the input, causing a
// deadlock
Suggested change:
-// InputReceived may be called after execution was finished. Pushing it to the
-// InputState may cause the BackPressureController to pause the input, causing a
-// deadlock
+// InputReceived may be called after execution was finished. Pushing it to the
+// InputState is unnecessary since we're done (and anyway may cause the
+// BackPressureController to pause the input, causing a deadlock), so drop it.
Do we still deadlock with this short circuit but without ForceShutdown etc?
Yes, the `ForceShutdown` is still necessary. There's nothing stopping this order of events:
- We receive enough data to finish the as-of join.
- Right before we finish processing and shut down the worker thread, lots of unneeded batches come in from input A. Input A pauses.
- We shut down the thread, and input A can't be unpaused.

Put another way, `ForceShutdown` keeps us from deadlocking when we ingest unneeded data before the worker thread exits, and this block keeps us from deadlocking when we ingest unneeded data after the worker thread exits.

But your comment change suggestions sound good to me.
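To restate the two halves of the fix, here is a toy, self-contained sketch (invented names and threshold, not the literal patch) showing how the short circuit and the forced resume cover the after-exit and before-exit cases respectively:

```cpp
#include <atomic>
#include <cstdio>

// Toy stand-ins, not Arrow classes; this only demonstrates the two guards
// described above working together.
struct ToyBackpressure {
  std::atomic<bool> paused{false};
  void Pause() { paused.store(true); }
  void Resume() { paused.store(false); }
};

struct ToyAsofJoin {
  std::atomic<bool> process_thread_finished{false};
  ToyBackpressure backpressure;
  int queued = 0;
  static constexpr int kHighThreshold = 4;

  // Guard 1: once processing has finished, late batches are dropped before
  // they can grow the queue and trigger a pause that nobody will undo.
  void InputReceived() {
    if (process_thread_finished.load()) return;  // short circuit
    if (++queued >= kHighThreshold) backpressure.Pause();
  }

  // Guard 2: on shutdown, force the input back into the resumed state so a
  // pause issued just before the exit cannot deadlock the plan.
  void ForceShutdown() {
    process_thread_finished.store(true);
    backpressure.Resume();
  }
};

int main() {
  ToyAsofJoin node;
  for (int i = 0; i < 5; ++i) node.InputReceived();  // straggler pauses input
  node.ForceShutdown();                              // ...but shutdown resumes it
  node.InputReceived();                              // late batch is dropped
  std::printf("paused=%d queued=%d\n",
              node.backpressure.paused.load() ? 1 : 0, node.queued);
  return 0;
}
```

Running it, the straggler's pause is undone by the shutdown and the late batch is dropped, so nothing is left waiting on a resume that will never come.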
Clarifying comment for @bkietz added. Ready for more thoughts.
Forgive the ignorance - this is my first time making a PR on Arrow. There's no further action needed from me to merge, correct?
Could you rebase to pick up the fix #37867? I think CI should be green after that.
Co-authored-by: Benjamin Kietzman <[email protected]>
Sadly there are some seemingly unrelated failures:
CI failures seem unrelated. I'll merge. Thanks for working on this!
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit e3d6b9b. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 1 possible false positive for unstable benchmarks that are known to sometimes produce them.
Closes: #37796
Rationale for this change

What changes are included in this PR?

While asofjoining some large parquet datasets with many row groups, I ran into a deadlock that I described here: #37796. Copy pasting below for convenience:

1. The left hand side of the asofjoin completes and is matched with the right hand tables, so `InputFinished` proceeds as expected. So far so good
2. The right hand table(s) of the join are a huge dataset scan. They're still streaming and can legally still call `AsofJoinNode::InputReceived` all they want (doc ref)
3. Each input batch is blindly pushed to the `InputState`s, which in turn defer to `BackpressureHandler`s to decide whether to pause inputs. (code pointer)
4. If enough batches come in right after `EndFromProcessThread` is called, then we might exceed the high_threshold and tell the input node to pause via the BackpressureController
5. At this point, the process thread has stopped for the asofjoiner, so the right hand table(s) won't be dequeued, meaning `BackpressureController::Resume()` will never be called. This causes a deadlock

TLDR: this is caused by a straggling input node being paused due to backpressure after the process thread has ended. And since every `PauseInput` needs a corresponding `ResumeInput` to exit gracefully, we deadlock.

Turns out this is fairly easy to reproduce with small tables, if you make a slow input node composed of 1-row record batches with a synthetic delay.

My solution is to:

1. Create a `ForceShutdown` hook that puts the input nodes in a resumed state, and for good measure we call `StopProducing`
2. Also for good measure, if batches come in after the process thread exits, we short circuit and return OK. This is because `InputReceived` can be called an arbitrary number of times after `StopProducing`, so it makes sense to not enqueue useless batches.

Are these changes tested?

Yes, I added a delay to the batches of one of the already-existing asofjoin backpressure tests. Checking out `main`, we get a timeout failure. With my changes, it passes.

I considered a more deterministic test, but I struggled to create callbacks in a way that wasn't invasive to the Asof implementation. The idea of using delays was inspired by things I saw in `source_node_test.cc`

Are there any user-facing changes?

No

Closes: #37796
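For readers curious how such a straggler can be simulated, here is a rough, self-contained sketch of the "slow source of 1-row batches" idea (invented class, not the actual test helper; the real test adds an is_delayed option to its source configuration, per the diff quoted earlier):

```cpp
#include <chrono>
#include <cstdio>
#include <thread>

// Invented stand-in: a "batch" is just an int index here. The idea mirrors
// the test change: a source that emits many tiny batches, each after a small
// synthetic delay, so the join can finish (and stop its process thread) while
// this producer is still straggling in.
class DelayedToySource {
 public:
  DelayedToySource(int num_batches, std::chrono::milliseconds delay)
      : num_batches_(num_batches), delay_(delay) {}

  // Returns the next batch index, or -1 once exhausted.
  int Next() {
    if (produced_ >= num_batches_) return -1;
    std::this_thread::sleep_for(delay_);  // the synthetic straggler delay
    return produced_++;
  }

 private:
  int num_batches_;
  std::chrono::milliseconds delay_;
  int produced_ = 0;
};

int main() {
  DelayedToySource source(/*num_batches=*/5, std::chrono::milliseconds(10));
  for (int b = source.Next(); b != -1; b = source.Next()) {
    std::printf("straggling batch %d\n", b);
  }
  return 0;
}
```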