Move read_parquet() and write_parquet() to top of the file
romainfrancois committed Sep 27, 2019
1 parent 45ec63b commit 56dac33
Showing 1 changed file with 121 additions and 119 deletions.
240 changes: 121 additions & 119 deletions r/R/parquet.R
@@ -15,6 +15,127 @@
# specific language governing permissions and limitations
# under the License.

#' Read a Parquet file
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to read Parquet files into R.
#'
#' @inheritParams read_delim_arrow
#' @param props [ParquetReaderProperties]
#' @param ... Additional arguments passed to `ParquetFileReader$create()`
#'
#' @return An [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
#' `TRUE`.
#' @examples
#' \donttest{
#' df <- read_parquet(system.file("v0.7.1.parquet", package="arrow"))
#' head(df)
#' }
#' @export
read_parquet <- function(file,
col_select = NULL,
as_data_frame = TRUE,
props = ParquetReaderProperties$create(),
...) {
reader <- ParquetFileReader$create(file, props = props, ...)
tab <- reader$ReadTable(!!enquo(col_select))

if (as_data_frame) {
tab <- as.data.frame(tab)
}
tab
}
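
A minimal usage sketch of the reader above (the column names are purely illustrative, not taken from the bundled file; `col_select` accepts tidyselect-style selections inherited from the `read_delim_arrow` docs):

# Hypothetical example; column names "x" and "y" are illustrative only.
pq <- system.file("v0.7.1.parquet", package = "arrow")
# Read only a subset of columns, returned as a data.frame by default:
df <- read_parquet(pq, col_select = c("x", "y"))
# Keep the result as an arrow::Table instead of converting it:
tab <- read_parquet(pq, as_data_frame = FALSE)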

#' Write Parquet file to disk
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to write Parquet files from R.
#'
#' @param table An [arrow::Table][Table], or an object convertible to it.
#' @param sink An [arrow::io::OutputStream][OutputStream], or a string which is interpreted as a file path
#' @param chunk_size Chunk size. If NULL, the number of rows of the table is used
#'
#' @param version Parquet file format version
#' @param compression Compression specification. Possible values:
#' - a single string: uses that compression algorithm for all columns
#' - an unnamed string vector: one compression algorithm per column, in the same order as the columns
#' - a named string vector: a compression algorithm for each named column
#' @param compression_level Compression level. A single integer, a named integer vector,
#' or an unnamed integer vector of the same size as the number of columns of `table`
#' @param use_dictionary Specify whether dictionary encoding should be used
#' @param write_statistics Specify whether statistics should be written
#' @param data_page_size Set a target threshold for the approximate encoded size of data
#' pages within a column chunk. If omitted, the default data page size (1 MB) is used.
#' @param properties Properties for the Parquet writer, derived from the arguments `version`, `compression`, `compression_level`, `use_dictionary`, `write_statistics` and `data_page_size`
#'
#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format
#' @param coerce_timestamps Cast timestamps to a particular resolution. Can be NULL, "ms" or "us"
#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a particular
#' resolution. E.g. if microsecond or nanosecond data is lost when coercing to
#' "ms", do not raise an exception
#'
#' @param arrow_properties Arrow-specific writer properties, derived from the arguments `use_deprecated_int96_timestamps`, `coerce_timestamps` and `allow_truncated_timestamps`
#'
#' @examples
#' \donttest{
#' tf1 <- tempfile(fileext = ".parquet")
#' write_parquet(data.frame(x = 1:5), tf1)
#'
#' # using compression
#' tf2 <- tempfile(fileext = ".gz.parquet")
#' write_parquet(data.frame(x = 1:5), tf2, compression = "gzip", compression_level = 5)
#'
#' }
#' @export
write_parquet <- function(table,
sink,
chunk_size = NULL,

# writer properties
version = NULL,
compression = NULL,
compression_level = NULL,
use_dictionary = NULL,
write_statistics = NULL,
data_page_size = NULL,

properties = ParquetWriterProperties$create(
table,
version = version,
compression = compression,
compression_level = compression_level,
use_dictionary = use_dictionary,
write_statistics = write_statistics,
data_page_size = data_page_size
),

# arrow writer properties
use_deprecated_int96_timestamps = FALSE,
coerce_timestamps = NULL,
allow_truncated_timestamps = FALSE,

arrow_properties = ParquetArrowWriterProperties$create(
use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
coerce_timestamps = coerce_timestamps,
allow_truncated_timestamps = allow_truncated_timestamps
)
) {
table <- to_arrow(table)

if (is.character(sink)) {
sink <- FileOutputStream$create(sink)
on.exit(sink$close())
} else if (!inherits(sink, "OutputStream")) {
abort("sink must be a file path or an OutputStream")
}

schema <- table$schema
writer <- ParquetFileWriter$create(schema, sink, properties = properties, arrow_properties = arrow_properties)
writer$WriteTable(table, chunk_size = chunk_size %||% table$num_rows)
writer$Close()
}
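
The compression arguments documented above compose per column as well as globally; a hedged sketch (codec availability depends on how the underlying Arrow C++ library was built):

# Illustrative only: assumes the "snappy" and "gzip" codecs are enabled.
df <- data.frame(x = 1:10, y = letters[1:10])
tf <- tempfile(fileext = ".parquet")
# A single string applies one codec to every column:
write_parquet(df, tf, compression = "snappy")
# A named vector chooses a codec for each column individually:
write_parquet(df, tf, compression = c(x = "gzip", y = "snappy"))

Note also that `chunk_size %||% table$num_rows` in the body above means a NULL `chunk_size` writes the whole table in a single chunk.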


ParquetArrowWriterPropertiesBuilder <- R6Class("ParquetArrowWriterPropertiesBuilder", inherit = Object,
public = list(
store_schema = function() {
@@ -193,125 +314,6 @@ ParquetFileWriter$create <- function(
)
}
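
For orientation, the fragment above is the tail of the lower-level writer constructor that `write_parquet()` wraps. A hedged sketch of that composition, using only names that appear in this diff (the `create()` argument values mirror the defaults in `write_parquet()`, and are assumptions here):

# Hedged sketch mirroring the body of write_parquet() above.
tab <- to_arrow(data.frame(x = 1:5))
sink <- FileOutputStream$create(tempfile(fileext = ".parquet"))
props <- ParquetWriterProperties$create(tab, compression = "gzip")
arrow_props <- ParquetArrowWriterProperties$create(
  use_deprecated_int96_timestamps = FALSE,
  coerce_timestamps = NULL,
  allow_truncated_timestamps = FALSE
)
writer <- ParquetFileWriter$create(
  tab$schema, sink,
  properties = props, arrow_properties = arrow_props
)
writer$WriteTable(tab, chunk_size = tab$num_rows)
writer$Close()
sink$close()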

#' Write Parquet file to disk
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to write Parquet files from R.
#'
#' @param table An [arrow::Table][Table], or an object convertible to it.
#' @param sink An [arrow::io::OutputStream][OutputStream], or a string which is interpreted as a file path
#' @param chunk_size Chunk size. If NULL, the number of rows of the table is used
#'
#' @param version Parquet file format version
#' @param compression Compression specification. Possible values:
#' - a single string: uses that compression algorithm for all columns
#' - an unnamed string vector: one compression algorithm per column, in the same order as the columns
#' - a named string vector: a compression algorithm for each named column
#' @param compression_level Compression level. A single integer, a named integer vector,
#' or an unnamed integer vector of the same size as the number of columns of `table`
#' @param use_dictionary Specify whether dictionary encoding should be used
#' @param write_statistics Specify whether statistics should be written
#' @param data_page_size Set a target threshold for the approximate encoded size of data
#' pages within a column chunk. If omitted, the default data page size (1 MB) is used.
#' @param properties Properties for the Parquet writer, derived from the arguments `version`, `compression`, `compression_level`, `use_dictionary`, `write_statistics` and `data_page_size`
#'
#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format
#' @param coerce_timestamps Cast timestamps to a particular resolution. Can be NULL, "ms" or "us"
#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a particular
#' resolution. E.g. if microsecond or nanosecond data is lost when coercing to
#' "ms", do not raise an exception
#'
#' @param arrow_properties Arrow-specific writer properties, derived from the arguments `use_deprecated_int96_timestamps`, `coerce_timestamps` and `allow_truncated_timestamps`
#'
#' @examples
#' \donttest{
#' tf1 <- tempfile(fileext = ".parquet")
#' write_parquet(data.frame(x = 1:5), tf1)
#'
#' # using compression
#' tf2 <- tempfile(fileext = ".gz.parquet")
#' write_parquet(data.frame(x = 1:5), tf2, compression = "gzip", compression_level = 5)
#'
#' }
#' @export
write_parquet <- function(table,
sink,
chunk_size = NULL,

# writer properties
version = NULL,
compression = NULL,
compression_level = NULL,
use_dictionary = NULL,
write_statistics = NULL,
data_page_size = NULL,

properties = ParquetWriterProperties$create(
table,
version = version,
compression = compression,
compression_level = compression_level,
use_dictionary = use_dictionary,
write_statistics = write_statistics,
data_page_size = data_page_size
),

# arrow writer properties
use_deprecated_int96_timestamps = FALSE,
coerce_timestamps = NULL,
allow_truncated_timestamps = FALSE,

arrow_properties = ParquetArrowWriterProperties$create(
use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
coerce_timestamps = coerce_timestamps,
allow_truncated_timestamps = allow_truncated_timestamps
)
) {
table <- to_arrow(table)

if (is.character(sink)) {
sink <- FileOutputStream$create(sink)
on.exit(sink$close())
} else if (!inherits(sink, "OutputStream")) {
abort("sink must be a file path or an OutputStream")
}

schema <- table$schema
writer <- ParquetFileWriter$create(schema, sink, properties = properties, arrow_properties = arrow_properties)
writer$WriteTable(table, chunk_size = chunk_size %||% table$num_rows)
writer$Close()
}

#' Read a Parquet file
#'
#' [Parquet](https://parquet.apache.org/) is a columnar storage file format.
#' This function enables you to read Parquet files into R.
#'
#' @inheritParams read_delim_arrow
#' @param props [ParquetReaderProperties]
#' @param ... Additional arguments passed to `ParquetFileReader$create()`
#'
#' @return An [arrow::Table][Table], or a `data.frame` if `as_data_frame` is
#' `TRUE`.
#' @examples
#' \donttest{
#' df <- read_parquet(system.file("v0.7.1.parquet", package="arrow"))
#' head(df)
#' }
#' @export
read_parquet <- function(file,
col_select = NULL,
as_data_frame = TRUE,
props = ParquetReaderProperties$create(),
...) {
reader <- ParquetFileReader$create(file, props = props, ...)
tab <- reader$ReadTable(!!enquo(col_select))

if (as_data_frame) {
tab <- as.data.frame(tab)
}
tab
}

#' @title ParquetFileReader class
#' @rdname ParquetFileReader