+ compression_level= in write_parquet()
romainfrancois committed Sep 27, 2019
1 parent b8337e1 commit 2dd2cb9
Showing 6 changed files with 79 additions and 4 deletions.
8 changes: 8 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default.
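The regenerated bindings in r/R/arrowExports.R are not rendered above. As a rough sketch only (assuming the usual data-raw/codegen.R pattern of thin .Call() wrappers with an `_arrow_` symbol prefix; the actual generated contents are not shown in this diff), the new entries would look roughly like:

# Hypothetical sketch of the regenerated wrappers; not copied from the diff.
util___Codec__name <- function(codec) {
  .Call(`_arrow_util___Codec__name`, codec)
}

parquet___ArrowWriterProperties___Builder__default_compression_level <- function(builder, compression_level) {
  invisible(.Call(`_arrow_parquet___ArrowWriterProperties___Builder__default_compression_level`, builder, compression_level))
}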

7 changes: 6 additions & 1 deletion r/R/compression.R
@@ -36,7 +36,12 @@
#' @rdname Codec
#' @name Codec
#' @export
Codec <- R6Class("Codec", inherit = Object)
Codec <- R6Class("Codec", inherit = Object,
active = list(
type = function() util___Codec__name(self),
    level = function() abort("Codec$level() not yet implemented")
)
)
Codec$create <- function(type = "gzip", compression_level = NA) {
if (is.character(type)) {
type <- unique_ptr(Codec, util___Codec__Create(
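For context (not part of the diff), a minimal sketch of how the new active bindings behave once the package is built; the name returned is whatever the underlying arrow::util::Codec reports:

# Illustrative only: exercising the active bindings added above
codec <- Codec$create("gzip")
codec$type        # calls util___Codec__name(); expected to return the codec's name, e.g. "gzip"
try(codec$level)  # aborts: the level accessor is intentionally a placeholder for now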
22 changes: 19 additions & 3 deletions r/R/parquet.R
@@ -91,11 +91,22 @@ ParquetWriterProperties_Builder <- R6Class("ParquetWriterProperties_Builder", in
if (is.character(compression) && length(compression) == 1L) {
type <- CompressionType[[match.arg(toupper(compression), names(CompressionType))]]
parquet___ArrowWriterProperties___Builder__default_compression(self, type)
} else if (inherits(compression, "Codec")) {
# TODO: Codec does not give a way to access its compression level, e.g. compression$level
parquet___ArrowWriterProperties___Builder__default_compression(self, compression$name)
} else {
abort("compression specification not supported yet")
}
},

set_compression_level = function(compression_level) {
if (rlang::is_integerish(compression_level) && length(compression_level) == 1L) {
parquet___ArrowWriterProperties___Builder__default_compression_level(self, compression_level)
} else {
abort("compression_level specification not supported yet")
}
},

set_dictionary = function(use_dictionary) {
if (is.logical(use_dictionary) && length(use_dictionary) == 1L) {
parquet___ArrowWriterProperties___Builder__default_use_dictionary(self, isTRUE(use_dictionary))
@@ -118,15 +129,18 @@ ParquetWriterProperties_Builder <- R6Class("ParquetWriterProperties_Builder", in
)
)

ParquetWriterProperties$create <- function(version = NULL, compression = NULL, use_dictionary = NULL, write_statistics = NULL, data_page_size = NULL) {
if (is.null(version) && is.null(compression) && is.null(use_dictionary) && is.null(write_statistics) && is.null(data_page_size)) {
ParquetWriterProperties$create <- function(version = NULL, compression = NULL, compression_level = NULL, use_dictionary = NULL, write_statistics = NULL, data_page_size = NULL) {
if (is.null(version) && is.null(compression) && is.null(compression_level) && is.null(use_dictionary) && is.null(write_statistics) && is.null(data_page_size)) {
ParquetWriterProperties$default()
} else {
builder <- shared_ptr(ParquetWriterProperties_Builder, parquet___WriterProperties___Builder__create())
builder$set_version(version)
if (!is.null(compression)) {
builder$set_compression(compression)
}
if (!is.null(compression_level)) {
builder$set_compression_level(compression_level)
}
if (!is.null(use_dictionary)) {
builder$set_dictionary(use_dictionary)
}
@@ -174,6 +188,7 @@ ParquetFileWriter$create <- function(
#'
#' @param version parquet version
#' @param compression compression name
#' @param compression_level compression level
#' @param use_dictionary Specify if we should use dictionary encoding
#' @param write_statistics Specify if we should write statistics
#' @param data_page_size Set a target threshold for the approximate encoded size of data
@@ -202,10 +217,11 @@ ParquetFileWriter$create <- function(
write_parquet <- function(
table,
sink, chunk_size = NULL,
version = NULL, compression = NULL, use_dictionary = NULL, write_statistics = NULL, data_page_size = NULL,
version = NULL, compression = NULL, compression_level = NULL, use_dictionary = NULL, write_statistics = NULL, data_page_size = NULL,
properties = ParquetWriterProperties$create(
version = version,
compression = compression,
compression_level = compression_level,
use_dictionary = use_dictionary,
write_statistics = write_statistics,
data_page_size = data_page_size
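Taken together, the R-level change is that write_parquet() now threads a compression_level through to the writer properties. A minimal usage sketch (the data, path, and level are placeholders, and this assumes a file path is accepted as the sink):

# Illustrative only: the new user-facing argument
tab <- Table$create(data.frame(x = 1:100, y = rnorm(100)))
write_parquet(
  tab,
  tempfile(fileext = ".parquet"),
  compression = "gzip",    # resolved via CompressionType
  compression_level = 9    # forwarded to the WriterProperties builder
)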
34 changes: 34 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions r/src/compression.cpp
@@ -27,6 +27,11 @@ std::unique_ptr<arrow::util::Codec> util___Codec__Create(arrow::Compression::typ
return out;
}

// [[arrow::export]]
std::string util___Codec__name(const std::unique_ptr<arrow::util::Codec>& codec) {
return codec->name();
}

// [[arrow::export]]
std::shared_ptr<arrow::io::CompressedOutputStream> io___CompressedOutputStream__Make(
const std::unique_ptr<arrow::util::Codec>& codec,
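The new util___Codec__name export is what backs the Codec$type active binding on the R side. An illustrative call through the internal binding (arrow:::util___Codec__name is not a public API; shown only to make the wiring explicit):

codec <- Codec$create("gzip")
arrow:::util___Codec__name(codec)  # the same string that codec$type returns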
7 changes: 7 additions & 0 deletions r/src/parquet.cpp
@@ -163,6 +163,13 @@ void parquet___ArrowWriterProperties___Builder__default_compression(
builder->compression(compression);
}

// [[arrow::export]]
void parquet___ArrowWriterProperties___Builder__default_compression_level(
const std::shared_ptr<parquet::WriterProperties::Builder>& builder,
int compression_level) {
builder->compression_level(compression_level);
}

// [[arrow::export]]
void parquet___ArrowWriterProperties___Builder__default_write_statistics(
const std::shared_ptr<parquet::WriterProperties::Builder>& builder,
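On the C++ side this forwards straight to parquet::WriterProperties::Builder::compression_level(). From R it is reached through the (internal) ParquetWriterProperties class; a hedged sketch of what write_parquet() effectively constructs when both arguments are supplied:

# Illustrative only; values are placeholders
props <- ParquetWriterProperties$create(
  compression = "gzip",
  compression_level = 9  # routed through builder$set_compression_level() to the C++ builder
)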

0 comments on commit 2dd2cb9
