diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc index 48cc83dd3d6a9..1d94467df9ee2 100644 --- a/cpp/src/arrow/acero/asof_join_node.cc +++ b/cpp/src/arrow/acero/asof_join_node.cc @@ -548,8 +548,10 @@ class InputState { // true when the queue is empty and, when memo may have future entries (the case of a // positive tolerance), when the memo is empty. // used when checking whether RHS is up to date with LHS. - bool CurrentEmpty() const { - return memo_.no_future_ ? Empty() : memo_.times_.empty() && Empty(); + // NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the + // potential race with Push(), see GH-41614. + bool CurrentEmpty(bool empty) const { + return memo_.no_future_ ? empty : (memo_.times_.empty() && empty); } // in case memo may not have future entries (the case of a non-positive tolerance), @@ -650,13 +652,15 @@ class InputState { // timestamp, update latest_time and latest_ref_row to the value that immediately pass // the horizon. Update the memo-store with any entries or future entries so observed. // Returns true if updates were made, false if not. - Result AdvanceAndMemoize(OnType ts) { + // NOTE: The emptiness must be decided by a single call to Empty() in caller, due to the + // potential race with Push(), see GH-41614. + Result AdvanceAndMemoize(OnType ts, bool empty) { // Advance the right side row index until we reach the latest right row (for each key) // for the given left timestamp. DEBUG_SYNC(node_, "Advancing input ", index_, DEBUG_MANIP(std::endl)); // Check if already updated for TS (or if there is no latest) - if (Empty()) { // can't advance if empty and no future entries + if (empty) { // can't advance if empty and no future entries return memo_.no_future_ ? false : memo_.RemoveEntriesWithLesserTime(ts); } @@ -918,34 +922,46 @@ class CompositeTableBuilder { // guaranteeing this probability is below 1 in a billion. The fix is 128-bit hashing. // See ARROW-17653 class AsofJoinNode : public ExecNode { - // Advances the RHS as far as possible to be up to date for the current LHS timestamp - Result UpdateRhs() { + // A simple wrapper for the result of a single call to UpdateRhs(), identifying: + // 1) If any RHS has advanced. + // 2) If all RHS are up to date with LHS. + struct RhsUpdateState { + bool any_advanced; + bool all_up_to_date_with_lhs; + }; + // Advances the RHS as far as possible to be up to date for the current LHS timestamp, + // and checks if all RHS are up to date with LHS. The reason they have to be performed + // together is that they both depend on the emptiness of the RHS, which can be changed + // by Push() executing in another thread. + Result UpdateRhs() { auto& lhs = *state_.at(0); auto lhs_latest_time = lhs.GetLatestTime(); - bool any_updated = false; - for (size_t i = 1; i < state_.size(); ++i) { - ARROW_ASSIGN_OR_RAISE(bool advanced, state_[i]->AdvanceAndMemoize(lhs_latest_time)); - any_updated |= advanced; - } - return any_updated; - } - - // Returns false if RHS not up to date for LHS - bool IsUpToDateWithLhsRow() const { - auto& lhs = *state_[0]; - if (lhs.Empty()) return false; // can't proceed if nothing on the LHS - OnType lhs_ts = lhs.GetLatestTime(); + RhsUpdateState update_state{/*any_advanced=*/false, /*all_up_to_date_with_lhs=*/true}; for (size_t i = 1; i < state_.size(); ++i) { auto& rhs = *state_[i]; - if (!rhs.Finished()) { + + // Obtain RHS emptiness once for subsequent AdvanceAndMemoize() and CurrentEmpty(). + bool rhs_empty = rhs.Empty(); + // Obtain RHS current time here because AdvanceAndMemoize() can change the + // emptiness. + OnType rhs_current_time = rhs_empty ? OnType{} : rhs.GetLatestTime(); + + ARROW_ASSIGN_OR_RAISE(bool advanced, + rhs.AdvanceAndMemoize(lhs_latest_time, rhs_empty)); + update_state.any_advanced |= advanced; + + if (update_state.all_up_to_date_with_lhs && !rhs.Finished()) { // If RHS is finished, then we know it's up to date - if (rhs.CurrentEmpty()) - return false; // RHS isn't finished, but is empty --> not up to date - if (lhs_ts > rhs.GetCurrentTime()) - return false; // RHS isn't up to date (and not finished) + if (rhs.CurrentEmpty(rhs_empty)) { + // RHS isn't finished, but is empty --> not up to date + update_state.all_up_to_date_with_lhs = false; + } else if (lhs_latest_time > rhs_current_time) { + // RHS isn't up to date (and not finished) + update_state.all_up_to_date_with_lhs = false; + } } } - return true; + return update_state; } Result> ProcessInner() { @@ -963,20 +979,19 @@ class AsofJoinNode : public ExecNode { // If LHS is finished or empty then there's nothing we can do here if (lhs.Finished() || lhs.Empty()) break; - // Advance each of the RHS as far as possible to be up to date for the LHS timestamp - ARROW_ASSIGN_OR_RAISE(bool any_rhs_advanced, UpdateRhs()); + ARROW_ASSIGN_OR_RAISE(auto rhs_update_state, UpdateRhs()); // If we have received enough inputs to produce the next output batch // (decided by IsUpToDateWithLhsRow), we will perform the join and // materialize the output batch. The join is done by advancing through // the LHS and adding joined row to rows_ (done by Emplace). Finally, // input batches that are no longer needed are removed to free up memory. - if (IsUpToDateWithLhsRow()) { + if (rhs_update_state.all_up_to_date_with_lhs) { dst.Emplace(state_, tolerance_); ARROW_ASSIGN_OR_RAISE(bool advanced, lhs.Advance()); if (!advanced) break; // if we can't advance LHS, we're done for this batch } else { - if (!any_rhs_advanced) break; // need to wait for new data + if (!rhs_update_state.any_advanced) break; // need to wait for new data } } diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index d95d2aaad3643..051e280a4c53c 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1678,5 +1678,59 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) { /*slow_r0=*/false); } +// Reproduction of GH-40675: A logical race between Process() and Push() that can be more +// easily observed with single small batch. +TEST(AsofJoinTest, RhsEmptinessRace) { + auto left_batch = ExecBatchFromJSON( + {int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])"); + auto right_batch = ExecBatchFromJSON( + {int64(), utf8(), float64()}, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])"); + + Declaration left{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("colA", int64()), field("col2", utf8())}), + {std::move(left_batch)})}; + Declaration right{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("colB", int64()), field("col3", utf8()), + field("colC", float64())}), + {std::move(right_batch)})}; + AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, {{"col3"}}}}, 1); + Declaration asof_join{ + "asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)}; + + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join))); + + auto exp_batch = ExecBatchFromJSON( + {int64(), utf8(), float64()}, + R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, "f", null]])"); + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); +} + +// Reproduction of GH-41149: Another case of the same root cause as GH-40675, but with +// empty "by" columns. +TEST(AsofJoinTest, RhsEmptinessRaceEmptyBy) { + auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])"); + auto right_batch = + ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])"); + + Declaration left{"exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("on", int64())}), + {std::move(left_batch)})}; + Declaration right{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}), + {std::move(right_batch)})}; + AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1); + Declaration asof_join{ + "asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)}; + + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join))); + + auto exp_batch = + ExecBatchFromJSON({int64(), utf8()}, R"([[1, "Z"], [2, "Z"], [3, "B"]])"); + AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches); +} + } // namespace acero } // namespace arrow