ARROW-17252: [R] Intermittent valgrind failure #13773

Merged
merged 8 commits into from
Aug 9, 2022
7 changes: 6 additions & 1 deletion r/R/compute.R
@@ -344,7 +344,7 @@ cast_options <- function(safe = TRUE, ...) {
 #' @return `NULL`, invisibly
 #' @export
 #'
-#' @examplesIf arrow_with_dataset()
+#' @examplesIf arrow_with_dataset() && identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF"), "true")
paleolimbot marked this conversation as resolved.
 #' library(dplyr, warn.conflicts = FALSE)
 #'
 #' some_model <- lm(mpg ~ disp + cyl, data = mtcars)
@@ -358,10 +358,15 @@ cast_options <- function(safe = TRUE, ...) {
 #'   auto_convert = TRUE
 #' )
 #'
+#' # User-defined functions require some special handling
+#' # in the query engine which currently requires an opt-in using
+#' # the R_ARROW_COLLECT_WITH_UDF environment variable.
+#' Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")
 #' as_arrow_table(mtcars) %>%
 #'   transmute(mpg, mpg_predicted = mtcars_predict_mpg(disp, cyl)) %>%
 #'   collect() %>%
 #'   head()
+#' Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
 #'
 register_scalar_function <- function(name, fun, in_type, out_type,
Member
What if you put `Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")` inside `register_scalar_function()`? Calling this function already opts you in to UDFs, and there's no reason to call it without also having COLLECT_WITH_UDF working.
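The suggestion above could look roughly like the following sketch. It does not use arrow: `register_stub()`, `register_with_opt_in()`, and `udf_registry` are hypothetical names standing in for the real registration code, and only the `Sys.setenv()` call mirrors what the comment proposes.

```r
# Hypothetical stand-in for the real registration step: it only stores
# the function under its name in a local environment.
udf_registry <- new.env()
register_stub <- function(name, fun) {
  assign(name, fun, envir = udf_registry)
  invisible(NULL)
}

# Hypothetical wrapper: registering a UDF also opts in to UDF-aware
# collection, so callers do not have to set the variable themselves.
register_with_opt_in <- function(name, fun) {
  Sys.setenv(R_ARROW_COLLECT_WITH_UDF = "true")
  register_stub(name, fun)
}

register_with_opt_in("times_32", function(x) x * 32)
opted_in <- identical(Sys.getenv("R_ARROW_COLLECT_WITH_UDF"), "true")
```

One trade-off of this shape is that the variable stays set for the rest of the session once any function has been registered.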

auto_convert = FALSE) {
9 changes: 8 additions & 1 deletion r/R/table.R
@@ -331,5 +331,12 @@ as_arrow_table.arrow_dplyr_query <- function(x, ...) {
   # See query-engine.R for ExecPlan/Nodes
   plan <- ExecPlan$create()
   final_node <- plan$Build(x)
-  plan$Run(final_node, as_table = TRUE)
+
+  run_with_event_loop <- identical(
+    Sys.getenv("R_ARROW_COLLECT_WITH_UDF", ""),
+    "true"
+  )
+
+  result <- plan$Run(final_node, as_table = run_with_event_loop)
+  as_arrow_table(result)
 }
7 changes: 6 additions & 1 deletion r/man/register_scalar_function.Rd

Some generated files are not rendered by default.

46 changes: 26 additions & 20 deletions r/tests/testthat/test-compute.R
@@ -81,6 +81,8 @@ test_that("arrow_scalar_function() works with auto_convert = TRUE", {

 test_that("register_scalar_function() adds a compute function to the registry", {
   skip_if_not(CanRunWithCapturedR())
+  # until R_ARROW_COLLECT_WITH_UDF is no longer needed to silence valgrind
+  skip_on_linux_devel()

   register_scalar_function(
     "times_32",
@@ -103,12 +105,14 @@ test_that("register_scalar_function() adds a compute function to the registry",
     Scalar$create(32L, float64())
   )

-  expect_identical(
-    record_batch(a = 1L) %>%
-      dplyr::mutate(b = times_32(a)) %>%
-      dplyr::collect(),
-    tibble::tibble(a = 1L, b = 32.0)
-  )
+  withr::with_envvar(list(R_ARROW_COLLECT_WITH_UDF = "true"), {
Member
Can revert this

Member Author
Can we? To avoid the leaks for any subsequent tests we need the variable to not be "true".

Member
I was thinking you can revert this because you've already called register_scalar_function() so the env var will be set.

Member Author
I see! with_envvar() was definitely not doing the right thing here (which is hopefully why the last valgrind check failed). I fixed it (I think) and requested another check!
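The scoping concern in this thread can be illustrated without arrow or withr. The sketch below is a base-R approximation of what a scoped helper such as `withr::with_envvar()` is meant to do: set the variable for one expression and restore the previous state afterwards. `with_env_var_scoped()` is a hypothetical name, not part of either package.

```r
# Hypothetical base-R helper: set one environment variable while `code`
# is evaluated, then restore whatever was there before.
with_env_var_scoped <- function(name, value, code) {
  old <- Sys.getenv(name, unset = NA)
  on.exit({
    if (is.na(old)) {
      Sys.unsetenv(name)
    } else {
      do.call(Sys.setenv, structure(list(old), names = name))
    }
  }, add = TRUE)
  do.call(Sys.setenv, structure(list(value), names = name))
  force(code)  # `code` is lazy, so it runs only after the variable is set
}

# Start from a known state, then check the value inside and after the scope.
Sys.unsetenv("R_ARROW_COLLECT_WITH_UDF")
inside <- with_env_var_scoped(
  "R_ARROW_COLLECT_WITH_UDF", "true",
  Sys.getenv("R_ARROW_COLLECT_WITH_UDF")
)
after <- Sys.getenv("R_ARROW_COLLECT_WITH_UDF", unset = NA)
```

Here `inside` is `"true"` and `after` is `NA`, so code that runs later does not see the opt-in. A scoped helper cannot undo a `Sys.setenv()` call made before the scope was entered, since it restores the value it found on entry.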

+    expect_identical(
+      record_batch(a = 1L) %>%
+        dplyr::mutate(b = times_32(a)) %>%
+        dplyr::collect(),
+      tibble::tibble(a = 1L, b = 32.0)
+    )
+  })
 })

 test_that("arrow_scalar_function() with bad return type errors", {
@@ -237,24 +241,26 @@ test_that("user-defined functions work during multi-threaded execution", {
   )
   on.exit(unregister_binding("times_32", update_cache = TRUE))

-  # check a regular collect()
-  result <- open_dataset(tf_dataset) %>%
-    dplyr::mutate(fun_result = times_32(value)) %>%
-    dplyr::collect() %>%
-    dplyr::arrange(row_num)
+  withr::with_envvar(list(R_ARROW_COLLECT_WITH_UDF = "true"), {
Member
You should be able to remove this now

+    # check a regular collect()
+    result <- open_dataset(tf_dataset) %>%
+      dplyr::mutate(fun_result = times_32(value)) %>%
+      dplyr::collect() %>%
+      dplyr::arrange(row_num)

-  expect_identical(result$fun_result, example_df$value * 32)
+    expect_identical(result$fun_result, example_df$value * 32)

-  # check a write_dataset()
-  open_dataset(tf_dataset) %>%
-    dplyr::mutate(fun_result = times_32(value)) %>%
-    write_dataset(tf_dest)
+    # check a write_dataset()
+    open_dataset(tf_dataset) %>%
+      dplyr::mutate(fun_result = times_32(value)) %>%
+      write_dataset(tf_dest)

-  result2 <- dplyr::collect(open_dataset(tf_dest)) %>%
-    dplyr::arrange(row_num) %>%
-    dplyr::collect()
+    result2 <- dplyr::collect(open_dataset(tf_dest)) %>%
+      dplyr::arrange(row_num) %>%
+      dplyr::collect()

-  expect_identical(result2$fun_result, example_df$value * 32)
+    expect_identical(result2$fun_result, example_df$value * 32)
+  })
 })

 test_that("user-defined error when called from an unsupported context", {