diff --git a/r/R/parquet.R b/r/R/parquet.R
index 83f520b352fef..aaf21e16cadeb 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -15,6 +15,127 @@
 # specific language governing permissions and limitations
 # under the License.
 
+#' Read a Parquet file
+#'
+#' '[Parquet](https://parquet.apache.org/)' is a columnar storage file format.
+#' This function enables you to read Parquet files into R.
+#'
+#' @inheritParams read_delim_arrow
+#' @param props [ParquetReaderProperties]
+#' @param ... Additional arguments passed to `ParquetFileReader$create()`
+#'
+#' @return An [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
+#' `TRUE`.
+#' @examples
+#' \donttest{
+#' df <- read_parquet(system.file("v0.7.1.parquet", package = "arrow"))
+#' head(df)
+#' }
+#' @export
+read_parquet <- function(file,
+                         col_select = NULL,
+                         as_data_frame = TRUE,
+                         props = ParquetReaderProperties$create(),
+                         ...) {
+  reader <- ParquetFileReader$create(file, props = props, ...)
+  tab <- reader$ReadTable(!!enquo(col_select))
+
+  if (as_data_frame) {
+    tab <- as.data.frame(tab)
+  }
+  tab
+}
+
+#' Write Parquet file to disk
+#'
+#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
+#' This function enables you to write Parquet files from R.
+#'
+#' @param table An [arrow::Table][Table], or an object convertible to it.
+#' @param sink An [arrow::io::OutputStream][OutputStream] or a string interpreted as a file path
+#' @param chunk_size chunk size. If NULL, the number of rows of the table is used
+#'
+#' @param version Parquet version
+#' @param compression compression specification. Possible values:
+#' - a single string: uses that compression algorithm for all columns
+#' - an unnamed string vector: one compression algorithm per column, in the same order as the columns
+#' - a named string vector: a compression algorithm for each named column
+#' @param compression_level compression level. A single integer, a named integer vector
+#' or an unnamed integer vector of the same size as the number of columns of `table`
+#' @param use_dictionary Specify if we should use dictionary encoding.
+#' @param write_statistics Specify if we should write statistics
+#' @param data_page_size Set a target threshold for the approximate encoded size of data
+#' pages within a column chunk. If omitted, the default data page size (1MB) is used.
+#' @param properties properties for the Parquet writer, derived from the arguments `version`, `compression`, `compression_level`, `use_dictionary`, `write_statistics` and `data_page_size`
+#'
+#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format
+#' @param coerce_timestamps Cast timestamps to a particular resolution. Can be NULL, "ms" or "us"
+#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a particular
+#' resolution. E.g. if microsecond or nanosecond data is lost when coercing to
+#' "ms", do not raise an exception
+#'
+#' @param arrow_properties Arrow-specific writer properties, derived from the arguments `use_deprecated_int96_timestamps`, `coerce_timestamps` and `allow_truncated_timestamps`
+#'
+#' @examples
+#' \donttest{
+#' tf1 <- tempfile(fileext = ".parquet")
+#' write_parquet(data.frame(x = 1:5), tf1)
+#'
+#' # using compression
+#' tf2 <- tempfile(fileext = ".gz.parquet")
+#' write_parquet(data.frame(x = 1:5), tf2, compression = "gzip", compression_level = 5)
+#'
+#' }
+#' @export
+write_parquet <- function(table,
+                          sink,
+                          chunk_size = NULL,
+
+                          # writer properties
+                          version = NULL,
+                          compression = NULL,
+                          compression_level = NULL,
+                          use_dictionary = NULL,
+                          write_statistics = NULL,
+                          data_page_size = NULL,
+
+                          properties = ParquetWriterProperties$create(
+                            table,
+                            version = version,
+                            compression = compression,
+                            compression_level = compression_level,
+                            use_dictionary = use_dictionary,
+                            write_statistics = write_statistics,
+                            data_page_size = data_page_size
+                          ),
+
+                          # arrow writer properties
+                          use_deprecated_int96_timestamps = FALSE,
+                          coerce_timestamps = NULL,
+                          allow_truncated_timestamps = FALSE,
+
+                          arrow_properties = ParquetArrowWriterProperties$create(
+                            use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
+                            coerce_timestamps = coerce_timestamps,
+                            allow_truncated_timestamps = allow_truncated_timestamps
+                          )
+) {
+  table <- to_arrow(table)
+
+  if (is.character(sink)) {
+    sink <- FileOutputStream$create(sink)
+    on.exit(sink$close())
+  } else if (!inherits(sink, "OutputStream")) {
+    abort("sink must be a file path or an OutputStream")
+  }
+
+  schema <- table$schema
+  writer <- ParquetFileWriter$create(schema, sink, properties = properties, arrow_properties = arrow_properties)
+  writer$WriteTable(table, chunk_size = chunk_size %||% table$num_rows)
+  writer$Close()
+}
+
+
 ParquetArrowWriterPropertiesBuilder <- R6Class("ParquetArrowWriterPropertiesBuilder", inherit = Object,
   public = list(
     store_schema = function() {
@@ -193,125 +314,6 @@ ParquetFileWriter$create <- function(
   )
 }
 
-#' Write Parquet file to disk
-#'
-#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
-#' This function enables you to write Parquet files from R.
-#'
-#' @param table An [arrow::Table][Table], or an object convertible to it.
-#' @param sink an [arrow::io::OutputStream][OutputStream] or a string which is interpreted as a file path
-#' @param chunk_size chunk size. If NULL, the number of rows of the table is used
-#'
-#' @param version parquet version
-#' @param compression compression specification. Possible values:
-#' - a single string: uses that compression algorithm for all columns
-#' - an unnamed string vector: specify a compression algorithm for each, same order as the columns
-#' - a named string vector: specify compression algorithm individually
-#' @param compression_level compression level. A single integer, a named integer vector
-#' or an unnamed integer vector of the same size as the number of columns of `table`
-#' @param use_dictionary Specify if we should use dictionary encoding.
-#' @param write_statistics Specify if we should write statistics
-#' @param data_page_size Set a target threshhold for the approximate encoded size of data
-#' pages within a column chunk. If omitted, the default data page size (1Mb) is used.
-#' @param properties properties for parquet writer, derived from arguments `version`, `compression`, `compression_level`, `use_dictionary`, `write_statistics` and `data_page_size`
-#'
-#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format
-#' @param coerce_timestamps Cast timestamps a particular resolution. can be NULL, "ms" or "us"
-#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a particular
-#' resolution. E.g. if microsecond or nanosecond data is lost when coercing to
-#' ms', do not raise an exception
-#'
-#' @param arrow_properties arrow specific writer properties, derived from arguments `use_deprecated_int96_timestamps`, `coerce_timestamps` and `allow_truncated_timestamps`
-#'
-#' @examples
-#' \donttest{
-#' tf1 <- tempfile(fileext = ".parquet")
-#' write_parquet(data.frame(x = 1:5), tf2)
-#'
-#' # using compression
-#' tf2 <- tempfile(fileext = ".gz.parquet")
-#' write_parquet(data.frame(x = 1:5), compression = "gzip", compression_level = 5)
-#'
-#' }
-#' @export
-write_parquet <- function(table,
-                          sink,
-                          chunk_size = NULL,
-
-                          # writer properties
-                          version = NULL,
-                          compression = NULL,
-                          compression_level = NULL,
-                          use_dictionary = NULL,
-                          write_statistics = NULL,
-                          data_page_size = NULL,
-
-                          properties = ParquetWriterProperties$create(
-                            table,
-                            version = version,
-                            compression = compression,
-                            compression_level = compression_level,
-                            use_dictionary = use_dictionary,
-                            write_statistics = write_statistics,
-                            data_page_size = data_page_size
-                          ),
-
-                          # arrow writer properties
-                          use_deprecated_int96_timestamps = FALSE,
-                          coerce_timestamps = NULL,
-                          allow_truncated_timestamps = FALSE,
-
-                          arrow_properties = ParquetArrowWriterProperties$create(
-                            use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
-                            coerce_timestamps = coerce_timestamps,
-                            allow_truncated_timestamps = allow_truncated_timestamps
-                          )
-) {
-  table <- to_arrow(table)
-
-  if (is.character(sink)) {
-    sink <- FileOutputStream$create(sink)
-    on.exit(sink$close())
-  } else if (!inherits(sink, OutputStream)) {
-    abort("sink must be a file path or an OutputStream")
-  }
-
-  schema <- table$schema
-  writer <- ParquetFileWriter$create(schema, sink, properties = properties, arrow_properties = arrow_properties)
-  writer$WriteTable(table, chunk_size = chunk_size %||% table$num_rows)
-  writer$Close()
-}
-
-#' Read a Parquet file
-#'
-#' '[Parquet](https://parquet.apache.org/)' is a columnar storage file format.
-#' This function enables you to read Parquet files into R.
-#'
-#' @inheritParams read_delim_arrow
-#' @param props [ParquetReaderProperties]
-#' @param ... Additional arguments passed to `ParquetFileReader$create()`
-#'
-#' @return A [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
-#' `TRUE`.
-#' @examples
-#' \donttest{
-#' df <- read_parquet(system.file("v0.7.1.parquet", package="arrow"))
-#' head(df)
-#' }
-#' @export
-read_parquet <- function(file,
-                         col_select = NULL,
-                         as_data_frame = TRUE,
-                         props = ParquetReaderProperties$create(),
-                         ...) {
-  reader <- ParquetFileReader$create(file, props = props, ...)
-  tab <- reader$ReadTable(!!enquo(col_select))
-
-  if (as_data_frame) {
-    tab <- as.data.frame(tab)
-  }
-  tab
-}
 
 #' @title ParquetFileReader class
 #' @rdname ParquetFileReader
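
For reviewers who want to exercise the relocated functions, here is a minimal usage sketch. It is not part of the diff: it assumes this branch is installed as the arrow package and that the underlying Arrow C++ build includes the gzip and snappy codecs. The per-column form of `compression` follows the @param compression docs above.

library(arrow)

df <- data.frame(x = 1:5, y = letters[1:5])

# Single string: one codec (and level) applied to every column.
tf1 <- tempfile(fileext = ".parquet")
write_parquet(df, tf1, compression = "gzip", compression_level = 5)

# Named string vector: a codec per named column (assumes the snappy codec is available).
tf2 <- tempfile(fileext = ".parquet")
write_parquet(df, tf2, compression = c(x = "gzip", y = "snappy"))

# Round trip; col_select takes a tidyselect-style column selection.
read_parquet(tf1, col_select = x)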