Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17178: [R] Support head() in arrow_dplyr_query with user-defined function #13706

Merged
merged 34 commits into from
Sep 16, 2022

Conversation

paleolimbot
Copy link
Member

@paleolimbot paleolimbot commented Jul 25, 2022

This PR adds support for more types of queries that include calls to R code (i.e., map_batches(..., .lazy = TRUE), user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because it wasn't possible to guarantee that the exec plan would be completely executed within a call to RunWithCapturedR() where we establish an event loop on the main R thread and launch a background thread to do "arrow stuff" that can queue tasks to run on the R thread.

The approach I took here was to stuff more of the ExecPlan-to-RecordBatchReader logic in a subclass of RecordBatchReader that doesn't call plan->StartProducing() until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (currently how head/tail/a few other things are implemented), and as long as it's drained all at once (i.e., reader$read_table()) the calls into R will work.

The R code calls within an exec plan won't work with reader$read_next_batch() or the C data interface because there we can't guarantee an event loop.

This also has the benefit of allowing us to inject some cancelability to the ExecPlan since we can check a StopToken after #13635 (ARROW-11841) for an interrupt (for all exec plans). The biggest benefit is, in my view, that the lifecycle of the ExecPlan is more explicit...before, the plan was stopped when the object was deleted but it was written in a way that I didn't understand for a long time. I think a reader subclass makes it more explicit and maybe will help to print out nested queries (since they're no longer eagerly evaluated).

An example of something that didn't work before that now does:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "times_32",
  function(context, x) x * 32.0,
  int32(),
  float64(),
  auto_convert = TRUE
)

record_batch(a = 1:1000) %>%
  dplyr::mutate(b = times_32(a)) %>%
  as_record_batch_reader() %>%
  as_arrow_table()
#> Table
#> 1000 rows x 2 columns
#> $a <int32>
#> $b <double>

record_batch(a = 1:1000) %>%
  dplyr::mutate(fun_result = times_32(a)) %>%
  head(11) %>%
  dplyr::collect()
#> # A tibble: 11 × 2
#>        a fun_result
#>    <int>      <dbl>
#>  1     1         32
#>  2     2         64
#>  3     3         96
#>  4     4        128
#>  5     5        160
#>  6     6        192
#>  7     7        224
#>  8     8        256
#>  9     9        288
#> 10    10        320
#> 11    11        352

Created on 2022-07-25 by the reprex package (v2.0.1)

@github-actions
Copy link

@github-actions
Copy link

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@paleolimbot
Copy link
Member Author

@github-actions crossbow submit r-valgrind

@github-actions
Copy link

Unable to match any tasks for `r-valgrind`
The Archery job run can be found at: https://github.com/apache/arrow/actions/runs/2739103985

@paleolimbot
Copy link
Member Author

@github-actions crossbow submit test-r-linux-valgrind

@github-actions
Copy link

Revision: 5427055

Submitted crossbow builds: ursacomputing/crossbow @ actions-67d0a81c6d

Task Status
test-r-linux-valgrind Azure

@paleolimbot
Copy link
Member Author

@github-actions crossbow submit test-r-linux-valgrind

@github-actions
Copy link

Revision: 43dec7f

Submitted crossbow builds: ursacomputing/crossbow @ actions-171501a681

Task Status
test-r-linux-valgrind Azure

@paleolimbot
Copy link
Member Author

@github-actions crossbow submit test-r-linux-valgrind

@paleolimbot paleolimbot marked this pull request as ready for review August 29, 2022 12:10
@github-actions
Copy link

Revision: 3997260

Submitted crossbow builds: ursacomputing/crossbow @ actions-b55db16b92

Task Status
test-r-linux-valgrind Azure

@paleolimbot
Copy link
Member Author

A quick summary + reprex to augment the bit I wrote above...this PR (1) undoes the kludges I introduced when getting the user-defined function bits to work and not fail the valgrind check, (2) allows nested exec plans to with user-defined functions to work and (3) allows the result of an exec plan to be inspected (e.g., to print/walk its relation tree or calculate its schema).

Reprex to play with:

library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

# The result of an ExecPlan is now a subclass of the RecordBatchReader
# that more carefully manages the lifecycle of the underlying exec plan
# (which includes not starting it until the first batch has been pulled
# and releasing it as soon as it is no longer needed)
result <- mtcars |> 
  as_arrow_table() |> 
  filter(mpg > 25) |> 
  as_record_batch_reader()

result
#> ExecPlanReader
#> <Status: PLAN_NOT_STARTED>
#> 
#> mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> 
#> See $metadata for additional Schema metadata
#> 
#> See $Plan() for details.
result$Plan()
#> ExecPlan
#> ExecPlan with 4 nodes:
#> 3:SinkNode{}
#>   2:ProjectNode{projection=[mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb]}
#>     1:FilterNode{filter=(mpg > 25)}
#>       0:TableSourceNode{}
result$PlanStatus()
#> [1] "PLAN_NOT_STARTED"

as_arrow_table(result)
#> Table
#> 6 rows x 11 columns
#> $mpg <double>
#> $cyl <double>
#> $disp <double>
#> $hp <double>
#> $drat <double>
#> $wt <double>
#> $qsec <double>
#> $vs <double>
#> $am <double>
#> $gear <double>
#> $carb <double>
#> 
#> See $metadata for additional Schema metadata
result$PlanStatus()
#> [1] "PLAN_FINISHED"

# head() on a record batch reader is now fully lazy (i.e., never
# pull batches from its source until requested)
endless_reader <- as_record_batch_reader(
  function() stop("this will error if called"),
  schema = schema()
)

head(endless_reader)
#> RecordBatchReader

Created on 2022-08-29 by the reprex package (v2.0.1)

Copy link
Member

@nealrichardson nealrichardson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of questions inline. Some bigger ones:

  • If this lets us print nested ExecPlans, let's add tests that show that.
  • Why does this necessarily not work with the C interface? The consumer from the C interface could still evaluate the whole plan in a single R call. I'm thinking of the duckdb integration, for example: I'm pretty sure that's all happening in C++ so it should be allowed. Maybe it does just work--want to add a test of a UDF --> to_duckdb() and see what happens?

.onUnload <- function(...) {
# When running valgrind we need to wait for the thread pools to finish
# running background tasks or else spurious memory leaks may be reported.
if (on_linux_dev()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only if on_linux_dev?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See below, too, but I'm a little worried that the waiting will unnecessarily delay a user restarting an R session (e.g., if it's waiting for a GCS read timeout or something). I'd prefer not to do this at all and have the ExecPlan properly clean up after itself when StopProducing() gets called, but that PR isn't quite finished yet I think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also see below, but this doesn't fix the memory leak so I removed it.

@@ -238,18 +227,13 @@ ExecPlan <- R6Class("ExecPlan",
slice_size <- node$extras$head %||% node$extras$tail
if (!is.null(slice_size)) {
out <- head(out, slice_size)
# We already have everything we need for the head, so StopProducing
self$Stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we want/need this anymore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the change to head.RecordBatchReader(), no batches get pulled when head(out) is called (which is need to make sure that the ExecPlan can continue to defer StartProducing()). The C++ implementation of head.RecordBatchReader() takes care of calling Close() on the upstream reader (which, if it's an ExecPlanReader, will call StopProducing(). I should add a test to make sure that's the case though!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...the test passes locally and on Windows but fails on all the Linux CI. My guess is that this is related to how the request to stop producing is (or is not) relayed to the source node (or maybe how quickly that happens?).

r/R/table.R Outdated
@@ -331,12 +331,5 @@ as_arrow_table.arrow_dplyr_query <- function(x, ...) {
# See query-engine.R for ExecPlan/Nodes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can all of this be simplified now to as_arrow_table(as_record_batch_reader(x)) now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

} // namespace arrow

// [[arrow::export]]
void WaitForIdleThreadPool() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these waits have a timeout? Or could they wait forever?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's currently no option to specify a wait period (which is why I fenced it to on_linux_dev())...I'd like to fire off a few more valgrind checks to make sure that it actually fixes the problem. Practically it doesn't matter (this occurs when shutting down the process).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should say that the actual waiting doesn't matter (it very much matters if a user tries to restart R and has to wait unnecessarily!)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also doesn't fix the memory leak anyway, so I removed it.

@paleolimbot paleolimbot requested a review from westonpace August 31, 2022 13:35
@paleolimbot
Copy link
Member Author

@westonpace Whenever you get a chance, the C++ code here needs a set of eyes, particularly because it involves the lifecycle of the ExecPlan (notably, getting multiple ExecPlans to run under the same R event loop so that we can evaluate user-defined functions).

I was hoping that this change would also make it more clear when we StartProducing() and StopProducing()...previously we did call StopProducing() when the result RecordBatchReader was garbage collected but since we don't have any control of when that happens, we couldn't guarantee a prompt stop request for something like head() (I know that a prompt stop request doesn't stop the plan immediately, but it sounds like at some point it will).

I was also hoping this change would fix the sporadic memory leaks observed, which as you'll see from the valgrind crossbow jobs above, it does not. The checks above also confirm that those leaks are not a result of the IO thread pool shutting down (although occasionally one can observe a direct leak of an ExecPlan).

list(
record_batch(
line = c(
"this is the RecordBatchReader that never ends",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😹

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the comment. According to the rep doc, this will actually be repeated a finite number of times? Is it possible to create an infinite iterator in R?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially it was (you can create a RecordBatchReader from an arbitrary R function that returns record batches or NULL), but for some reason on CI the infinite iterator crashed on CI (but not locally on Mac or Ubuntu, where the head() call properly cancels the input). This PR is trying to make it more clear when StopProducing() actually gets called, even if the ExecPlan is currently rather bad about responding to that request.

I should take this joke out, though, since it's now misleading.

@paleolimbot
Copy link
Member Author

Just bumping this - as Neal noted, it need a review of the C++ from @westonpace or @pitrou since it delves into some object lifecycles with which I'm not all that familiar.

r/src/compute-exec.cpp Outdated Show resolved Hide resolved
r/src/compute-exec.cpp Outdated Show resolved Hide resolved
r/src/compute-exec.cpp Outdated Show resolved Hide resolved
Comment on lines +153 to +160
bool not_finished_yet = plan_->finished().TryAddCallback(
[&plan] { return [plan](const arrow::Status&) {}; });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't understand: what is the point of the inner callback here? And why capture plan by reference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from the original implementation ( https://github.com/apache/arrow/blob/master/r/src/compute-exec.cpp#L99-L109 ). I would absolutely love to make this simpler and can will a few things.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok - the CI reminded me why I keep adding this back in after trying to simplify it ( https://github.com/apache/arrow/actions/runs/3055062036/jobs/4927693653#step:9:8990 ). I added a comment here to remind the next person who wants to simplify it, too.

The point of this method is to call plan_->StopProducing() and give up our reference to it, but because it might not be safe to delete it yet, we need something to keep it alive until that happens. I think the outer plan was captured by reference because the inner one is captured by value (maybe so that only one extra reference gets added to the shared pointer).

I would love any leads on a better way to do this!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see. It makes sense, thanks.

r/src/compute-exec.cpp Outdated Show resolved Hide resolved
r/src/compute-exec.cpp Outdated Show resolved Hide resolved
r/src/recordbatchreader.cpp Outdated Show resolved Hide resolved
r/src/recordbatchreader.cpp Outdated Show resolved Hide resolved
r/src/recordbatchreader.cpp Outdated Show resolved Hide resolved
list(
record_batch(
line = c(
"this is the RecordBatchReader that never ends",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the comment. According to the rep doc, this will actually be repeated a finite number of times? Is it possible to create an infinite iterator in R?

@paleolimbot paleolimbot merged commit 5988363 into apache:master Sep 16, 2022
@paleolimbot paleolimbot deleted the udf-head branch September 16, 2022 01:44
@ursabot
Copy link

ursabot commented Sep 16, 2022

Benchmark runs are scheduled for baseline = 557acf5 and contender = 5988363. 5988363 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.88% ⬆️0.0%] test-mac-arm
[Failed ⬇️3.1% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.04% ⬆️0.04%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 59883630 ec2-t3-xlarge-us-east-2
[Failed] 59883630 test-mac-arm
[Failed] 59883630 ursa-i9-9960x
[Finished] 59883630 ursa-thinkcentre-m75q
[Finished] 557acf52 ec2-t3-xlarge-us-east-2
[Finished] 557acf52 test-mac-arm
[Failed] 557acf52 ursa-i9-9960x
[Finished] 557acf52 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot
Copy link

ursabot commented Sep 16, 2022

['Python', 'R'] benchmarks have high level of regressions.
test-mac-arm
ursa-i9-9960x

zagto pushed a commit to zagto/arrow that referenced this pull request Oct 7, 2022
…d function (apache#13706)

This PR adds support for more types of queries that include calls to R code (i.e., `map_batches(..., .lazy = TRUE)`, user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because it wasn't possible to guarantee that the exec plan would be completely executed within a call to `RunWithCapturedR()` where we establish an event loop on the main R thread and launch a background thread to do "arrow stuff" that can queue tasks to run on the R thread.

The approach I took here was to stuff more of the ExecPlan-to-RecordBatchReader logic in a subclass of RecordBatchReader that doesn't call `plan->StartProducing()` until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (currently how head/tail/a few other things are implemented), and as long as it's drained all at once (i.e., `reader$read_table()`) the calls into R will work. 

The R code calls within an exec plan *won't* work with `reader$read_next_batch()` or the C data interface because there we can't guarantee an event loop.

This also has the benefit of allowing us to inject some cancelability to the ExecPlan since we can check a StopToken after apache#13635 (ARROW-11841) for an interrupt (for all exec plans). The biggest benefit is, in my view, that the lifecycle of the ExecPlan is more explicit...before, the plan was stopped when the object was deleted but it was written in a way that I didn't understand for a long time. I think a reader subclass makes it more explicit and maybe will help to print out nested queries (since they're no longer eagerly evaluated).

An example of something that didn't work before that now does:

``` r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "times_32",
  function(context, x) x * 32.0,
  int32(),
  float64(),
  auto_convert = TRUE
)

record_batch(a = 1:1000) %>%
  dplyr::mutate(b = times_32(a)) %>%
  as_record_batch_reader() %>%
  as_arrow_table()
#> Table
#> 1000 rows x 2 columns
#> $a <int32>
#> $b <double>

record_batch(a = 1:1000) %>%
  dplyr::mutate(fun_result = times_32(a)) %>%
  head(11) %>%
  dplyr::collect()
#> # A tibble: 11 × 2
#>        a fun_result
#>    <int>      <dbl>
#>  1     1         32
#>  2     2         64
#>  3     3         96
#>  4     4        128
#>  5     5        160
#>  6     6        192
#>  7     7        224
#>  8     8        256
#>  9     9        288
#> 10    10        320
#> 11    11        352
```

<sup>Created on 2022-07-25 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Oct 17, 2022
…d function (apache#13706)

This PR adds support for more types of queries that include calls to R code (i.e., `map_batches(..., .lazy = TRUE)`, user-defined functions in mutates, arranges, and filters, and custom extension type behaviour). Previously these queries failed because it wasn't possible to guarantee that the exec plan would be completely executed within a call to `RunWithCapturedR()` where we establish an event loop on the main R thread and launch a background thread to do "arrow stuff" that can queue tasks to run on the R thread.

The approach I took here was to stuff more of the ExecPlan-to-RecordBatchReader logic in a subclass of RecordBatchReader that doesn't call `plan->StartProducing()` until the first batch has been pulled. This lets you return a record batch reader and pass it around at the R level (currently how head/tail/a few other things are implemented), and as long as it's drained all at once (i.e., `reader$read_table()`) the calls into R will work. 

The R code calls within an exec plan *won't* work with `reader$read_next_batch()` or the C data interface because there we can't guarantee an event loop.

This also has the benefit of allowing us to inject some cancelability to the ExecPlan since we can check a StopToken after apache#13635 (ARROW-11841) for an interrupt (for all exec plans). The biggest benefit is, in my view, that the lifecycle of the ExecPlan is more explicit...before, the plan was stopped when the object was deleted but it was written in a way that I didn't understand for a long time. I think a reader subclass makes it more explicit and maybe will help to print out nested queries (since they're no longer eagerly evaluated).

An example of something that didn't work before that now does:

``` r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.
library(dplyr, warn.conflicts = FALSE)

register_scalar_function(
  "times_32",
  function(context, x) x * 32.0,
  int32(),
  float64(),
  auto_convert = TRUE
)

record_batch(a = 1:1000) %>%
  dplyr::mutate(b = times_32(a)) %>%
  as_record_batch_reader() %>%
  as_arrow_table()
#> Table
#> 1000 rows x 2 columns
#> $a <int32>
#> $b <double>

record_batch(a = 1:1000) %>%
  dplyr::mutate(fun_result = times_32(a)) %>%
  head(11) %>%
  dplyr::collect()
#> # A tibble: 11 × 2
#>        a fun_result
#>    <int>      <dbl>
#>  1     1         32
#>  2     2         64
#>  3     3         96
#>  4     4        128
#>  5     5        160
#>  6     6        192
#>  7     7        224
#>  8     8        256
#>  9     9        288
#> 10    10        320
#> 11    11        352
```

<sup>Created on 2022-07-25 by the [reprex package](https://reprex.tidyverse.org) (v2.0.1)</sup>

Lead-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
paleolimbot added a commit that referenced this pull request Oct 29, 2022
This makes the default `map_batches()` behaviour lazy (i.e., the function is called once per batch as each batch arrives):

``` r
library(arrow, warn.conflicts = FALSE)
#> Some features are not enabled in this build of Arrow. Run `arrow_info()` for more information.

source <- RecordBatchReader$create(
  record_batch(a = 1:10),
  record_batch(a = 11:20)
)

mapped <- map_batches(source, function(x) {
  message("Hi! I'm being evaluated!")
  x
}, .schema = source$schema)

as_arrow_table(mapped)
#> Hi! I'm being evaluated!
#> Hi! I'm being evaluated!
#> Table
#> 20 rows x 1 columns
#> $a <int32>
```

<sup>Created on 2022-10-26 with [reprex v2.0.2](https://reprex.tidyverse.org)</sup>

This was previously a confusing default since piping the resulting `RecordBatchReader` into an `ExecPlan` would fail for some ExecPlans before ARROW-17178 (#13706). This PR commits to the (more optimal/expected) lazy behaviour.

Authored-by: Dewey Dunnington <[email protected]>
Signed-off-by: Dewey Dunnington <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants