From 8f4033476d92ee2d84c1e98ff762e04ff51d07b1 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Wed, 20 Sep 2023 17:05:05 -0400 Subject: [PATCH 01/10] Init fix to the asofjoin node --- cpp/src/arrow/acero/asof_join_node.cc | 41 +++++++++++++++++++--- cpp/src/arrow/acero/asof_join_node_test.cc | 4 ++- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 23c07b8acb95f..1201d20334cf1 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -549,15 +549,16 @@ class BackpressureController : public BackpressureControl { class BackpressureHandler { private: - BackpressureHandler(size_t low_threshold, size_t high_threshold, + BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold, std::unique_ptr backpressure_control) - : low_threshold_(low_threshold), + : input_(input), + low_threshold_(low_threshold), high_threshold_(high_threshold), backpressure_control_(std::move(backpressure_control)) {} public: static Result Make( - size_t low_threshold, size_t high_threshold, + ExecNode* input, size_t low_threshold, size_t high_threshold, std::unique_ptr backpressure_control) { if (low_threshold >= high_threshold) { return Status::Invalid("low threshold (", low_threshold, @@ -566,7 +567,7 @@ class BackpressureHandler { if (backpressure_control == NULLPTR) { return Status::Invalid("null backpressure control parameter"); } - BackpressureHandler backpressure_handler(low_threshold, high_threshold, + BackpressureHandler backpressure_handler(input, low_threshold, high_threshold, std::move(backpressure_control)); return std::move(backpressure_handler); } @@ -579,7 +580,16 @@ class BackpressureHandler { } } + Status ForceShutdown() { + // It may be unintuitive to call Resume() here, but this is to avoid a deadlock. + // Since acero's executor won't terminate if any one node is paused, we need to + // force resume the node before stopping production. + backpressure_control_->Resume(); + return input_->StopProducing(); + } + private: + ExecNode* input_; size_t low_threshold_; size_t high_threshold_; std::unique_ptr backpressure_control_; @@ -629,6 +639,8 @@ class BackpressureConcurrentQueue : public ConcurrentQueue { return ConcurrentQueue::TryPopUnlocked(); } + Status ForceShutdown() { return handler_.ForceShutdown(); } + private: BackpressureHandler handler_; }; @@ -673,7 +685,7 @@ class InputState { std::make_unique( /*node=*/asof_input, /*output=*/asof_node, backpressure_counter); ARROW_ASSIGN_OR_RAISE(auto handler, - BackpressureHandler::Make(low_threshold, high_threshold, + BackpressureHandler::Make(asof_input, low_threshold, high_threshold, std::move(backpressure_control))); return std::make_unique(index, tolerance, must_hash, may_rehash, key_hasher, asof_node, std::move(handler), schema, @@ -930,6 +942,12 @@ class InputState { total_batches_ = n; } + Status ForceShutdown() { + // Force the upstream input node to unpause. Necessary to avoid deadlock when we + // terminate the process thread + return queue_.ForceShutdown(); + } + private: // Pending record batches. The latest is the front. Batches cannot be empty. BackpressureConcurrentQueue> queue_; @@ -1323,6 +1341,12 @@ class AsofJoinNode : public ExecNode { if (st.ok()) { st = output_->InputFinished(this, batches_produced_); } + for (const auto& s : state_) { + auto shutdownResult = s->ForceShutdown(); + if (!shutdownResult.ok()) { + st = shutdownResult; + } + } })); } @@ -1679,6 +1703,13 @@ class AsofJoinNode : public ExecNode { const Ordering& ordering() const override { return ordering_; } Status InputReceived(ExecNode* input, ExecBatch batch) override { + // InputReceived may be called after execution was finished. Pushing it to the + // InputState may cause the BackPressureController to pause the input, causing a + // deadlock + if (process_task_.is_finished()) { + return Status::OK(); + } + // Get the input ARROW_DCHECK(std_has(inputs_, input)); size_t k = std_find(inputs_, input) - inputs_.begin(); diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 96c00e6a4bd59..44f20784cac58 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1439,6 +1439,8 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { [shift](int row) -> int64_t { return row * 10 + shift; }}, schema, num_batches, batch_size); }; + // WIP + ASSERT_OK(Status::Invalid("FOO")); ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0)); ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1)); ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); @@ -1529,7 +1531,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); + return TestBackpressure(MakeIntegerBatches, /*num_batches=*/100000, /*batch_size=*/1); } template From 6750a9fc5faf8830b7b9fe48fcbab4f487e4f817 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Fri, 22 Sep 2023 18:21:43 -0400 Subject: [PATCH 02/10] Start testing --- cpp/src/arrow/acero/asof_join_node.cc | 42 ++++++++++++++++------ cpp/src/arrow/acero/asof_join_node.h | 2 ++ cpp/src/arrow/acero/asof_join_node_test.cc | 42 +++++++++++++++++----- cpp/src/arrow/acero/options.h | 6 +++- cpp/src/arrow/acero/source_node.cc | 4 +++ cpp/src/arrow/acero/test_nodes.cc | 41 ++++++++++++++++++++- 6 files changed, 115 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 1201d20334cf1..868a8be6873ad 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -17,6 +17,7 @@ #include "arrow/acero/asof_join_node.h" +#include #include #include #include @@ -538,8 +539,13 @@ class BackpressureController : public BackpressureControl { std::atomic& backpressure_counter) : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} - void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } - void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } + void Pause() override { + std::cout << "PAUSE PRODUCING" << std::endl; + node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { + + std::cout << "RESUME PRODUCING" << std::endl; + node_->ResumeProducing(output_, ++backpressure_counter_); } private: ExecNode* node_; @@ -1331,8 +1337,7 @@ class AsofJoinNode : public ExecNode { }; void EndFromProcessThread(Status st = Status::OK()) { - // We must spawn a new task to transfer off the process thread when - // marking this finished. Otherwise there is a chance that doing so could + // We must spawn a new task to transfer off the process thread when marking this finished. Otherwise there is a chance that doing so could // mark the plan finished which may destroy the plan which will destroy this // node which will cause us to join on ourselves. ARROW_UNUSED( @@ -1341,12 +1346,21 @@ class AsofJoinNode : public ExecNode { if (st.ok()) { st = output_->InputFinished(this, batches_produced_); } - for (const auto& s : state_) { - auto shutdownResult = s->ForceShutdown(); - if (!shutdownResult.ok()) { - st = shutdownResult; - } + //for (const auto& s : state_) { + // auto shutdownResult = s->ForceShutdown(); + // if (!shutdownResult.ok()) { + // st = shutdownResult; + // } + //} + DEBUG_SYNC(this, "Done producing", DEBUG_MANIP(std::endl)); +#ifndef NDEBUG + if (thread_finished_callback_) { + DEBUG_SYNC(this, "Calling callback", DEBUG_MANIP(std::endl)); + thread_finished_callback_(); + } else { + DEBUG_SYNC(this, "No callback", DEBUG_MANIP(std::endl)); } +#endif })); } @@ -1707,7 +1721,8 @@ class AsofJoinNode : public ExecNode { // InputState may cause the BackPressureController to pause the input, causing a // deadlock if (process_task_.is_finished()) { - return Status::OK(); + DEBUG_SYNC(this, "Input received while done. Num rows: ", batch.length, DEBUG_MANIP(std::endl)); + //return Status::OK(); } // Get the input @@ -1763,7 +1778,10 @@ class AsofJoinNode : public ExecNode { } #ifndef NDEBUG - std::ostream* GetDebugStream() { return debug_os_; } + std::ostream* GetDebugStream() { + return &std::cout; + //return debug_os_; + } std::mutex* GetDebugMutex() { return debug_mutex_; } #endif @@ -1784,6 +1802,7 @@ class AsofJoinNode : public ExecNode { #ifndef NDEBUG std::ostream* debug_os_; std::mutex* debug_mutex_; + std::function thread_finished_callback_; #endif // Backpressure counter common to all inputs @@ -1819,6 +1838,7 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector inputs, #ifndef NDEBUG debug_os_(join_options.debug_opts ? join_options.debug_opts->os : nullptr), debug_mutex_(join_options.debug_opts ? join_options.debug_opts->mutex : nullptr), + thread_finished_callback_(join_options.finishedCallback), #endif backpressure_counter_(1), process_(), diff --git a/cpp/src/arrow/acero/asof_join_node.h b/cpp/src/arrow/acero/asof_join_node.h index 6a0ce8fd386b0..518c3f4f3b001 100644 --- a/cpp/src/arrow/acero/asof_join_node.h +++ b/cpp/src/arrow/acero/asof_join_node.h @@ -26,6 +26,8 @@ namespace arrow { namespace acero { namespace asofjoin { +class ExecNode; + using AsofJoinKeys = AsofJoinNodeOptions::Keys; /// \brief Make the output schema of an as-of-join node diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 44f20784cac58..40cca87fd5ac6 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1439,10 +1439,17 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { [shift](int row) -> int64_t { return row * 10 + shift; }}, schema, num_batches, batch_size); }; - // WIP - ASSERT_OK(Status::Invalid("FOO")); + auto make_shift_gated = [&maker, batch_size]( + const std::shared_ptr& schema, int shift) { + int n = 100; + return maker({[](int row) -> int64_t { return row; }, + [n](int row) -> int64_t { return row / n; }, + [shift](int row) -> int64_t { return row * 10 + shift; }}, + schema, n, batch_size); + }; ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0)); - ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1)); + // CHANGED + ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift_gated(r0_schema, 1)); ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); BackpressureCountingNode::Register(); @@ -1488,11 +1495,20 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { if (config.is_gated) { bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options}; } - bp_decls.push_back(bp_decl); + bp_decls.emplace_back(bp_decl); } + bool finished = false; + + auto opts = GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0); + opts.finishedCallback = [&finished, &gate] { + std::cout << "IN FINISHED CALLBACK" << std::endl; + finished = true; + gate.ReleaseAllBatches(); + }; + Declaration asofjoin = {"asofjoin", bp_decls, - GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)}; + opts}; ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, internal::ThreadPool::Make(1)); @@ -1514,13 +1530,19 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { return true; }; - BusyWait(10.0, has_bp_been_applied); + BusyWait(60.0, has_bp_been_applied); ASSERT_TRUE(has_bp_been_applied()); - gate.ReleaseAllBatches(); + for (int i = 0 ; i < 25; i++) { + gate.ReleaseOneBatch(); + } + + auto has_finished = [&] { return finished; }; + BusyWait(60.0, has_finished); + ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); - // One of the inputs is gated. The other two will eventually be resumed by the asof + // One of the inputs is gated and was released. The other two will eventually be resumed by the asof // join node for (size_t i = 0; i < source_configs.size(); i++) { const auto& counters = bp_counters[i]; @@ -1528,10 +1550,12 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { ASSERT_GE(counters.resume_count, 0); } } + + ASSERT_TRUE(false); } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/100000, /*batch_size=*/1); + return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); } template diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index 1ede3fbfc8ed0..6d73010a2a2e2 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -710,7 +710,8 @@ class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions { std::vector by_key; }; - AsofJoinNodeOptions(std::vector input_keys, int64_t tolerance) + AsofJoinNodeOptions(std::vector input_keys, int64_t tolerance, + std::function finishCallback = {}) : input_keys(std::move(input_keys)), tolerance(tolerance) {} /// \brief AsofJoin keys per input table. At least two keys must be given. The first key @@ -727,6 +728,9 @@ class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions { /// /// The tolerance is interpreted in the same units as the "on" key. int64_t tolerance; + + /// \brief TODO + std::function finishedCallback; }; /// \brief a node which select top_k/bottom_k rows passed through it diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 8060e01f074f8..d383bc800cf28 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -19,6 +19,7 @@ #include #include +#include #include "arrow/acero/exec_plan.h" #include "arrow/acero/options.h" #include "arrow/acero/query_context.h" @@ -244,10 +245,13 @@ struct SourceNode : ExecNode, public TracedNode { // Could happen if we get something like Pause(1) Pause(3) Resume(2) return; } + + std::cout << "Pausing " << counter << " bp counter " << backpressure_counter_ << std::endl; backpressure_future_ = Future<>::Make(); } void ResumeProducing(ExecNode* output, int32_t counter) override { + std::cout << "Resuming " << counter << " bp counter " << backpressure_counter_ << std::endl; Future<> to_finish; { std::lock_guard lg(mutex_); diff --git a/cpp/src/arrow/acero/test_nodes.cc b/cpp/src/arrow/acero/test_nodes.cc index e109afbe1bffb..9854f6a049089 100644 --- a/cpp/src/arrow/acero/test_nodes.cc +++ b/cpp/src/arrow/acero/test_nodes.cc @@ -298,11 +298,25 @@ struct GatedNode : public ExecNode, public TracedNode { } void PauseProducing(ExecNode* output, int32_t counter) override { + std::cout << "PAUSE" << std::endl; + std::lock_guard lg(mutex_); inputs_[0]->PauseProducing(this, counter); + if (!backpressure_future_.is_finished()) { + return; + } + backpressure_future_ = Future<>::Make(); } void ResumeProducing(ExecNode* output, int32_t counter) override { - inputs_[0]->ResumeProducing(this, counter); + std::cout << "RESUME" << std::endl; + Future<> to_finish; + { + std::lock_guard lg(mutex_); + inputs_[0]->ResumeProducing(this, counter); + to_finish = backpressure_future_; + backpressure_future_ = Future<>::MakeFinished(); + } + to_finish.MarkFinished(); } Status StopProducingImpl() override { return Status::OK(); } @@ -325,11 +339,32 @@ struct GatedNode : public ExecNode, public TracedNode { if (callback_added) { break; } + + //if (!backpressure_future_.is_finished()) { + // backpressure_future_.Wait(); + //} + callback_added = backpressure_future_.TryAddCallback([this] { + return [this](const Status& st) { + DCHECK_OK(st); + plan_->query_context()->ScheduleTask( + [this] { + std::unique_lock lk(mutex_); + return SendBatchesUnlocked(std::move(lk)); + }, + "GatedNode::BackpressureApplied"); + }; + }); + if (callback_added) { + break; + } + // Otherwise, the future is already finished which means the gate is unlocked // and we are allowed to send a batch ExecBatch next = std::move(queued_batches_.front()); queued_batches_.pop(); lock.unlock(); + + // TODO: this deadlocks ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); lock.lock(); } @@ -351,6 +386,10 @@ struct GatedNode : public ExecNode, public TracedNode { Gate* gate_; std::queue queued_batches_; std::mutex mutex_; + // We preemptively pause inputs to more aggressively cover + // race conditions in tests, since source nodes may send + // extra batches before inputs are paused + Future<> backpressure_future_ = Future<>::MakeFinished(); }; } // namespace From 44beeb9d1a8bc0c78c5901eab46c51e32b5a5fa1 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Fri, 22 Sep 2023 21:42:36 -0400 Subject: [PATCH 03/10] Add test --- cpp/src/arrow/acero/asof_join_node.cc | 58 ++++++----------- cpp/src/arrow/acero/asof_join_node.h | 2 - cpp/src/arrow/acero/asof_join_node_test.cc | 76 ++++++++++------------ cpp/src/arrow/acero/options.h | 6 +- cpp/src/arrow/acero/source_node.cc | 4 -- cpp/src/arrow/acero/test_nodes.cc | 41 +----------- cpp/src/arrow/acero/test_nodes.h | 4 +- 7 files changed, 58 insertions(+), 133 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 868a8be6873ad..94dd112792d00 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -17,9 +17,9 @@ #include "arrow/acero/asof_join_node.h" -#include #include #include +#include #include #include #include @@ -539,13 +539,8 @@ class BackpressureController : public BackpressureControl { std::atomic& backpressure_counter) : node_(node), output_(output), backpressure_counter_(backpressure_counter) {} - void Pause() override { - std::cout << "PAUSE PRODUCING" << std::endl; - node_->PauseProducing(output_, ++backpressure_counter_); } - void Resume() override { - - std::cout << "RESUME PRODUCING" << std::endl; - node_->ResumeProducing(output_, ++backpressure_counter_); } + void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); } + void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); } private: ExecNode* node_; @@ -690,9 +685,9 @@ class InputState { std::unique_ptr backpressure_control = std::make_unique( /*node=*/asof_input, /*output=*/asof_node, backpressure_counter); - ARROW_ASSIGN_OR_RAISE(auto handler, - BackpressureHandler::Make(asof_input, low_threshold, high_threshold, - std::move(backpressure_control))); + ARROW_ASSIGN_OR_RAISE( + auto handler, BackpressureHandler::Make(asof_input, low_threshold, high_threshold, + std::move(backpressure_control))); return std::make_unique(index, tolerance, must_hash, may_rehash, key_hasher, asof_node, std::move(handler), schema, time_col_index, key_col_index); @@ -1337,7 +1332,8 @@ class AsofJoinNode : public ExecNode { }; void EndFromProcessThread(Status st = Status::OK()) { - // We must spawn a new task to transfer off the process thread when marking this finished. Otherwise there is a chance that doing so could + // We must spawn a new task to transfer off the process thread when + // marking this finished. Otherwise there is a chance that doing so could // mark the plan finished which may destroy the plan which will destroy this // node which will cause us to join on ourselves. ARROW_UNUSED( @@ -1346,21 +1342,12 @@ class AsofJoinNode : public ExecNode { if (st.ok()) { st = output_->InputFinished(this, batches_produced_); } - //for (const auto& s : state_) { - // auto shutdownResult = s->ForceShutdown(); - // if (!shutdownResult.ok()) { - // st = shutdownResult; - // } - //} - DEBUG_SYNC(this, "Done producing", DEBUG_MANIP(std::endl)); -#ifndef NDEBUG - if (thread_finished_callback_) { - DEBUG_SYNC(this, "Calling callback", DEBUG_MANIP(std::endl)); - thread_finished_callback_(); - } else { - DEBUG_SYNC(this, "No callback", DEBUG_MANIP(std::endl)); + for (const auto& s : state_) { + auto shutdownResult = s->ForceShutdown(); + if (!shutdownResult.ok()) { + st = shutdownResult; + } } -#endif })); } @@ -1720,10 +1707,6 @@ class AsofJoinNode : public ExecNode { // InputReceived may be called after execution was finished. Pushing it to the // InputState may cause the BackPressureController to pause the input, causing a // deadlock - if (process_task_.is_finished()) { - DEBUG_SYNC(this, "Input received while done. Num rows: ", batch.length, DEBUG_MANIP(std::endl)); - //return Status::OK(); - } // Get the input ARROW_DCHECK(std_has(inputs_, input)); @@ -1731,8 +1714,12 @@ class AsofJoinNode : public ExecNode { // Put into the queue auto rb = *batch.ToRecordBatch(input->output_schema()); - DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl), - rb->ToString(), DEBUG_MANIP(std::endl)); + if (process_task_.is_finished()) { + DEBUG_SYNC(this, "Input received while done. Short circuiting.", + DEBUG_MANIP(std::endl)); + return Status::OK(); + } + ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); process_.Push(true); return Status::OK(); @@ -1778,10 +1765,7 @@ class AsofJoinNode : public ExecNode { } #ifndef NDEBUG - std::ostream* GetDebugStream() { - return &std::cout; - //return debug_os_; - } + std::ostream* GetDebugStream() { return debug_os_; } std::mutex* GetDebugMutex() { return debug_mutex_; } #endif @@ -1802,7 +1786,6 @@ class AsofJoinNode : public ExecNode { #ifndef NDEBUG std::ostream* debug_os_; std::mutex* debug_mutex_; - std::function thread_finished_callback_; #endif // Backpressure counter common to all inputs @@ -1838,7 +1821,6 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector inputs, #ifndef NDEBUG debug_os_(join_options.debug_opts ? join_options.debug_opts->os : nullptr), debug_mutex_(join_options.debug_opts ? join_options.debug_opts->mutex : nullptr), - thread_finished_callback_(join_options.finishedCallback), #endif backpressure_counter_(1), process_(), diff --git a/cpp/src/arrow/acero/asof_join_node.h b/cpp/src/arrow/acero/asof_join_node.h index 518c3f4f3b001..6a0ce8fd386b0 100644 --- a/cpp/src/arrow/acero/asof_join_node.h +++ b/cpp/src/arrow/acero/asof_join_node.h @@ -26,8 +26,6 @@ namespace arrow { namespace acero { namespace asofjoin { -class ExecNode; - using AsofJoinKeys = AsofJoinNodeOptions::Keys; /// \brief Make the output schema of an as-of-join node diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 40cca87fd5ac6..df3172b2a09bc 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1424,7 +1424,8 @@ AsyncGenerator> GetGen(BatchesWithSchema bws) { } template -void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { +void TestBackpressure(BatchesMaker maker, int batch_size, int num_l_batches, + int num_r0_batches, int num_r1_batches, bool slow_r0) { auto l_schema = schema({field("time", int32()), field("key", int32()), field("l_value", int32())}); auto r0_schema = @@ -1432,25 +1433,17 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { auto r1_schema = schema({field("time", int32()), field("key", int32()), field("r1_value", int32())}); - auto make_shift = [&maker, num_batches, batch_size]( - const std::shared_ptr& schema, int shift) { + auto make_shift = [&maker, batch_size](int num_batches, + const std::shared_ptr& schema, + int shift) { return maker({[](int row) -> int64_t { return row; }, [num_batches](int row) -> int64_t { return row / num_batches; }, [shift](int row) -> int64_t { return row * 10 + shift; }}, schema, num_batches, batch_size); }; - auto make_shift_gated = [&maker, batch_size]( - const std::shared_ptr& schema, int shift) { - int n = 100; - return maker({[](int row) -> int64_t { return row; }, - [n](int row) -> int64_t { return row / n; }, - [shift](int row) -> int64_t { return row * 10 + shift; }}, - schema, n, batch_size); - }; - ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0)); - // CHANGED - ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift_gated(r0_schema, 1)); - ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2)); + ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(num_l_batches, l_schema, 0)); + ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(num_r0_batches, r0_schema, 1)); + ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(num_r1_batches, r1_schema, 2)); BackpressureCountingNode::Register(); RegisterTestNodes(); // for GatedNode @@ -1458,6 +1451,7 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { struct BackpressureSourceConfig { std::string name_prefix; bool is_gated; + bool is_delayed; std::shared_ptr schema; decltype(l_batches) batches; @@ -1472,9 +1466,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { // Two ungated and one gated std::vector source_configs = { - {"0", false, l_schema, l_batches}, - {"1", true, r0_schema, r0_batches}, - {"2", false, r1_schema, r1_batches}, + {"0", false, false, l_schema, l_batches}, + {"1", true, slow_r0, r0_schema, r0_batches}, + {"2", false, false, r1_schema, r1_batches}, }; std::vector bp_counters(source_configs.size()); @@ -1483,9 +1477,16 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { std::vector bp_decls; for (size_t i = 0; i < source_configs.size(); i++) { const auto& config = source_configs[i]; - - src_decls.emplace_back("source", - SourceNodeOptions(config.schema, GetGen(config.batches))); + if (config.is_delayed) { + src_decls.emplace_back( + "source", + SourceNodeOptions(config.schema, MakeDelayedGen(config.batches, "slow_source", + /*delay_sec=*/0.5, + /*noisy=*/false))); + } else { + src_decls.emplace_back("source", + SourceNodeOptions(config.schema, GetGen(config.batches))); + } bp_options.push_back( std::make_shared(&bp_counters[i])); std::shared_ptr options = bp_options.back(); @@ -1498,17 +1499,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { bp_decls.emplace_back(bp_decl); } - bool finished = false; - auto opts = GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0); - opts.finishedCallback = [&finished, &gate] { - std::cout << "IN FINISHED CALLBACK" << std::endl; - finished = true; - gate.ReleaseAllBatches(); - }; - Declaration asofjoin = {"asofjoin", bp_decls, - opts}; + Declaration asofjoin = {"asofjoin", bp_decls, opts}; ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, internal::ThreadPool::Make(1)); @@ -1533,29 +1526,23 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) { BusyWait(60.0, has_bp_been_applied); ASSERT_TRUE(has_bp_been_applied()); - for (int i = 0 ; i < 25; i++) { - gate.ReleaseOneBatch(); - } - - auto has_finished = [&] { return finished; }; - BusyWait(60.0, has_finished); - + gate.ReleaseAllBatches(); ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut); - // One of the inputs is gated and was released. The other two will eventually be resumed by the asof - // join node + // One of the inputs is gated and was released. The other two will eventually be resumed + // by the asof join node for (size_t i = 0; i < source_configs.size(); i++) { const auto& counters = bp_counters[i]; if (!source_configs[i].is_gated) { ASSERT_GE(counters.resume_count, 0); } } - - ASSERT_TRUE(false); } TEST(AsofJoinTest, BackpressureWithBatches) { - return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1); + // Give the first right hand table a delay to stress test race conditions + return TestBackpressure(MakeIntegerBatches, /*batch_size=*/1, /*num_l_batches=*/20, + /*num_r0_batches=*/50, /*num_r1_batches=*/20, /*slow_r0=*/true); } template @@ -1621,7 +1608,10 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) { GTEST_SKIP() << "Skipping - see GH-36331"; int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20); int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1); - return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size); + return TestBackpressure(MakeIntegerBatchGenForTest, /*batch_size=*/batch_size, + /*num_l_batches=*/num_batches, + /*num_r0_batches=*/num_batches, /*num_r1_batches=*/num_batches, + /*slow_r0=*/false); } } // namespace acero diff --git a/cpp/src/arrow/acero/options.h b/cpp/src/arrow/acero/options.h index 6d73010a2a2e2..1ede3fbfc8ed0 100644 --- a/cpp/src/arrow/acero/options.h +++ b/cpp/src/arrow/acero/options.h @@ -710,8 +710,7 @@ class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions { std::vector by_key; }; - AsofJoinNodeOptions(std::vector input_keys, int64_t tolerance, - std::function finishCallback = {}) + AsofJoinNodeOptions(std::vector input_keys, int64_t tolerance) : input_keys(std::move(input_keys)), tolerance(tolerance) {} /// \brief AsofJoin keys per input table. At least two keys must be given. The first key @@ -728,9 +727,6 @@ class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions { /// /// The tolerance is interpreted in the same units as the "on" key. int64_t tolerance; - - /// \brief TODO - std::function finishedCallback; }; /// \brief a node which select top_k/bottom_k rows passed through it diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index d383bc800cf28..8060e01f074f8 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -19,7 +19,6 @@ #include #include -#include #include "arrow/acero/exec_plan.h" #include "arrow/acero/options.h" #include "arrow/acero/query_context.h" @@ -245,13 +244,10 @@ struct SourceNode : ExecNode, public TracedNode { // Could happen if we get something like Pause(1) Pause(3) Resume(2) return; } - - std::cout << "Pausing " << counter << " bp counter " << backpressure_counter_ << std::endl; backpressure_future_ = Future<>::Make(); } void ResumeProducing(ExecNode* output, int32_t counter) override { - std::cout << "Resuming " << counter << " bp counter " << backpressure_counter_ << std::endl; Future<> to_finish; { std::lock_guard lg(mutex_); diff --git a/cpp/src/arrow/acero/test_nodes.cc b/cpp/src/arrow/acero/test_nodes.cc index 9854f6a049089..e109afbe1bffb 100644 --- a/cpp/src/arrow/acero/test_nodes.cc +++ b/cpp/src/arrow/acero/test_nodes.cc @@ -298,25 +298,11 @@ struct GatedNode : public ExecNode, public TracedNode { } void PauseProducing(ExecNode* output, int32_t counter) override { - std::cout << "PAUSE" << std::endl; - std::lock_guard lg(mutex_); inputs_[0]->PauseProducing(this, counter); - if (!backpressure_future_.is_finished()) { - return; - } - backpressure_future_ = Future<>::Make(); } void ResumeProducing(ExecNode* output, int32_t counter) override { - std::cout << "RESUME" << std::endl; - Future<> to_finish; - { - std::lock_guard lg(mutex_); - inputs_[0]->ResumeProducing(this, counter); - to_finish = backpressure_future_; - backpressure_future_ = Future<>::MakeFinished(); - } - to_finish.MarkFinished(); + inputs_[0]->ResumeProducing(this, counter); } Status StopProducingImpl() override { return Status::OK(); } @@ -339,32 +325,11 @@ struct GatedNode : public ExecNode, public TracedNode { if (callback_added) { break; } - - //if (!backpressure_future_.is_finished()) { - // backpressure_future_.Wait(); - //} - callback_added = backpressure_future_.TryAddCallback([this] { - return [this](const Status& st) { - DCHECK_OK(st); - plan_->query_context()->ScheduleTask( - [this] { - std::unique_lock lk(mutex_); - return SendBatchesUnlocked(std::move(lk)); - }, - "GatedNode::BackpressureApplied"); - }; - }); - if (callback_added) { - break; - } - // Otherwise, the future is already finished which means the gate is unlocked // and we are allowed to send a batch ExecBatch next = std::move(queued_batches_.front()); queued_batches_.pop(); lock.unlock(); - - // TODO: this deadlocks ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(next))); lock.lock(); } @@ -386,10 +351,6 @@ struct GatedNode : public ExecNode, public TracedNode { Gate* gate_; std::queue queued_batches_; std::mutex mutex_; - // We preemptively pause inputs to more aggressively cover - // race conditions in tests, since source nodes may send - // extra batches before inputs are paused - Future<> backpressure_future_ = Future<>::MakeFinished(); }; } // namespace diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h index 7e31aa31b34d7..fd9df4f14b0f6 100644 --- a/cpp/src/arrow/acero/test_nodes.h +++ b/cpp/src/arrow/acero/test_nodes.h @@ -74,8 +74,10 @@ class Gate { // A node that holds all input batches until a given gate is released struct GatedNodeOptions : public ExecNodeOptions { - explicit GatedNodeOptions(Gate* gate) : gate(gate) {} + explicit GatedNodeOptions(Gate* gate, bool input_finished_at_end = false) + : gate(gate), input_finished_at_end(input_finished_at_end) {} Gate* gate; + bool input_finished_at_end; static constexpr std::string_view kName = "gated"; }; From d24b10213f00c360a99a0dae0ad53b4904daf1c7 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Fri, 22 Sep 2023 21:43:20 -0400 Subject: [PATCH 04/10] . --- cpp/src/arrow/acero/test_nodes.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/test_nodes.h b/cpp/src/arrow/acero/test_nodes.h index fd9df4f14b0f6..7e31aa31b34d7 100644 --- a/cpp/src/arrow/acero/test_nodes.h +++ b/cpp/src/arrow/acero/test_nodes.h @@ -74,10 +74,8 @@ class Gate { // A node that holds all input batches until a given gate is released struct GatedNodeOptions : public ExecNodeOptions { - explicit GatedNodeOptions(Gate* gate, bool input_finished_at_end = false) - : gate(gate), input_finished_at_end(input_finished_at_end) {} + explicit GatedNodeOptions(Gate* gate) : gate(gate) {} Gate* gate; - bool input_finished_at_end; static constexpr std::string_view kName = "gated"; }; From 78c6d79caa3c9d3bb6992c648acec144f6cc3045 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Fri, 22 Sep 2023 22:05:52 -0400 Subject: [PATCH 05/10] . --- cpp/src/arrow/acero/asof_join_node.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 94dd112792d00..d77b99ec5cd12 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -1707,6 +1707,11 @@ class AsofJoinNode : public ExecNode { // InputReceived may be called after execution was finished. Pushing it to the // InputState may cause the BackPressureController to pause the input, causing a // deadlock + if (process_task_.is_finished()) { + DEBUG_SYNC(this, "Input received while done. Short circuiting.", + DEBUG_MANIP(std::endl)); + return Status::OK(); + } // Get the input ARROW_DCHECK(std_has(inputs_, input)); @@ -1714,11 +1719,6 @@ class AsofJoinNode : public ExecNode { // Put into the queue auto rb = *batch.ToRecordBatch(input->output_schema()); - if (process_task_.is_finished()) { - DEBUG_SYNC(this, "Input received while done. Short circuiting.", - DEBUG_MANIP(std::endl)); - return Status::OK(); - } ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); process_.Push(true); From fb3a1f4432fec7f22718fadc9d99bd46d4c69ad4 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Fri, 22 Sep 2023 22:06:21 -0400 Subject: [PATCH 06/10] . --- cpp/src/arrow/acero/asof_join_node.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index d77b99ec5cd12..9ad459e97e755 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -1719,6 +1719,8 @@ class AsofJoinNode : public ExecNode { // Put into the queue auto rb = *batch.ToRecordBatch(input->output_schema()); + DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl), + rb->ToString(), DEBUG_MANIP(std::endl)); ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb)); process_.Push(true); From 72ec897331a923ff3dbb593731c924c4dc7899cb Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Sun, 1 Oct 2023 01:17:49 -0400 Subject: [PATCH 07/10] Update asof_join_node.cc Co-authored-by: Benjamin Kietzman --- cpp/src/arrow/acero/asof_join_node.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 9ad459e97e755..4f57fc13182b3 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -1343,10 +1343,7 @@ class AsofJoinNode : public ExecNode { st = output_->InputFinished(this, batches_produced_); } for (const auto& s : state_) { - auto shutdownResult = s->ForceShutdown(); - if (!shutdownResult.ok()) { - st = shutdownResult; - } + st &= s->ForceShutdown(); } })); } From ecfb84dd7481daa8148f2aa29d35276dacc1d763 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Sun, 1 Oct 2023 01:25:11 -0400 Subject: [PATCH 08/10] Update asof_join_node.cc Co-authored-by: Benjamin Kietzman --- cpp/src/arrow/acero/asof_join_node.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 4f57fc13182b3..eeaf6dd1f46ae 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -1702,8 +1702,8 @@ class AsofJoinNode : public ExecNode { Status InputReceived(ExecNode* input, ExecBatch batch) override { // InputReceived may be called after execution was finished. Pushing it to the - // InputState may cause the BackPressureController to pause the input, causing a - // deadlock + // InputState is unnecessary since we're done (and anyway may cause the + // BackPressureController to pause the input, causing a deadlock), so drop it. if (process_task_.is_finished()) { DEBUG_SYNC(this, "Input received while done. Short circuiting.", DEBUG_MANIP(std::endl)); From be5cb058bc15b62b7d327f894a85efd6b99ff2b0 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Sun, 1 Oct 2023 10:05:25 -0400 Subject: [PATCH 09/10] Update asof_join_node.cc --- cpp/src/arrow/acero/asof_join_node.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index eeaf6dd1f46ae..d19d2db299cba 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include From 529e3b5acf5623f43397b6de81cb3710283eb9d6 Mon Sep 17 00:00:00 2001 From: Jeremy Aguilon Date: Mon, 23 Oct 2023 15:05:38 -0400 Subject: [PATCH 10/10] trigger GitHub actions