ARROW-17252: [R] Intermittent valgrind failure #13773

Merged: 8 commits, Aug 9, 2022
9 changes: 8 additions & 1 deletion r/R/compute.R
@@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) {
#' @return `NULL`, invisibly
#' @export
#'
-#' @examplesIf arrow_with_dataset()
+#' @examplesIf arrow_with_dataset() && identical(Sys.getenv("NOT_CRAN"), "true")
#' library(dplyr, warn.conflicts = FALSE)
#'
#' some_model <- lm(mpg ~ disp + cyl, data = mtcars)
@@ -385,6 +385,13 @@ register_scalar_function <- function(name, fun, in_type, out_type,
update_cache = TRUE
)

+# User-defined functions require some special handling
+# in the query engine which currently require an opt-in using
+# the R_ARROW_COLLECT_WITH_UDF environment variable while this
+# behaviour is stabilized.
+# TODO(ARROW-17178) remove the need for this!
+Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")

invisible(NULL)
}

9 changes: 8 additions & 1 deletion r/R/table.R
@@ -331,5 +331,12 @@ as_arrow_table.arrow_dplyr_query <- function(x, ...) {
# See query-engine.R for ExecPlan/Nodes
plan <- ExecPlan$create()
final_node <- plan$Build(x)
-plan$Run(final_node, as_table = TRUE)

+run_with_event_loop <- identical(
+Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""),
+"true"
+)

+result <- plan$Run(final_node, as_table = run_with_event_loop)
+as_arrow_table(result)
}
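
For reference, the opt-in check in the hunk above is an exact string comparison, so only the literal value `"true"` flips the flag; an unset variable or a value such as `"TRUE"` does not. A minimal base-R sketch of that check, assuming a hypothetical helper name `udf_collect_enabled()` that is not part of arrow:

```r
# Hypothetical helper (not part of arrow): mirrors the identical() check
# added to as_arrow_table.arrow_dplyr_query() in this diff.
udf_collect_enabled <- function() {
  identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""), "true")
}

Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
unset_result <- udf_collect_enabled()   # variable not set

Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "TRUE")
upper_result <- udf_collect_enabled()   # wrong case, not an exact match

Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")
exact_result <- udf_collect_enabled()   # exact match

# Clean up so the example does not leak the variable into the session.
Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
```

Only `exact_result` should come back `TRUE` here, which is why the tests in this PR unset the variable rather than assigning it some other value.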
2 changes: 1 addition & 1 deletion r/man/register_scalar_function.Rd

Some generated files are not rendered by default.

22 changes: 21 additions & 1 deletion r/tests/testthat/test-compute.R
@@ -81,6 +81,9 @@ test_that("arrow_scalar_function() works with auto_convert = TRUE", {

test_that("register_scalar_function() adds a compute function to the registry", {
skip_if_not(CanRunWithCapturedR())
+# TODO(ARROW-17178): User-defined function-friendly ExecPlan execution has
+# occasional valgrind errors
+skip_on_linux_devel()

register_scalar_function(
"times_32",
@@ -109,6 +112,8 @@ test_that("register_scalar_function() adds a compute function to the registry",
dplyr::collect(),
tibble::tibble(a = 1L, b = 32.0)
)

+Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
Member

NBD but should this more properly go inside of on.exit above? like

on.exit({
  unregister_binding("times_32", update_cache = TRUE)
  # TODO(ARROW-17178) remove the need for this!
  Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
})

Member Author

Fixed!
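
To illustrate why `on.exit()` is the safer place for this cleanup: a statement at the end of a test body is skipped if anything earlier throws, while an `on.exit()` handler still runs. A minimal base-R sketch, where `run_failing_step()` is a hypothetical stand-in for a test body and not code from this PR:

```r
# Hypothetical stand-in for a test body that fails part-way through.
run_failing_step <- function() {
  Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")
  # Registered up front, so it still runs when stop() is reached below.
  on.exit(Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF"), add = TRUE)
  stop("simulated failure")
}

outcome <- try(run_failing_step(), silent = TRUE)

# NA here means the variable was unset despite the error.
flag_after <- Sys.getenv("R_ARROW_COLLECT_WITH_UDF", unset = NA)
```

Using `add = TRUE` keeps any handler registered earlier in the same function, such as the `unregister_binding()` call in the suggestion above, instead of replacing it.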

})

test_that("arrow_scalar_function() with bad return type errors", {
@@ -143,6 +148,9 @@ test_that("arrow_scalar_function() with bad return type errors", {
call_function("times_32_bad_return_type_scalar", Array$create(1L)),
"Expected return Array or Scalar with type 'double'"
)

+# TODO(ARROW-17178) remove the need for this!
+Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
})

test_that("register_user_defined_function() can register multiple kernels", {
Member

Suggested change
-test_that("register_user_defined_function() can register multiple kernels", {
+test_that("register_scalar_function() can register multiple kernels", {

Member

Also below

Member Author

Fixed!

@@ -203,12 +211,18 @@ test_that("register_user_defined_function() errors for unsupported specification
),
"Kernels for user-defined function must accept the same number of arguments"
)

+# TODO(ARROW-17178) remove the need for this!
+Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
})

test_that("user-defined functions work during multi-threaded execution", {
skip_if_not(CanRunWithCapturedR())
skip_if_not_available("dataset")
-# Snappy has a UBSan issue: https://github.com/google/snappy/pull/148
+# Skip on linux devel because:
+# TODO(ARROW-17283): Snappy has a UBSan issue that is fixed in the dev version
+# TODO(ARROW-17178): User-defined function-friendly ExecPlan execution has
+# occasional valgrind errors
skip_on_linux_devel()

n_rows <- 10000
@@ -255,6 +269,9 @@ test_that("user-defined functions work during multi-threaded execution", {
dplyr::collect()

expect_identical(result2$fun_result, example_df$value * 32)

+# TODO(ARROW-17178) remove the need for this!
+Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
})

test_that("user-defined error when called from an unsupported context", {
@@ -304,4 +321,7 @@ test_that("user-defined error when called from an unsupported context", {
"Call to R \\(.*?\\) from a non-R thread from an unsupported context"
)
}

+# TODO(ARROW-17178) remove the need for this!
+Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
})