
[C++][Acero] Race condition in asof join causes execution to stall for large number of record batches #37796

Closed
JerAguilon opened this issue Sep 20, 2023 · 3 comments
Labels: Component: C++ · Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data) · Type: bug
Milestone: 15.0.0

JerAguilon (Contributor) commented Sep 20, 2023

Describe the bug, including details regarding any error messages, version, and platform.

  • Version: Repro'd on HEAD, v12.0.0, and v13.0.0

I've encountered a subtle race condition in the asof join node that is particularly common for large parquet files with many row groups:

  1. The left-hand side of the asofjoin completes, so InputFinished proceeds as expected. So far so good.
  2. The right-hand table(s) of the join are a huge dataset scan. They're still streaming and can legally keep calling AsofJoinNode::InputReceived (doc ref).
  3. Each input batch is blindly pushed to the InputStates, which in turn defer to BackpressureHandlers to decide whether to pause inputs. (code pointer)
  4. If enough batches come in right after EndFromProcessThread is called, we might exceed the high_threshold and tell the input node to pause via the BackpressureController.
  5. At this point, the process thread for the asofjoiner has stopped, so the right-hand table(s) won't be dequeued, meaning BackpressureController::Resume() will never be called. This causes a deadlock (a simplified sketch of this pause/resume bookkeeping follows the list).
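To make the failure mode concrete, here is a minimal sketch of the watermark-style pause/resume bookkeeping described in steps 3–5. This is not Arrow's actual BackpressureController/BackpressureHandler code; the class and callback names are illustrative only. The key property is that a pause triggered on enqueue is only ever balanced by a resume triggered on dequeue, so once the dequeuing (process) thread is gone, a paused producer stays paused forever.

```cpp
// Illustrative sketch only (hypothetical names, not Arrow's implementation):
// a queue-depth watermark controller that pauses the producer above
// high_threshold and resumes it below low_threshold.
#include <cstddef>
#include <functional>
#include <utility>

class WatermarkBackpressure {
 public:
  WatermarkBackpressure(std::function<void()> pause_producer,
                        std::function<void()> resume_producer,
                        std::size_t low_threshold, std::size_t high_threshold)
      : pause_(std::move(pause_producer)),
        resume_(std::move(resume_producer)),
        low_(low_threshold),
        high_(high_threshold) {}

  // Called when InputReceived enqueues a batch.
  void OnEnqueue(std::size_t queued_batches) {
    if (!paused_ && queued_batches >= high_) {
      paused_ = true;
      pause_();  // tell the upstream node to stop producing
    }
  }

  // Called by the process thread when it dequeues a batch. If the process
  // thread has already exited, this never runs and the producer is never
  // resumed -- the deadlock described above.
  void OnDequeue(std::size_t queued_batches) {
    if (paused_ && queued_batches <= low_) {
      paused_ = false;
      resume_();
    }
  }

 private:
  std::function<void()> pause_;
  std::function<void()> resume_;
  std::size_t low_;
  std::size_t high_;
  bool paused_ = false;
};
```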

I have hackily fixed this in a local checkout by storing an atomic<bool> recording whether EndFromProcessThread was called. Once it is set, InputReceived short-circuits and returns Status::OK() without enqueueing the batch. Also, in EndFromProcessThread, I call ResumeProducing for all input nodes.

For good measure, I also call StopProducing() on all the inputs in EndFromProcessThread... though I don't know if it's necessary.
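A rough sketch of that local workaround, with hypothetical names and structure (this is not the actual AsofJoinNode code): the flag is set when the process thread ends, late InputReceived calls then return OK without enqueueing, and every input is resumed (and, for good measure, stopped) so no producer is left paused.

```cpp
// Sketch of the workaround described above. Names and signatures are
// hypothetical; the real AsofJoinNode differs.
#include <atomic>
#include <vector>

struct InputHandle {
  void Enqueue(/* ExecBatch batch */) { /* push onto this input's queue */ }
  void ResumeProducing() { /* BackpressureController::Resume() */ }
  void StopProducing()   { /* ask the upstream node to stop */ }
};

class AsofJoinSketch {
 public:
  // May keep being called by still-streaming inputs after the
  // process thread has finished.
  void InputReceived(InputHandle& input /*, ExecBatch batch */) {
    if (process_thread_done_.load()) {
      return;  // short-circuit: Status::OK() in the real node, batch dropped
    }
    input.Enqueue(/* std::move(batch) */);
  }

  // Called once when the process thread ends.
  void EndFromProcessThread() {
    process_thread_done_.store(true);
    for (InputHandle& input : inputs_) {
      input.ResumeProducing();  // balance any outstanding pause
      input.StopProducing();    // possibly unnecessary, but harmless
    }
  }

 private:
  std::atomic<bool> process_thread_done_{false};
  std::vector<InputHandle> inputs_;
};
```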

Happy to submit a PR once I find bandwidth, but reporting this early in case others run into it.

Component(s)

C++

JerAguilon commented Sep 20, 2023

Made a repro.

I patched #34234 to be able to build a simple example in Python, but mechanically the bug exists in C++ too.

https://gist.github.com/JerAguilon/5a6a80411fd53dad9d9d547003bec12e

Here we do 10 concurrent simple asof joins (500 rows on the left-hand side, 5k rows on the right-hand side) and union them. 500 rows × 10 asof joins = 5,000 rows, so we expect 5k rows out.

The right-hand side is a Parquet file whose row groups each contain a single row.

If I place ARROW_LOG(ERROR) << "paused"; statements in BackpressureController::Pause() (here), I get several "paused" logs after the 5000th row is emitted from unioned.to_batches.

And the entire thing halts.

JerAguilon changed the title from "[C++][Acero] Race condition in asof join causes execution to stall for huge inputs" to "[C++][Acero] Race condition in asof join causes execution to stall for large number of record batches" on Sep 20, 2023
JerAguilon commented:
take

JerAguilon commented:
I have a fix for this, FYI; will try to make a PR soon.

bkietz closed this as completed in e3d6b9b on Oct 24, 2023
bkietz added this to the 15.0.0 milestone on Oct 24, 2023
JerAguilon added a commit to JerAguilon/arrow that referenced this issue Oct 25, 2023
…input in the as-of-join node (apache#37839)

While asofjoining some large parquet datasets with many row groups, I ran into a deadlock that I described here: apache#37796. Copy pasting below for convenience:

1. The left hand side of the asofjoin completes and is matched with the right hand tables, so `InputFinished` proceeds as [expected](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1323). So far so good
2. The right hand table(s) of the join are a huge dataset scan. They're still streaming and can legally still call `AsofJoinNode::InputReceived` all they want ([doc ref](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero8ExecNode13InputReceivedEP8ExecNode9ExecBatch))
3. Each input batch is blindly pushed to the `InputState`s, which in turn defer to `BackpressureHandler`s to decide whether to pause inputs. ([code pointer](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L1689))
4. If enough batches come in right after `EndFromProcessThread` is called, then we might exceed the [high_threshold](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L575) and tell the input node to pause via the [BackpressureController](https://github.com/apache/arrow/blob/2455bc07e09cd5341d1fabdb293afbd07682f0b2/cpp/src/arrow/acero/asof_join_node.cc#L540)
5. At this point, the process thread has stopped for the asofjoiner, so the right-hand table(s) won't be dequeued, meaning `BackpressureController::Resume()` will never be called. This causes a [deadlock](https://arrow.apache.org/docs/cpp/api/acero.html#_CPPv4N5arrow5acero19BackpressureControl5PauseEv).

TL;DR: this is caused by a straggling input node being paused due to backpressure _after_ the process thread has ended. Since every `PauseInput` needs a corresponding `ResumeInput` to exit gracefully, we deadlock.

Turns out this is fairly easy to reproduce with small tables, if you make a slow input node composed of 1-row record batches with a synthetic delay.

My solution is to:

1. Create a `ForceShutdown` hook that puts the input nodes in a resumed state, and for good measure we call `StopProducing`
2. Also for good measure, if batches arrive after the process thread exits, we short-circuit and return OK. This is because `InputReceived` can be called an arbitrary number of times after `StopProducing`, so it makes sense not to enqueue useless batches. (A rough sketch of this shutdown path follows the list.)
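A small, hypothetical sketch of what a `ForceShutdown`-style hook on the backpressure side could look like (illustrative names only, not the exact code merged for this issue): it unconditionally restores the resumed state and stops the producer, so an outstanding pause can never outlive the process thread.

```cpp
// Hypothetical sketch of a ForceShutdown-style hook on a backpressure
// handler; illustrative only.
#include <functional>
#include <utility>

class BackpressureHandlerSketch {
 public:
  BackpressureHandlerSketch(std::function<void()> pause_producer,
                            std::function<void()> resume_producer,
                            std::function<void()> stop_producer)
      : pause_(std::move(pause_producer)),
        resume_(std::move(resume_producer)),
        stop_(std::move(stop_producer)) {}

  void Pause() {
    if (!paused_) { paused_ = true; pause_(); }
  }

  void Resume() {
    if (paused_) { paused_ = false; resume_(); }
  }

  // Invoked when the asof-join process thread exits: any outstanding Pause
  // is balanced by a Resume, and the producer is stopped for good measure.
  void ForceShutdown() {
    Resume();
    stop_();
  }

 private:
  std::function<void()> pause_;
  std::function<void()> resume_;
  std::function<void()> stop_;
  bool paused_ = false;
};
```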

### Are these changes tested?

Yes, I added a delay to the batches of one of the already-existing asofjoin backpressure tests. Checking out `main`, we get a timeout failure; with my changes, it passes.

I considered a more deterministic test, but I struggled to create callbacks in a way that wasn't invasive to the asof implementation. The idea of using delays was inspired by things I saw in `source_node_test.cc`.

### Are there any user-facing changes?

No

* Closes: apache#37796

Lead-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
…input in the as-of-join node (apache#37839)

amoeba added the Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data) label on Jan 13, 2024
dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024
…input in the as-of-join node (apache#37839)
