Skip to content

Commit

Permalink
Expose options from ParquetWriterProperties and ParquetArrowWriterProperties to write_parquet()

Browse files Browse the repository at this point in the history
  • Loading branch information
romainfrancois committed Sep 27, 2019
1 parent 09ea0ad commit fa8990b
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 27 deletions.
3 changes: 1 addition & 2 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ S3method(type,default)
S3method(write_arrow,RecordBatchWriter)
S3method(write_arrow,character)
S3method(write_arrow,raw)
S3method(write_parquet,OutputStream)
S3method(write_parquet,character)
export(Array)
export(Buffer)
export(BufferOutputStream)
Expand Down Expand Up @@ -86,6 +84,7 @@ export(MessageType)
export(MockOutputStream)
export(ParquetFileReader)
export(ParquetReaderProperties)
export(ParquetVersionType)
export(RandomAccessFile)
export(ReadableFile)
export(RecordBatchFileReader)
Expand Down
1 change: 0 additions & 1 deletion r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos
#' @importFrom Rcpp sourceCpp
#' @importFrom tidyselect vars_select
#' @importFrom zeallot %<-%
#' @useDynLib arrow, .registration = TRUE
#' @keywords internal
"_PACKAGE"
Expand Down
20 changes: 20 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions r/R/enums.R
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ CompressionType <- enum("Compression::type",
# File-system entry kinds. Mirrors the C++ arrow::fs::FileType enum, which is
# exposed to R via RCPP_EXPOSED_ENUM_NODECL (see r/src/arrow_types.h), so the
# integer values must stay in sync with the C++ definition.
FileType <- enum("FileType",
NonExistent = 0L, Unknown = 1L, File = 2L, Directory = 3L
)

# Parquet format version selector used by ParquetWriterProperties_Builder$set_version().
# Mirrors the C++ parquet::ParquetVersion::type enum exposed via
# RCPP_EXPOSED_ENUM_NODECL (see r/src/arrow_types.h); values must stay in sync.
#' @export
#' @rdname enums
ParquetVersionType <- enum("ParquetVersionType",
PARQUET_1_0 = 0L, PARQUET_2_0 = 1L
)
126 changes: 104 additions & 22 deletions r/R/parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ ParquetArrowWriterProperties_Builder <- R6Class("ParquetArrowWriterProperties_Bu
},
set_allow_truncated_timestamps = function(allow_truncated_timestamps = FALSE) {
if (allow_truncated_timestamps) {
parquet___ArrowWriterProperties___Builder__allow_truncated_timestamps()
parquet___ArrowWriterProperties___Builder__allow_truncated_timestamps(self)
} else {
parquet___ArrowWriterProperties___Builder__disallow_truncated_timestamps()
parquet___ArrowWriterProperties___Builder__disallow_truncated_timestamps(self)
}

self
Expand All @@ -63,7 +63,7 @@ ParquetArrowWriterProperties$create <- function(use_deprecated_int96_timestamps
builder <- shared_ptr(ParquetArrowWriterProperties_Builder, parquet___ArrowWriterProperties___Builder__create())
builder$store_schema()
builder$set_int96_support(use_deprecated_int96_timestamps)
builder$coerce_timestamps(coerce_timestamps)
builder$set_coerce_timestamps(coerce_timestamps)
builder$set_allow_truncated_timestamps(allow_truncated_timestamps)
shared_ptr(ParquetArrowWriterProperties, parquet___ArrowWriterProperties___Builder__build(builder))
}
Expand All @@ -75,13 +75,69 @@ ParquetWriterProperties$default <- function() {

# Builder for ParquetWriterProperties (the format-level parquet writer options:
# version, compression, dictionary encoding, statistics, data page size).
# Each setter forwards to a compiled stub; inputs the bindings cannot handle
# yet are rejected with abort().
#
# NOTE(review): every setter below calls a parquet___ArrowWriterProperties___
# Builder__* stub, yet this builder object is created and consumed by
# parquet___WriterProperties___Builder__create()/__build() (see
# ParquetWriterProperties$create). Confirm against arrowExports.cpp that these
# stub names are intentional and not a copy/paste slip from the Arrow-specific
# builder.
ParquetWriterProperties_Builder <- R6Class("ParquetWriterProperties_Builder", inherit = Object,
public = list(
# Select the parquet format version. NULL is a deliberate no-op so callers
# can forward an unset option unconditionally; anything other than the
# strings "1.0" / "2.0" is an error.
set_version = function(version = NULL) {
if (!is.null(version)) {
if (identical(version, "1.0")) {
parquet___ArrowWriterProperties___Builder__version(self, ParquetVersionType$PARQUET_1_0)
} else if (identical(version, "2.0")) {
parquet___ArrowWriterProperties___Builder__version(self, ParquetVersionType$PARQUET_2_0)
} else {
abort("unknown parquet version")
}
}
},

# Set the default compression codec from a single codec name (matched
# case-insensitively against names(CompressionType)). Per-column
# specifications are not supported yet.
set_compression = function(compression){
if (is.character(compression) && length(compression) == 1L) {
type <- CompressionType[[match.arg(toupper(compression), names(CompressionType))]]
parquet___ArrowWriterProperties___Builder__default_compression(self, type)
} else {
abort("compression specification not supported yet")
}
},

# Enable/disable dictionary encoding globally from a single logical value.
# NA is treated as FALSE via isTRUE(). Per-column control not supported yet.
set_dictionary = function(use_dictionary) {
if (is.logical(use_dictionary) && length(use_dictionary) == 1L) {
parquet___ArrowWriterProperties___Builder__default_use_dictionary(self, isTRUE(use_dictionary))
} else {
abort("use_dictionary specification not supported yet")
}
},

# Enable/disable writing of column statistics globally from a single logical
# value. NA is treated as FALSE via isTRUE().
set_write_statistics = function(write_statistics) {
if (is.logical(write_statistics) && length(write_statistics) == 1L) {
parquet___ArrowWriterProperties___Builder__default_write_statistics(self, isTRUE(write_statistics))
} else {
abort("write_statistics specification not supported yet")
}
},

# Set the target data page size in bytes; passed through unvalidated.
set_data_page_size = function(data_page_size) {
parquet___ArrowWriterProperties___Builder__data_page_size(self, data_page_size)
}
)
)

# Create writer properties for the parquet file writer.
#
# When no option is supplied, the shared default ParquetWriterProperties is
# returned without constructing a builder. Otherwise a
# ParquetWriterProperties_Builder is created and each non-NULL option is
# applied before building.
#
# (This also removes the unterminated leftover of the pre-refactor
# zero-argument definition that preceded this one.)
#
# @param version parquet format version, "1.0" or "2.0" (NULL = library default)
# @param compression compression codec name (NULL = library default)
# @param use_dictionary logical: use dictionary encoding? (NULL = library default)
# @param write_statistics logical: write column statistics? (NULL = library default)
# @param data_page_size target data page size in bytes (NULL = library default)
ParquetWriterProperties$create <- function(version = NULL, compression = NULL,
                                           use_dictionary = NULL,
                                           write_statistics = NULL,
                                           data_page_size = NULL) {
  if (is.null(version) && is.null(compression) && is.null(use_dictionary) &&
      is.null(write_statistics) && is.null(data_page_size)) {
    return(ParquetWriterProperties$default())
  }

  builder <- shared_ptr(
    ParquetWriterProperties_Builder,
    parquet___WriterProperties___Builder__create()
  )
  # set_version() treats NULL as a no-op itself; the other setters do not,
  # hence the explicit guards below.
  builder$set_version(version)
  if (!is.null(compression)) {
    builder$set_compression(compression)
  }
  if (!is.null(use_dictionary)) {
    builder$set_dictionary(use_dictionary)
  }
  if (!is.null(write_statistics)) {
    builder$set_write_statistics(write_statistics)
  }
  if (!is.null(data_page_size)) {
    builder$set_data_page_size(data_page_size)
  }
  shared_ptr(ParquetWriterProperties, parquet___WriterProperties___Builder__build(builder))
}

ParquetFileWriter <- R6Class("ParquetFileWriter", inherit = Object,
Expand Down Expand Up @@ -116,6 +172,22 @@ ParquetFileWriter$create <- function(
#' @param sink an [arrow::io::OutputStream][OutputStream] or a string which is interpreted as a file path
#' @param chunk_size chunk size. If NULL, the number of rows of the table is used
#'
#' @param version parquet version
#' @param compression compression name
#' @param use_dictionary Specify if we should use dictionary encoding
#' @param write_statistics Specify if we should write statistics
#' @param data_page_size Set a target threshold for the approximate encoded size of data
#'   pages within a column chunk. If `NULL`, the default data page size (1 MB) is used.
#' @param properties properties for parquet writer, derived from arguments `version`, `compression`, `use_dictionary`, `write_statistics` and `data_page_size`
#'
#' @param use_deprecated_int96_timestamps Write timestamps to INT96 Parquet format
#' @param coerce_timestamps Cast timestamps to a particular resolution. Can be `NULL`, "ms" or "us"
#' @param allow_truncated_timestamps Allow loss of data when coercing timestamps to a particular
#' resolution. E.g. if microsecond or nanosecond data is lost when coercing to
#'   "ms", do not raise an exception
#'
#' @param arrow_properties arrow specific writer properties, derived from arguments `use_deprecated_int96_timestamps`, `coerce_timestamps` and `allow_truncated_timestamps`
#'
#' @examples
#' \donttest{
#' tf1 <- tempfile(fileext = ".parquet")
Expand All @@ -127,30 +199,40 @@ ParquetFileWriter$create <- function(
#'
#' }
#' @export
write_parquet <- function(table,
                          sink,
                          chunk_size = NULL,
                          version = NULL,
                          compression = NULL,
                          use_dictionary = NULL,
                          write_statistics = NULL,
                          data_page_size = NULL,
                          properties = ParquetWriterProperties$create(
                            version = version,
                            compression = compression,
                            use_dictionary = use_dictionary,
                            write_statistics = write_statistics,
                            data_page_size = data_page_size
                          ),
                          use_deprecated_int96_timestamps = FALSE,
                          coerce_timestamps = NULL,
                          allow_truncated_timestamps = FALSE,
                          arrow_properties = ParquetArrowWriterProperties$create(
                            use_deprecated_int96_timestamps = use_deprecated_int96_timestamps,
                            coerce_timestamps = coerce_timestamps,
                            allow_truncated_timestamps = allow_truncated_timestamps
                          )) {
  table <- to_arrow(table)

  # Accept either a file path or an already-open OutputStream. A path is
  # opened here and the stream is closed again on exit; a caller-supplied
  # stream is left open for the caller to manage.
  if (is.character(sink)) {
    sink <- FileOutputStream$create(sink)
    on.exit(sink$close())
  } else if (!inherits(sink, OutputStream)) {
    abort("sink must be a file path or an OutputStream")
  }

  # The caller-supplied (or default-built) properties are forwarded as-is;
  # they must not be reassigned here, or the version/compression/... and
  # timestamp arguments would be silently discarded.
  schema <- table$schema
  writer <- ParquetFileWriter$create(
    schema, sink,
    properties = properties,
    arrow_properties = arrow_properties
  )
  # Default chunk_size writes the whole table as a single row group.
  writer$WriteTable(table, chunk_size = chunk_size %||% table$num_rows)
  writer$Close()
}

# NOTE(review): this S3 method belongs to the pre-refactor API. Its
# S3method(write_parquet, character) registration is removed from NAMESPACE in
# this same commit, and the new write_parquet() above handles character sinks
# itself - these are leftover removed-diff lines, not live code.
#' @export
write_parquet.character <- function(table, sink, chunk_size = NULL) {
table <- to_arrow(table)
file_sink <- FileOutputStream$create(sink)
# Close the stream we opened, even if WriteTable fails.
on.exit(file_sink$close())

write_parquet(table, sink = file_sink, chunk_size = chunk_size)
}

#' Read a Parquet file
#'
#' '[Parquet](https://parquet.apache.org/)' is a columnar storage file format.
Expand Down
3 changes: 3 additions & 0 deletions r/man/enums.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 21 additions & 2 deletions r/man/write_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 90 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ RCPP_EXPOSED_ENUM_NODECL(arrow::io::FileMode::type)
RCPP_EXPOSED_ENUM_NODECL(arrow::ipc::Message::Type)
RCPP_EXPOSED_ENUM_NODECL(arrow::Compression::type)
RCPP_EXPOSED_ENUM_NODECL(arrow::fs::FileType)
RCPP_EXPOSED_ENUM_NODECL(parquet::ParquetVersion::type)

SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array);
SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
Expand Down
Loading

0 comments on commit fa8990b

Please sign in to comment.