Skip to content

Commit

Permalink
Remove 'array_expression' class
Browse files Browse the repository at this point in the history
  • Loading branch information
nealrichardson committed May 10, 2021
1 parent 2e6374f commit b31fb5e
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 279 deletions.
4 changes: 0 additions & 4 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ S3method("[[<-",Schema)
S3method("names<-",ArrowTabular)
S3method(Ops,ArrowDatum)
S3method(Ops,Expression)
S3method(Ops,array_expression)
S3method(all,ArrowDatum)
S3method(all,equal.ArrowObject)
S3method(any,ArrowDatum)
Expand All @@ -37,7 +36,6 @@ S3method(as.list,ArrowTabular)
S3method(as.list,Schema)
S3method(as.raw,Buffer)
S3method(as.vector,ArrowDatum)
S3method(as.vector,array_expression)
S3method(c,Dataset)
S3method(dim,ArrowTabular)
S3method(dim,Dataset)
Expand All @@ -51,7 +49,6 @@ S3method(head,arrow_dplyr_query)
S3method(is.na,ArrowDatum)
S3method(is.na,Expression)
S3method(is.na,Scalar)
S3method(is.na,array_expression)
S3method(is.nan,ArrowDatum)
S3method(is_in,ArrowDatum)
S3method(is_in,default)
Expand Down Expand Up @@ -80,7 +77,6 @@ S3method(names,StructArray)
S3method(names,Table)
S3method(names,arrow_dplyr_query)
S3method(print,"arrow-enum")
S3method(print,array_expression)
S3method(print,arrow_dplyr_query)
S3method(print,arrow_info)
S3method(print,arrow_r_metadata)
Expand Down
71 changes: 67 additions & 4 deletions r/R/arrow-datum.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,73 @@ as.vector.ArrowDatum <- function(x, mode) {
)
}

#' @export
Ops.ArrowDatum <- function(e1, e2) {
if (.Generic == "!") {
eval_array_expression(.Generic, e1)
} else if (.Generic %in% names(.array_function_map)) {
eval_array_expression(.Generic, e1, e2)
} else {
stop(paste0("Unsupported operation on `", class(e1)[1L], "` : "), .Generic, call. = FALSE)
}
}

# Wrapper around call_function that:
# (1) maps R function names to Arrow C++ compute ("/" --> "divide_checked")
# (2) wraps R input args as Array or Scalar
eval_array_expression <- function(FUN,
...,
args = list(...),
options = empty_named_list()) {
if (FUN == "-" && length(args) == 1L) {
if (inherits(args[[1]], "ArrowObject")) {
return(eval_array_expression("negate_checked", args[[1]]))
} else {
return(-args[[1]])
}
}
args <- lapply(args, .wrap_arrow, FUN)

# In Arrow, "divide" is one function, which does integer division on
# integer inputs and floating-point division on floats
if (FUN == "/") {
# TODO: omg so many ways it's wrong to assume these types
args <- map(args, ~.$cast(float64()))
} else if (FUN == "%/%") {
# In R, integer division works like floor(float division)
out <- eval_array_expression("/", args = args, options = options)
return(out$cast(int32(), allow_float_truncate = TRUE))
} else if (FUN == "%%") {
# {e1 - e2 * ( e1 %/% e2 )}
# ^^^ form doesn't work because Ops.Array evaluates eagerly,
# but we can build that up
quotient <- eval_array_expression("%/%", args = args)
base <- eval_array_expression("*", quotient, args[[2]])
# this cast is to ensure that the result of this and e1 are the same
# (autocasting only applies to scalars)
base <- base$cast(args[[1]]$type)
return(eval_array_expression("-", args[[1]], base))
}

call_function(
.array_function_map[[FUN]] %||% FUN,
args = args,
options = options
)
}

.wrap_arrow <- function(arg, fun) {
if (!inherits(arg, "ArrowObject")) {
# TODO: Array$create if lengths are equal?
if (fun == "%in%") {
arg <- Array$create(arg)
} else {
arg <- Scalar$create(arg)
}
}
arg
}

#' @export
na.omit.ArrowDatum <- function(object, ...){
object$Filter(!is.na(object))
Expand All @@ -66,10 +133,6 @@ filter_rows <- function(x, i, keep_na = TRUE, ...) {
# General purpose function for [ row subsetting with R semantics
# Based on the input for `i`, calls x$Filter, x$Slice, or x$Take
nrows <- x$num_rows %||% x$length() # Depends on whether Array or Table-like
if (inherits(i, "array_expression")) {
# Evaluate it
i <- eval_array_expression(i)
}
if (is.logical(i)) {
if (isTRUE(i)) {
# Shortcut without doing any work
Expand Down
1 change: 0 additions & 1 deletion r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
# Create these once, at package build time
if (arrow_available()) {
dplyr_functions$dataset <- build_function_list(build_dataset_expression)
dplyr_functions$array <- build_function_list(build_array_expression)
}
invisible()
}
Expand Down
6 changes: 3 additions & 3 deletions r/R/arrow-tabular.R
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,13 @@ na.fail.ArrowTabular <- function(object, ...){

#' @export
na.omit.ArrowTabular <- function(object, ...){
not_na <- map(object$columns, ~build_array_expression("is_valid", .x))
not_na <- map(object$columns, ~call_function("is_valid", .x))
not_na_agg <- Reduce("&", not_na)
object$Filter(eval_array_expression(not_na_agg))
object$Filter(not_na_agg)
}

#' @export
na.exclude.ArrowTabular <- na.omit.ArrowTabular
na.exclude.ArrowTabular <- na.omit.ArrowTabular

ToString_tabular <- function(x, ...) {
# Generic to work with both RecordBatch and Table
Expand Down
44 changes: 1 addition & 43 deletions r/R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -776,52 +776,10 @@ summarise.arrow_dplyr_query <- function(.data, ...) {
dplyr::group_vars(.data) # vars needed for grouping
))
.data <- dplyr::select(.data, vars_to_keep)
if (isTRUE(getOption("arrow.summarize", FALSE))) {
# Try stuff, if successful return()
out <- try(do_arrow_group_by(.data, ...), silent = TRUE)
if (inherits(out, "try-error")) {
return(abandon_ship(call, .data, format(out)))
} else {
return(out)
}
} else {
# If unsuccessful or if option not set, do the work in R
dplyr::summarise(dplyr::collect(.data), ...)
}
dplyr::summarise(dplyr::collect(.data), ...)
}
summarise.Dataset <- summarise.ArrowTabular <- summarise.arrow_dplyr_query

do_arrow_group_by <- function(.data, ...) {
exprs <- quos(...)
mask <- arrow_mask(.data)
# Add aggregation wrappers to arrow_mask somehow
# (this is not ideal, would overwrite same-named objects)
mask$sum <- function(x, na.rm = FALSE) {
list(
fun = "sum",
data = x,
options = list(na.rm = na.rm)
)
}
results <- list()
for (i in seq_along(exprs)) {
# Iterate over the indices and not the names because names may be repeated
# (which overwrites the previous name)
new_var <- names(exprs)[i]
results[[new_var]] <- arrow_eval(exprs[[i]], mask)
if (inherits(results[[new_var]], "try-error")) {
msg <- paste('Expression', as_label(exprs[[i]]), 'not supported in Arrow')
stop(msg, call. = FALSE)
}
# Put it in the data mask too?
#mask[[new_var]] <- mask$.data[[new_var]] <- results[[new_var]]
}
# Now, from that, split out the array (expressions) and options
opts <- lapply(results, function(x) x[c("fun", "options")])
inputs <- lapply(results, function(x) eval_array_expression(x$data, .data$.data))
grouping_vars <- lapply(.data$group_by_vars, function(x) eval_array_expression(.data$selected_columns[[x]], .data$.data))
compute__GroupBy(inputs, grouping_vars, opts)
}

group_by.arrow_dplyr_query <- function(.data,
...,
Expand Down
174 changes: 0 additions & 174 deletions r/R/expression.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,100 +17,6 @@

#' @include arrowExports.R

array_expression <- function(FUN,
...,
args = list(...),
options = empty_named_list()) {
structure(
list(
fun = FUN,
args = args,
options = options
),
class = "array_expression"
)
}

#' @export
Ops.ArrowDatum <- function(e1, e2) {
if (.Generic == "!") {
eval_array_expression(build_array_expression(.Generic, e1))
} else if (.Generic %in% names(.array_function_map)) {
eval_array_expression(build_array_expression(.Generic, e1, e2))
} else {
stop(paste0("Unsupported operation on `", class(e1)[1L], "` : "), .Generic, call. = FALSE)
}
}

#' @export
Ops.array_expression <- function(e1, e2) {
if (.Generic == "!") {
build_array_expression(.Generic, e1)
} else {
build_array_expression(.Generic, e1, e2)
}
}

build_array_expression <- function(FUN,
...,
args = list(...),
options = empty_named_list()) {
if (FUN == "-" && length(args) == 1L) {
if (inherits(args[[1]], c("ArrowObject", "array_expression"))) {
return(build_array_expression("negate_checked", args[[1]]))
} else {
return(-args[[1]])
}
}
args <- lapply(args, .wrap_arrow, FUN)

# In Arrow, "divide" is one function, which does integer division on
# integer inputs and floating-point division on floats
if (FUN == "/") {
# TODO: omg so many ways it's wrong to assume these types
args <- lapply(args, cast_array_expression, float64())
} else if (FUN == "%/%") {
# In R, integer division works like floor(float division)
out <- build_array_expression("/", args = args, options = options)
return(cast_array_expression(out, int32(), allow_float_truncate = TRUE))
} else if (FUN == "%%") {
# {e1 - e2 * ( e1 %/% e2 )}
# ^^^ form doesn't work because Ops.Array evaluates eagerly,
# but we can build that up
quotient <- build_array_expression("%/%", args = args)
base <- build_array_expression("*", quotient, args[[2]])
# this cast is to ensure that the result of this and e1 are the same
# (autocasting only applies to scalars)
base <- cast_array_expression(base, args[[1]]$type)
return(build_array_expression("-", args[[1]], base))
}

array_expression(.array_function_map[[FUN]] %||% FUN, args = args, options = options)
}

cast_array_expression <- function(x, to_type, safe = TRUE, ...) {
opts <- list(
to_type = to_type,
allow_int_overflow = !safe,
allow_time_truncate = !safe,
allow_float_truncate = !safe
)
array_expression("cast", x, options = modifyList(opts, list(...)))
}

.wrap_arrow <- function(arg, fun) {
if (!inherits(arg, c("ArrowObject", "array_expression"))) {
# TODO: Array$create if lengths are equal?
# TODO: these kernels should autocast like the dataset ones do (e.g. int vs. float)
if (fun == "%in%") {
arg <- Array$create(arg)
} else {
arg <- Scalar$create(arg)
}
}
arg
}

.unary_function_map <- list(
"!" = "invert",
"as.factor" = "dictionary_encode",
Expand Down Expand Up @@ -150,86 +56,6 @@ cast_array_expression <- function(x, to_type, safe = TRUE, ...) {

.array_function_map <- c(.unary_function_map, .binary_function_map)

eval_array_expression <- function(x, data = NULL) {
if (!is.null(data)) {
x <- bind_array_refs(x, data)
}
if (!inherits(x, "array_expression")) {
# Nothing to evaluate
return(x)
}
x$args <- lapply(x$args, function (a) {
if (inherits(a, "array_expression")) {
eval_array_expression(a)
} else {
a
}
})
if (x$fun == "is_in_meta_binary" && inherits(x$args[[2]], "Scalar")) {
x$args[[2]] <- Array$create(x$args[[2]])
}
call_function(x$fun, args = x$args, options = x$options %||% empty_named_list())
}

find_array_refs <- function(x) {
if (identical(x$fun, "array_ref")) {
out <- x$args$field_name
} else {
out <- lapply(x$args, find_array_refs)
}
unlist(out)
}

# Take an array_expression and replace array_refs with arrays/chunkedarrays from data
bind_array_refs <- function(x, data) {
if (inherits(x, "array_expression")) {
if (identical(x$fun, "array_ref")) {
x <- data[[x$args$field_name]]
} else {
x$args <- lapply(x$args, bind_array_refs, data)
}
}
x
}

#' @export
is.na.array_expression <- function(x) array_expression("is.na", x)

#' @export
as.vector.array_expression <- function(x, ...) {
as.vector(eval_array_expression(x))
}

#' @export
print.array_expression <- function(x, ...) {
cat(.format_array_expression(x), "\n", sep = "")
invisible(x)
}

.format_array_expression <- function(x) {
printed_args <- map_chr(x$args, function(arg) {
if (inherits(arg, "Scalar")) {
deparse(as.vector(arg))
} else if (inherits(arg, "ArrowObject")) {
paste0("<", class(arg)[1], ">")
} else if (inherits(arg, "array_expression")) {
.format_array_expression(arg)
} else {
# Should not happen
deparse(arg)
}
})
if (identical(x$fun, "array_ref")) {
x$args$field_name
} else {
# Prune this for readability
function_name <- sub("_kleene", "", x$fun)
paste0(function_name, "(", paste(printed_args, collapse = ", "), ")")
}
}

###########

#' Arrow expressions
#'
#' @description
Expand Down
6 changes: 3 additions & 3 deletions r/tests/testthat/test-dplyr-filter.R
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ test_that("filter environment scope", {

skip("Need to substitute in user defined function too")
# TODO: fix this: this isEqualTo function is eagerly evaluating; it should
# instead yield array_expressions. Probably bc the parent env of the function
# has the Ops.Array methods defined; we need to move it so that the parent
# env is the data mask we use in the dplyr eval
# instead yield Expressions. Probably bc the parent env of the function
# has the Ops.Expression methods defined; we need to move it so that the
# parent env is the data mask we use in the dplyr eval
isEqualTo <- function(x, y) x == y & !is.na(x)
expect_dplyr_equal(
input %>%
Expand Down
Loading

0 comments on commit b31fb5e

Please sign in to comment.