diff --git a/r/R/compute.R b/r/R/compute.R index 0985e73a5f2d3..111edd78a798b 100644 --- a/r/R/compute.R +++ b/r/R/compute.R @@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) { #' @return `NULL`, invisibly #' @export #' -#' @examplesIf arrow_with_dataset() +#' @examplesIf FALSE && arrow_with_dataset() #' library(dplyr, warn.conflicts = FALSE) #' #' some_model <- lm(mpg ~ disp + cyl, data = mtcars) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 84360490fdbe7..4fefaaba6ccda 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -193,6 +193,9 @@ ExecPlan <- R6Class("ExecPlan", node }, Run = function(node, as_table = FALSE) { + table_at_end <- as_table + as_table <- FALSE + # a section of this code is used by `BuildAndShow()` too - the 2 need to be in sync # Start of chunk used in `BuildAndShow()` assert_is(node, "ExecNode") @@ -266,7 +269,11 @@ ExecPlan <- R6Class("ExecPlan", } } - out + if (table_at_end && !inherits(out, "Table")) { + out$read_table() + } else { + out + } }, Write = function(node, ...) { # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ... diff --git a/r/man/register_scalar_function.Rd b/r/man/register_scalar_function.Rd index 4da8f54f645b0..95ffcbbafe4a3 100644 --- a/r/man/register_scalar_function.Rd +++ b/r/man/register_scalar_function.Rd @@ -48,7 +48,7 @@ stateless and return output with the same shape (i.e., the same number of rows) as the input. } \examples{ -\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +\dontshow{if (FALSE && arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} library(dplyr, warn.conflicts = FALSE) some_model <- lm(mpg ~ disp + cyl, data = mtcars) diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 91d646f0a3c08..8d7abcc668d4f 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -129,9 +129,61 @@ std::shared_ptr ExecPlan_run( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); - StopIfNotOk(prepared_plan.first->StartProducing()); - return prepared_plan.second; + // For now, don't require R to construct SinkNodes. + // Instead, just pass the node we should collect as an argument. + arrow::AsyncGenerator> sink_gen; + + // Sorting uses a different sink node; there is no general sort yet + if (sort_options.size() > 0) { + if (head >= 0) { + // Use the SelectK node to take only what we need + MakeExecNodeOrStop( + "select_k_sink", plan.get(), {final_node.get()}, + compute::SelectKSinkNodeOptions{ + arrow::compute::SelectKOptions( + head, std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)) + ->sort_keys), + &sink_gen}); + } else { + MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, + compute::OrderBySinkNodeOptions{ + *std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)), + &sink_gen}); + } + } else { + MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, + compute::SinkNodeOptions{&sink_gen}); + } + + // End of chunk used in ExecPlan_BuildAndShow + + StopIfNotOk(plan->Validate()); + + // If the generator is destroyed before being completely drained, inform plan + std::shared_ptr stop_producing{nullptr, [plan](...) { + bool not_finished_yet = + plan->finished().TryAddCallback([&plan] { + return [plan](const arrow::Status&) {}; + }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; + + // Attach metadata to the schema + auto out_schema = final_node->output_schema(); + if (metadata.size() > 0) { + auto kv = strings_to_kvm(metadata); + out_schema = out_schema->WithMetadata(kv); + } + + StopIfNotOk(plan->StartProducing()); + return compute::MakeGeneratorReader( + out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); }, + gc_memory_pool()); } // [[arrow::export]] @@ -139,15 +191,63 @@ std::shared_ptr ExecPlan_read_table( const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, cpp11::strings metadata, int64_t head = -1) { - auto prepared_plan = ExecPlan_prepare(plan, final_node, sort_options, metadata, head); + // For now, don't require R to construct SinkNodes. + // Instead, just pass the node we should collect as an argument. + arrow::AsyncGenerator> sink_gen; - auto result = RunWithCapturedRIfPossible>( - [&]() -> arrow::Result> { - ARROW_RETURN_NOT_OK(prepared_plan.first->StartProducing()); - return prepared_plan.second->ToTable(); - }); + // Sorting uses a different sink node; there is no general sort yet + if (sort_options.size() > 0) { + if (head >= 0) { + // Use the SelectK node to take only what we need + MakeExecNodeOrStop( + "select_k_sink", plan.get(), {final_node.get()}, + compute::SelectKSinkNodeOptions{ + arrow::compute::SelectKOptions( + head, std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)) + ->sort_keys), + &sink_gen}); + } else { + MakeExecNodeOrStop("order_by_sink", plan.get(), {final_node.get()}, + compute::OrderBySinkNodeOptions{ + *std::dynamic_pointer_cast( + make_compute_options("sort_indices", sort_options)), + &sink_gen}); + } + } else { + MakeExecNodeOrStop("sink", plan.get(), {final_node.get()}, + compute::SinkNodeOptions{&sink_gen}); + } + + // End of chunk used in ExecPlan_BuildAndShow + + StopIfNotOk(plan->Validate()); - return ValueOrStop(result); + // If the generator is destroyed before being completely drained, inform plan + std::shared_ptr stop_producing{nullptr, [plan](...) { + bool not_finished_yet = + plan->finished().TryAddCallback([&plan] { + return [plan](const arrow::Status&) {}; + }); + + if (not_finished_yet) { + plan->StopProducing(); + } + }}; + + // Attach metadata to the schema + auto out_schema = final_node->output_schema(); + if (metadata.size() > 0) { + auto kv = strings_to_kvm(metadata); + out_schema = out_schema->WithMetadata(kv); + } + + StopIfNotOk(plan->StartProducing()); + auto reader = compute::MakeGeneratorReader( + out_schema, [stop_producing, plan, sink_gen] { return sink_gen(); }, + gc_memory_pool()); + + return ValueOrStop(reader->ToTable()); } // [[arrow::export]] @@ -272,14 +372,8 @@ void ExecPlan_Write( ds::WriteNodeOptions{std::move(opts), std::move(kv)}); StopIfNotOk(plan->Validate()); - - arrow::Status result = RunWithCapturedRIfPossibleVoid([&]() { - RETURN_NOT_OK(plan->StartProducing()); - RETURN_NOT_OK(plan->finished().status()); - return arrow::Status::OK(); - }); - - StopIfNotOk(result); + StopIfNotOk(plan->StartProducing()); + StopIfNotOk(plan->finished().status()); } #endif diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 08e8a8c11b6c5..98322af4316b8 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -174,7 +174,7 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c GetMainRThread().ResetError(); arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( - [make_arrow_call](arrow::internal::Executor* executor) { + [&make_arrow_call](arrow::internal::Executor* executor) { GetMainRThread().Executor() = executor; return make_arrow_call(); }); @@ -198,7 +198,7 @@ arrow::Result RunWithCapturedRIfPossible( // Note that the use of the io_context here is arbitrary (i.e. we could use // any construct that launches a background thread). const auto& io_context = arrow::io::default_io_context(); - return RunWithCapturedR([&]() { + return RunWithCapturedR([&make_arrow_call, io_context]() { return DeferNotOk(io_context.executor()->Submit(std::move(make_arrow_call))); }); } else { @@ -210,10 +210,11 @@ arrow::Result RunWithCapturedRIfPossible( // a Result. static inline arrow::Status RunWithCapturedRIfPossibleVoid( std::function make_arrow_call) { - auto result = RunWithCapturedRIfPossible([&]() -> arrow::Result { - ARROW_RETURN_NOT_OK(make_arrow_call()); - return true; - }); + auto result = + RunWithCapturedRIfPossible([&make_arrow_call]() -> arrow::Result { + ARROW_RETURN_NOT_OK(make_arrow_call()); + return true; + }); ARROW_RETURN_NOT_OK(result); return arrow::Status::OK(); } diff --git a/r/tests/testthat/test-compute.R b/r/tests/testthat/test-compute.R index 9e487169f4b15..a3bbd787962ac 100644 --- a/r/tests/testthat/test-compute.R +++ b/r/tests/testthat/test-compute.R @@ -103,6 +103,8 @@ test_that("register_scalar_function() adds a compute function to the registry", Scalar$create(32L, float64()) ) + skip("while testing valgrind errors") + expect_identical( record_batch(a = 1L) %>% dplyr::mutate(b = times_32(a)) %>% @@ -206,6 +208,7 @@ test_that("register_user_defined_function() errors for unsupported specification }) test_that("user-defined functions work during multi-threaded execution", { + skip("while testing valgrind errors") skip_if_not(CanRunWithCapturedR()) skip_if_not_available("dataset") # Snappy has a UBSan issue: https://github.com/google/snappy/pull/148 @@ -258,6 +261,7 @@ test_that("user-defined functions work during multi-threaded execution", { }) test_that("user-defined error when called from an unsupported context", { + skip("while testing valgrind errors") skip_if_not_available("dataset") skip_if_not(CanRunWithCapturedR())