GH-37796: [C++][Acero] Fix race condition caused by straggling input in the as-of-join node #37839

Merged (10 commits) on Oct 24, 2023
45 changes: 38 additions & 7 deletions cpp/src/arrow/acero/asof_join_node.cc
@@ -549,15 +549,16 @@ class BackpressureController : public BackpressureControl {

class BackpressureHandler {
private:
- BackpressureHandler(size_t low_threshold, size_t high_threshold,
+ BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl> backpressure_control)
- : low_threshold_(low_threshold),
+ : input_(input),
+ low_threshold_(low_threshold),
high_threshold_(high_threshold),
backpressure_control_(std::move(backpressure_control)) {}

public:
static Result<BackpressureHandler> Make(
- size_t low_threshold, size_t high_threshold,
+ ExecNode* input, size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl> backpressure_control) {
if (low_threshold >= high_threshold) {
return Status::Invalid("low threshold (", low_threshold,
@@ -566,7 +567,7 @@ class BackpressureHandler {
if (backpressure_control == NULLPTR) {
return Status::Invalid("null backpressure control parameter");
}
- BackpressureHandler backpressure_handler(low_threshold, high_threshold,
+ BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
std::move(backpressure_control));
return std::move(backpressure_handler);
}
@@ -579,7 +580,16 @@ class BackpressureHandler {
}
}

+ Status ForceShutdown() {
+ // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
+ // Since acero's executor won't terminate if any one node is paused, we need to
+ // force resume the node before stopping production.
+ backpressure_control_->Resume();
Review comment (Contributor Author):
Perhaps one thing to clarify is whether ResumeInput behaves idempotently? I.e., is it OK to always call resume, even though only some inputs hit this PauseInput race condition?

My perusal of source_node.cc tells me this is OK, but LMK if this is a poor assumption to make.
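
For concreteness, this is the behavior I'm assuming (a minimal sketch with hypothetical names, not the actual source_node.cc code): resume only acts on an input that is currently paused, so an unconditional Resume() right before StopProducing() cannot over-resume anything.

```cpp
#include <atomic>

// Hypothetical toggle-style backpressure control for which an extra Resume()
// call is harmless.
class ToggleBackpressure {
 public:
  void Pause() {
    // Only the false -> true transition actually pauses the producer.
    if (!paused_.exchange(true)) {
      // pause the underlying producer here
    }
  }

  void Resume() {
    // Only the true -> false transition actually resumes; Resume() on an
    // already-running input is a no-op.
    if (paused_.exchange(false)) {
      // resume the underlying producer here
    }
  }

 private:
  std::atomic<bool> paused_{false};
};
```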

+ return input_->StopProducing();
Review comment (Member):
So if I understand correctly this means we will call StopProducing on all right hand side nodes once:

  • The left hand side has finished
  • The right hand side has caught up

If so, then I agree this is a valid thing to do.

Review comment (Contributor Author):
Yep.

As an aside, I feel like a more invasive change could fix this issue in the general case. If a node (in this example, the asof join) has:

  • called output_->InputFinished() AND
  • called output_->InputReceived for however many record batches it advertised in InputFinished

we should be able to shut down execution, even if the node's inputs:

  • are paused,
  • are not done streaming, or
  • haven't called InputFinished.

But I think this is a more invasive change to exec_plan.h and might have some hairy issues that I'm not thinking of.
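
Roughly, the check I have in mind looks like this (a sketch with hypothetical names, not the actual exec_plan.h API):

```cpp
#include <optional>

// Track what this node has promised its output and what it has delivered.
struct OutputProgress {
  std::optional<int> advertised_total;  // set when output_->InputFinished(this, total) is called
  int delivered = 0;                    // incremented per output_->InputReceived(...)

  // Once these match, downstream needs nothing more from this node, so the plan
  // could stop the node's inputs even if they are paused, still streaming, or
  // have not called InputFinished themselves.
  bool OutputSatisfied() const {
    return advertised_total.has_value() && delivered == *advertised_total;
  }
};
```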

+ }

private:
+ ExecNode* input_;
size_t low_threshold_;
size_t high_threshold_;
std::unique_ptr<BackpressureControl> backpressure_control_;
@@ -629,6 +639,8 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
return ConcurrentQueue<T>::TryPopUnlocked();
}

+ Status ForceShutdown() { return handler_.ForceShutdown(); }

private:
BackpressureHandler handler_;
};
@@ -672,9 +684,9 @@ class InputState {
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
- ARROW_ASSIGN_OR_RAISE(auto handler,
- BackpressureHandler::Make(low_threshold, high_threshold,
- std::move(backpressure_control)));
+ ARROW_ASSIGN_OR_RAISE(
+ auto handler, BackpressureHandler::Make(asof_input, low_threshold, high_threshold,
+ std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash, may_rehash,
key_hasher, asof_node, std::move(handler), schema,
time_col_index, key_col_index);
@@ -930,6 +942,12 @@ class InputState {
total_batches_ = n;
}

+ Status ForceShutdown() {
+ // Force the upstream input node to unpause. Necessary to avoid deadlock when we
+ // terminate the process thread
+ return queue_.ForceShutdown();
+ }

private:
// Pending record batches. The latest is the front. Batches cannot be empty.
BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue_;
@@ -1323,6 +1341,9 @@ class AsofJoinNode : public ExecNode {
if (st.ok()) {
st = output_->InputFinished(this, batches_produced_);
}
+ for (const auto& s : state_) {
+ st &= s->ForceShutdown();
+ }
}));
}

@@ -1679,6 +1700,15 @@ class AsofJoinNode : public ExecNode {
const Ordering& ordering() const override { return ordering_; }

Status InputReceived(ExecNode* input, ExecBatch batch) override {
+ // InputReceived may be called after execution was finished. Pushing it to the
+ // InputState is unnecessary since we're done (and anyway may cause the
+ // BackpressureController to pause the input, causing a deadlock), so drop it.
+ if (process_task_.is_finished()) {
+ DEBUG_SYNC(this, "Input received while done. Short circuiting.",
+ DEBUG_MANIP(std::endl));
+ return Status::OK();
+ }

// Get the input
ARROW_DCHECK(std_has(inputs_, input));
size_t k = std_find(inputs_, input) - inputs_.begin();
@@ -1687,6 +1717,7 @@
auto rb = *batch.ToRecordBatch(input->output_schema());
DEBUG_SYNC(this, "received batch from input ", k, ":", DEBUG_MANIP(std::endl),
rb->ToString(), DEBUG_MANIP(std::endl));

ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
process_.Push(true);
return Status::OK();
56 changes: 36 additions & 20 deletions cpp/src/arrow/acero/asof_join_node_test.cc
@@ -1424,31 +1424,34 @@ AsyncGenerator<std::optional<ExecBatch>> GetGen(BatchesWithSchema bws) {
}

template <typename BatchesMaker>
- void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
+ void TestBackpressure(BatchesMaker maker, int batch_size, int num_l_batches,
+ int num_r0_batches, int num_r1_batches, bool slow_r0) {
auto l_schema =
schema({field("time", int32()), field("key", int32()), field("l_value", int32())});
auto r0_schema =
schema({field("time", int32()), field("key", int32()), field("r0_value", int32())});
auto r1_schema =
schema({field("time", int32()), field("key", int32()), field("r1_value", int32())});

- auto make_shift = [&maker, num_batches, batch_size](
- const std::shared_ptr<Schema>& schema, int shift) {
+ auto make_shift = [&maker, batch_size](int num_batches,
+ const std::shared_ptr<Schema>& schema,
+ int shift) {
return maker({[](int row) -> int64_t { return row; },
[num_batches](int row) -> int64_t { return row / num_batches; },
[shift](int row) -> int64_t { return row * 10 + shift; }},
schema, num_batches, batch_size);
};
- ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(l_schema, 0));
- ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(r0_schema, 1));
- ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(r1_schema, 2));
+ ASSERT_OK_AND_ASSIGN(auto l_batches, make_shift(num_l_batches, l_schema, 0));
+ ASSERT_OK_AND_ASSIGN(auto r0_batches, make_shift(num_r0_batches, r0_schema, 1));
+ ASSERT_OK_AND_ASSIGN(auto r1_batches, make_shift(num_r1_batches, r1_schema, 2));

BackpressureCountingNode::Register();
RegisterTestNodes(); // for GatedNode

struct BackpressureSourceConfig {
std::string name_prefix;
bool is_gated;
+ bool is_delayed;
std::shared_ptr<Schema> schema;
decltype(l_batches) batches;

@@ -1463,9 +1466,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {

// Two ungated and one gated
std::vector<BackpressureSourceConfig> source_configs = {
{"0", false, l_schema, l_batches},
{"1", true, r0_schema, r0_batches},
{"2", false, r1_schema, r1_batches},
{"0", false, false, l_schema, l_batches},
{"1", true, slow_r0, r0_schema, r0_batches},
{"2", false, false, r1_schema, r1_batches},
};

std::vector<BackpressureCounters> bp_counters(source_configs.size());
@@ -1474,9 +1477,16 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];

src_decls.emplace_back("source",
SourceNodeOptions(config.schema, GetGen(config.batches)));
if (config.is_delayed) {
Review comment (Member):
I assume this new option triggers the deadlock on the unfixed code?

Review comment (Contributor Author):
Correct

+ src_decls.emplace_back(
+ "source",
+ SourceNodeOptions(config.schema, MakeDelayedGen(config.batches, "slow_source",
+ /*delay_sec=*/0.5,
+ /*noisy=*/false)));
+ } else {
+ src_decls.emplace_back("source",
+ SourceNodeOptions(config.schema, GetGen(config.batches)));
+ }
bp_options.push_back(
std::make_shared<BackpressureCountingNodeOptions>(&bp_counters[i]));
std::shared_ptr<ExecNodeOptions> options = bp_options.back();
@@ -1486,11 +1496,12 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
if (config.is_gated) {
bp_decl = {std::string{GatedNodeOptions::kName}, {bp_decl}, gate_options};
}
- bp_decls.push_back(bp_decl);
+ bp_decls.emplace_back(bp_decl);
}

- Declaration asofjoin = {"asofjoin", bp_decls,
- GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0)};
+ auto opts = GetRepeatedOptions(source_configs.size(), "time", {"key"}, 0);
+
+ Declaration asofjoin = {"asofjoin", bp_decls, opts};

ASSERT_OK_AND_ASSIGN(std::shared_ptr<internal::ThreadPool> tpool,
internal::ThreadPool::Make(1));
@@ -1512,14 +1523,14 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
return true;
};

- BusyWait(10.0, has_bp_been_applied);
+ BusyWait(60.0, has_bp_been_applied);
ASSERT_TRUE(has_bp_been_applied());

gate.ReleaseAllBatches();
ASSERT_FINISHES_OK_AND_ASSIGN(BatchesWithCommonSchema batches, batches_fut);

- // One of the inputs is gated. The other two will eventually be resumed by the asof
- // join node
+ // One of the inputs is gated and was released. The other two will eventually be resumed
+ // by the asof join node
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& counters = bp_counters[i];
if (!source_configs[i].is_gated) {
@@ -1529,7 +1540,9 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
}

TEST(AsofJoinTest, BackpressureWithBatches) {
- return TestBackpressure(MakeIntegerBatches, /*num_batches=*/20, /*batch_size=*/1);
+ // Give the first right hand table a delay to stress test race conditions
+ return TestBackpressure(MakeIntegerBatches, /*batch_size=*/1, /*num_l_batches=*/20,
+ /*num_r0_batches=*/50, /*num_r1_batches=*/20, /*slow_r0=*/true);
}

template <typename BatchesMaker>
@@ -1595,7 +1608,10 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
GTEST_SKIP() << "Skipping - see GH-36331";
int num_batches = GetEnvValue("ARROW_BACKPRESSURE_DEMO_NUM_BATCHES", 20);
int batch_size = GetEnvValue("ARROW_BACKPRESSURE_DEMO_BATCH_SIZE", 1);
- return TestBackpressure(MakeIntegerBatchGenForTest, num_batches, batch_size);
+ return TestBackpressure(MakeIntegerBatchGenForTest, /*batch_size=*/batch_size,
+ /*num_l_batches=*/num_batches,
+ /*num_r0_batches=*/num_batches, /*num_r1_batches=*/num_batches,
+ /*slow_r0=*/false);
}

} // namespace acero