From 71bb131b7ce372990c3f99317bdad7b6aff21d85 Mon Sep 17 00:00:00 2001 From: Christopher Dunderdale <47271795+thatstatsguy@users.noreply.github.com> Date: Mon, 22 Aug 2022 18:27:00 +0200 Subject: [PATCH] ARROW-16690: [R][FlightRPC] Additional max_chunksize parameter in do_put method (#13267) **Summary** An additional parameter in Flight do_put to specify chunk size in R. **Problem** Currently, all data is sent through in a single message. It's a likely scenario that users will want the ability to control the batch sizes without building a custom do_put method. **Solution** Additional (optional) parameter to specify chunk size. Lead-authored-by: Christopher.Dunderdale Co-authored-by: Christopher Dunderdale <47271795+thatstatsguy@users.noreply.github.com> Signed-off-by: Dewey Dunnington --- r/R/flight.R | 11 +++++++++-- r/man/flight_put.Rd | 5 ++++- r/tests/testthat/test-python-flight.R | 14 ++++++++++++++ 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/r/R/flight.R b/r/R/flight.R index f56308f958495..0bd661e58d565 100644 --- a/r/R/flight.R +++ b/r/R/flight.R @@ -56,9 +56,11 @@ flight_disconnect <- function(client) { #' @param overwrite logical: if `path` exists on `client` already, should we #' replace it with the contents of `data`? Default is `TRUE`; if `FALSE` and #' `path` exists, the function will error. +#' @param max_chunksize integer: Maximum size for RecordBatch chunks when a `data.frame` is sent. +#' Individual chunks may be smaller depending on the chunk layout of individual columns. #' @return `client`, invisibly. #' @export -flight_put <- function(client, data, path, overwrite = TRUE) { +flight_put <- function(client, data, path, overwrite = TRUE, max_chunksize = NULL) { assert_is(data, c("data.frame", "Table", "RecordBatch")) if (!overwrite && flight_path_exists(client, path)) { @@ -70,8 +72,13 @@ flight_put <- function(client, data, path, overwrite = TRUE) { py_data <- reticulate::r_to_py(data) writer <- client$do_put(descriptor_for_path(path), py_data$schema)[[1]] - if (inherits(data, "RecordBatch")) { + if (inherits(data, "RecordBatch") && !is.null(max_chunksize)) { + warning("`max_chunksize` is not supported for flight_put with RecordBatch") writer$write_batch(py_data) + } else if (inherits(data, "RecordBatch")) { + writer$write_batch(py_data) + } else if (!is.null(max_chunksize)) { + writer$write_table(py_data, max_chunksize) } else { writer$write_table(py_data) } diff --git a/r/man/flight_put.Rd b/r/man/flight_put.Rd index 13a8da16fead5..c306b0f7bb9e0 100644 --- a/r/man/flight_put.Rd +++ b/r/man/flight_put.Rd @@ -4,7 +4,7 @@ \alias{flight_put} \title{Send data to a Flight server} \usage{ -flight_put(client, data, path, overwrite = TRUE) +flight_put(client, data, path, overwrite = TRUE, max_chunksize = NULL) } \arguments{ \item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}} @@ -16,6 +16,9 @@ flight_put(client, data, path, overwrite = TRUE) \item{overwrite}{logical: if \code{path} exists on \code{client} already, should we replace it with the contents of \code{data}? Default is \code{TRUE}; if \code{FALSE} and \code{path} exists, the function will error.} + +\item{max_chunksize}{integer: Maximum size for RecordBatch chunks when a \code{data.frame} is sent. +Individual chunks may be smaller depending on the chunk layout of individual columns.} } \value{ \code{client}, invisibly. diff --git a/r/tests/testthat/test-python-flight.R b/r/tests/testthat/test-python-flight.R index 6fdf38f815ba6..f41abed1ffa80 100644 --- a/r/tests/testthat/test-python-flight.R +++ b/r/tests/testthat/test-python-flight.R @@ -37,6 +37,20 @@ if (process_is_running("demo_flight_server")) { regexp = 'data must be a "data.frame", "Table", or "RecordBatch"' ) }) + + test_that("flight_put with max_chunksize", { + flight_put(client, example_data, path = flight_obj, max_chunksize = 1) + expect_true(flight_path_exists(client, flight_obj)) + expect_true(flight_obj %in% list_flights(client)) + expect_warning( + flight_put(client, record_batch(example_data), path = flight_obj, max_chunksize = 123), + regexp = "`max_chunksize` is not supported for flight_put with RecordBatch" + ) + expect_error( + flight_put(client, Array$create(c(1:3)), path = flight_obj), + regexp = 'data must be a "data.frame", "Table", or "RecordBatch"' + ) + }) test_that("flight_get", { expect_identical(as.data.frame(flight_get(client, flight_obj)), example_data)