Skip to content

Commit

Permalink
port reader_options changes to R
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Feb 23, 2020
1 parent 55bdce4 commit 2b4a37e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
30 changes: 18 additions & 12 deletions r/src/dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#if defined(ARROW_R_WITH_ARROW)

using Rcpp::List_;
using Rcpp::String;

// [[arrow::export]]
std::shared_ptr<ds::SourceFactory> dataset___FSSFactory__Make2(
Expand Down Expand Up @@ -68,18 +69,23 @@ std::string dataset___FileFormat__type_name(
// [[arrow::export]]
std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(List_ options) {
auto fmt = ds::ParquetFileFormat::Make();
if (options.containsElementNamed("use_buffered_stream")) {
fmt->use_buffered_stream = options["use_buffered_stream"];
}
if (options.containsElementNamed("buffer_size")) {
fmt->buffer_size = options["buffer_size"];
}
if (options.containsElementNamed("read_dict_indices")) {
List_ indices = options["read_dict_indices"];
fmt->read_dict_indices.insert(indices.begin(), indices.end());
}
if (options.containsElementNamed("batch_size")) {
fmt->batch_size = options["batch_size"];
if (options.containsElementNamed("reader_options")) {
List_ reader_options = options["reader_options"];
if (reader_options.containsElementNamed("use_buffered_stream")) {
fmt->reader_options.use_buffered_stream = reader_options["use_buffered_stream"];
}
if (reader_options.containsElementNamed("buffer_size")) {
fmt->reader_options.buffer_size = reader_options["buffer_size"];
}
if (reader_options.containsElementNamed("dict_columns")) {
List_ dict_columns = reader_options["dict_columns"];
for (String name : dict_columns) {
fmt->reader_options.dict_columns.insert(name);
}
}
if (reader_options.containsElementNamed("batch_size")) {
fmt->reader_options.batch_size = reader_options["batch_size"];
}
}
return fmt;
}
Expand Down
5 changes: 3 additions & 2 deletions r/tests/testthat/test-dataset.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ test_that("Simple interface for datasets", {
})

test_that("Simple interface for datasets (custom ParquetFileFormat)", {
reader_options = list(dict_indices = 0, use_buffered_stream = TRUE)
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()),
format = FileFormat$create("parquet", read_dict_indices = 0,
use_buffered_stream = TRUE))
format = FileFormat$create("parquet",
reader_options=reader_options))
expect_is(ds, "Dataset")
expect_equivalent(
ds %>%
Expand Down

0 comments on commit 2b4a37e

Please sign in to comment.