Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17252: [R] Intermittent valgrind failure #13746

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion r/R/compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) {
#' @return `NULL`, invisibly
#' @export
#'
#' @examplesIf arrow_with_dataset()
#' @examplesIf FALSE && arrow_with_dataset()
#' library(dplyr, warn.conflicts = FALSE)
#'
#' some_model <- lm(mpg ~ disp + cyl, data = mtcars)
Expand Down
9 changes: 8 additions & 1 deletion r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ ExecPlan <- R6Class("ExecPlan",
node
},
Run = function(node, as_table = FALSE) {
table_at_end <- as_table
as_table <- FALSE

# a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync
# Start of chunk used in `BuildAndShow()`
assert_is(node, "ExecNode")
Expand Down Expand Up @@ -266,7 +269,11 @@ ExecPlan <- R6Class("ExecPlan",
}
}

out
if (table_at_end && !inherits(out, "Table")) {
out$read_table()
} else {
out
}
},
Write = function(node, ...) {
# TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
Expand Down
2 changes: 1 addition & 1 deletion r/man/register_scalar_function.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 112 additions & 18 deletions r/src/compute-exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,25 +129,125 @@ std::shared_ptr<arrow::RecordBatchReader> ExecPlan_run(
const std::shared_ptr<compute::ExecPlan>& plan,
const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options,
cpp11::strings metadata, int64_t head = -1) {
auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head);
StopIfNotOk(prepared_plan.first->StartProducing());
return prepared_plan.second;
// For now, don't require R to construct SinkNodes.
// Instead, just pass the node we should collect as an argument.
arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;

// Sorting uses a different sink node; there is no general sort yet
if (sort_options.size() > 0) {
if (head >= 0) {
// Use the SelectK node to take only what we need
MakeExecNodeOrStop(
"select_k_sink", plan.get(), {final_node.get()},
compute::SelectKSinkNodeOptions{
arrow::compute::SelectKOptions(
head, std::dynamic_pointer_cast<compute::SortOptions>(
make_compute_options("sort_indices", sort_options))
->sort_keys),
&sink_gen});
} else {
MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
compute::OrderBySinkNodeOptions{
*std::dynamic_pointer_cast<compute::SortOptions>(
make_compute_options("sort_indices", sort_options)),
&sink_gen});
}
} else {
MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
compute::SinkNodeOptions{&sink_gen});
}

// End of chunk used in ExecPlan_BuildAndShow

StopIfNotOk(plan->Validate());

// If the generator is destroyed before being completely drained, inform plan
std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
bool not_finished_yet =
plan->finished().TryAddCallback([&plan] {
return [plan](const arrow::Status&) {};
});

if (not_finished_yet) {
plan->StopProducing();
}
}};

// Attach metadata to the schema
auto out_schema = final_node->output_schema();
if (metadata.size() > 0) {
auto kv = strings_to_kvm(metadata);
out_schema = out_schema->WithMetadata(kv);
}

StopIfNotOk(plan->StartProducing());
return compute::MakeGeneratorReader(
out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); },
gc_memory_pool());
}

// [[arrow::export]]
std::shared_ptr<arrow::Table> ExecPlan_read_table(
const std::shared_ptr<compute::ExecPlan>& plan,
const std::shared_ptr<compute::ExecNode>& final_node, cpp11::list sort_options,
cpp11::strings metadata, int64_t head = -1) {
auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head);
// For now, don't require R to construct SinkNodes.
// Instead, just pass the node we should collect as an argument.
arrow::AsyncGenerator<arrow::util::optional<compute::ExecBatch>> sink_gen;

auto result = RunWithCapturedRIfPossible<std::shared_ptr<arrow::Table>>(
[&]() -> arrow::Result<std::shared_ptr<arrow::Table>> {
ARROW_RETURN_NOT_OK(prepared_plan.first->StartProducing());
return prepared_plan.second->ToTable();
});
// Sorting uses a different sink node; there is no general sort yet
if (sort_options.size() > 0) {
if (head >= 0) {
// Use the SelectK node to take only what we need
MakeExecNodeOrStop(
"select_k_sink", plan.get(), {final_node.get()},
compute::SelectKSinkNodeOptions{
arrow::compute::SelectKOptions(
head, std::dynamic_pointer_cast<compute::SortOptions>(
make_compute_options("sort_indices", sort_options))
->sort_keys),
&sink_gen});
} else {
MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()},
compute::OrderBySinkNodeOptions{
*std::dynamic_pointer_cast<compute::SortOptions>(
make_compute_options("sort_indices", sort_options)),
&sink_gen});
}
} else {
MakeExecNodeOrStop("sink", plan.get(), {final_node.get()},
compute::SinkNodeOptions{&sink_gen});
}

// End of chunk used in ExecPlan_BuildAndShow

StopIfNotOk(plan->Validate());

return ValueOrStop(result);
// If the generator is destroyed before being completely drained, inform plan
std::shared_ptr<void> stop_producing{nullptr, [plan](...) {
bool not_finished_yet =
plan->finished().TryAddCallback([&plan] {
return [plan](const arrow::Status&) {};
});

if (not_finished_yet) {
plan->StopProducing();
}
}};

// Attach metadata to the schema
auto out_schema = final_node->output_schema();
if (metadata.size() > 0) {
auto kv = strings_to_kvm(metadata);
out_schema = out_schema->WithMetadata(kv);
}

StopIfNotOk(plan->StartProducing());
auto reader = compute::MakeGeneratorReader(
out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); },
gc_memory_pool());

return ValueOrStop(reader->ToTable());
}

// [[arrow::export]]
Expand Down Expand Up @@ -272,14 +372,8 @@ void ExecPlan_Write(
ds::WriteNodeOptions{std::move(opts), std::move(kv)});

StopIfNotOk(plan->Validate());

arrow::Status result = RunWithCapturedRIfPossibleVoid([&]() {
RETURN_NOT_OK(plan->StartProducing());
RETURN_NOT_OK(plan->finished().status());
return arrow::Status::OK();
});

StopIfNotOk(result);
StopIfNotOk(plan->StartProducing());
StopIfNotOk(plan->finished().status());
}

#endif
Expand Down
13 changes: 7 additions & 6 deletions r/src/safe-call-into-r.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ arrow::Result<T> RunWithCapturedR(std::function<arrow::Future<T>()> make_arrow_c
GetMainRThread().ResetError();

arrow::Result<T> result = arrow::internal::SerialExecutor::RunInSerialExecutor<T>(
[make_arrow_call](arrow::internal::Executor* executor) {
[&make_arrow_call](arrow::internal::Executor* executor) {
GetMainRThread().Executor() = executor;
return make_arrow_call();
});
Expand All @@ -198,7 +198,7 @@ arrow::Result<T> RunWithCapturedRIfPossible(
// Note that the use of the io_context here is arbitrary (i.e. we could use
// any construct that launches a background thread).
const auto& io_context = arrow::io::default_io_context();
return RunWithCapturedR<T>([&]() {
return RunWithCapturedR<T>([&make_arrow_call, io_context]() {
return DeferNotOk(io_context.executor()->Submit(std::move(make_arrow_call)));
});
} else {
Expand All @@ -210,10 +210,11 @@ arrow::Result<T> RunWithCapturedRIfPossible(
// a Result.
static inline arrow::Status RunWithCapturedRIfPossibleVoid(
std::function<arrow::Status()> make_arrow_call) {
auto result = RunWithCapturedRIfPossible<bool>([&]() -> arrow::Result<bool> {
ARROW_RETURN_NOT_OK(make_arrow_call());
return true;
});
auto result =
RunWithCapturedRIfPossible<bool>([&make_arrow_call]() -> arrow::Result<bool> {
ARROW_RETURN_NOT_OK(make_arrow_call());
return true;
});
ARROW_RETURN_NOT_OK(result);
return arrow::Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions r/tests/testthat/test-compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ test_that("register_scalar_function() adds a compute function to the registry",
Scalar$create(32L, float64())
)

skip("while testing valgrind errors")

expect_identical(
record_batch(a = 1L) %>%
dplyr::mutate(b = times_32(a)) %>%
Expand Down Expand Up @@ -206,6 +208,7 @@ test_that("register_user_defined_function() errors for unsupported specification
})

test_that("user-defined functions work during multi-threaded execution", {
skip("while testing valgrind errors")
skip_if_not(CanRunWithCapturedR())
skip_if_not_available("dataset")
# Snappy has a UBSan issue: https://github.com/google/snappy/pull/148
Expand Down Expand Up @@ -258,6 +261,7 @@ test_that("user-defined functions work during multi-threaded execution", {
})

test_that("user-defined error when called from an unsupported context", {
skip("while testing valgrind errors")
skip_if_not_available("dataset")
skip_if_not(CanRunWithCapturedR())

Expand Down