diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 937e7444acf66..1a68fe238f10e 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -90,18 +90,15 @@ static Result> OpenReader( } static parquet::ReaderProperties MakeReaderProperties( - const ParquetFileFormat& format, bool use_threads = false, - MemoryPool* pool = default_memory_pool()) { + const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) { parquet::ReaderProperties properties(pool); - - properties.file_decryption_properties(format.reader_options.file_decryption_properties); - properties.set_buffer_size(format.reader_options.buffer_size); - if (use_threads) { - // When using a single reader from multiple threads, a buffered stream must be used to - // avoid clobbering the cursor of the underlying file. + if (format.reader_options.use_buffered_stream) { properties.enable_buffered_stream(); + } else { + properties.disable_buffered_stream(); } - + properties.set_buffer_size(format.reader_options.buffer_size); + properties.file_decryption_properties(format.reader_options.file_decryption_properties); return properties; } @@ -370,7 +367,7 @@ Result> ParquetFileFormat::Inspect( Result ParquetFileFormat::ScanFile( const FileSource& source, std::shared_ptr options, std::shared_ptr context) const { - auto properties = MakeReaderProperties(*this, options->use_threads, context->pool); + auto properties = MakeReaderProperties(*this, context->pool); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index d256e3b0a8aaa..471650d4959ad 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -50,6 +50,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// members of parquet::ReaderProperties. /// /// @{ + bool use_buffered_stream = parquet::DEFAULT_USE_BUFFERED_STREAM; int64_t buffer_size = parquet::DEFAULT_BUFFER_SIZE; std::shared_ptr file_decryption_properties; /// @} diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index f3546a08ade0f..eb02fbe59049c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -80,12 +80,19 @@ cdef class ParquetFileFormatReaderOptions: def __init__(self, ParquetFileFormat fmt): self.options = &fmt.parquet_format.reader_options + @property + def use_buffered_stream(self): + """Read files through buffered input streams rather than + loading entire chunks at a time.""" + return self.options.use_buffered_stream + + @use_buffered_stream.setter + def use_buffered_stream(self, bint value): + self.options.use_buffered_stream = value + @property def buffer_size(self): - """During a threaded scan, files must be read through buffered input - streams to guard against collisions. By default their buffer size is - 1024 bytes. A larger buffer size may yield more rapid threaded scans - at the cost of greater memory overhead.""" + """Size of buffered stream, if enabled.""" return self.options.buffer_size @buffer_size.setter diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 0a7b9e720da94..53d10590cf0ef 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -269,6 +269,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileFormatReaderOptions \ "arrow::dataset::ParquetFileFormat::ReaderOptions": + c_bool use_buffered_stream int64_t buffer_size unordered_set[c_string] dict_columns diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 84a5bc187cdd2..46c0b982ffa78 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -360,8 +360,8 @@ dataset___FileFormat__type_name <- function(format){ .Call(`_arrow_dataset___FileFormat__type_name` , format) } -dataset___ParquetFileFormat__Make <- function(buffer_size, dict_columns){ - .Call(`_arrow_dataset___ParquetFileFormat__Make` , buffer_size, dict_columns) +dataset___ParquetFileFormat__Make <- function(use_buffered_stream, buffer_size, dict_columns){ + .Call(`_arrow_dataset___ParquetFileFormat__Make` , use_buffered_stream, buffer_size, dict_columns) } dataset___IpcFileFormat__Make <- function(){ diff --git a/r/R/dataset.R b/r/R/dataset.R index 374ff13fa3780..3dfd79b23a551 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -356,10 +356,9 @@ FileSystemSourceFactory$create <- function(filesystem, #' the Arrow file format) #' * `...`: Additional format-specific options #' format="parquet": -#' * `buffer_size`: During a threaded scan, files must be read through buffered input -#' streams to guard against collisions. By default their buffer size -#' is 1024 bytes. A larger buffer size may yield more rapid threaded -#' scans at the cost of greater memory overhead. +#' * `use_buffered_stream`: Read files through buffered input streams rather than +#' loading entire chunks at a time. Disabled by default. +#' * `buffer_size`: Size of buffered stream, if enabled. Default is 1K. #' * `dict_columns`: Names of columns which should be read as dictionaries. #' #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`) @@ -400,9 +399,11 @@ FileFormat$create <- function(format, ...) { #' @rdname FileFormat #' @export ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat) -ParquetFileFormat$create <- function(buffer_size = 1024, +ParquetFileFormat$create <- function(use_buffered_stream = FALSE, + buffer_size = 1024, dict_columns = character(0)) { - shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make(buffer_size, dict_columns)) + shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make( + use_buffered_stream, buffer_size, dict_columns)) } #' @usage NULL diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index e4a1269bbf4da..9c714bb456cff 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1425,16 +1425,17 @@ RcppExport SEXP _arrow_dataset___FileFormat__type_name(SEXP format_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr dataset___ParquetFileFormat__Make(int64_t buffer_size, CharacterVector dict_columns); -RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP buffer_size_sexp, SEXP dict_columns_sexp){ +std::shared_ptr dataset___ParquetFileFormat__Make(bool use_buffered_stream, int64_t buffer_size, CharacterVector dict_columns); +RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ BEGIN_RCPP + Rcpp::traits::input_parameter::type use_buffered_stream(use_buffered_stream_sexp); Rcpp::traits::input_parameter::type buffer_size(buffer_size_sexp); Rcpp::traits::input_parameter::type dict_columns(dict_columns_sexp); - return Rcpp::wrap(dataset___ParquetFileFormat__Make(buffer_size, dict_columns)); + return Rcpp::wrap(dataset___ParquetFileFormat__Make(use_buffered_stream, buffer_size, dict_columns)); END_RCPP } #else -RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP buffer_size_sexp, SEXP dict_columns_sexp){ +RcppExport SEXP _arrow_dataset___ParquetFileFormat__Make(SEXP use_buffered_stream_sexp, SEXP buffer_size_sexp, SEXP dict_columns_sexp){ Rf_error("Cannot call dataset___ParquetFileFormat__Make(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif @@ -5901,7 +5902,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___FSSFactory__Make1", (DL_FUNC) &_arrow_dataset___FSSFactory__Make1, 3}, { "_arrow_dataset___FSSFactory__Make3", (DL_FUNC) &_arrow_dataset___FSSFactory__Make3, 4}, { "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, - { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 2}, + { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3}, { "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0}, { "_arrow_dataset___SFactory__Finish1", (DL_FUNC) &_arrow_dataset___SFactory__Finish1, 1}, { "_arrow_dataset___SFactory__Finish2", (DL_FUNC) &_arrow_dataset___SFactory__Finish2, 2}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 70e167be49206..7ef486c92dbfe 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -68,14 +68,16 @@ std::string dataset___FileFormat__type_name( // [[arrow::export]] std::shared_ptr dataset___ParquetFileFormat__Make( - int64_t buffer_size, CharacterVector dict_columns) { + bool use_buffered_stream, int64_t buffer_size, CharacterVector dict_columns) { auto fmt = ds::ParquetFileFormat::Make(); + fmt->reader_options.use_buffered_stream = use_buffered_stream; fmt->reader_options.buffer_size = buffer_size; - for (auto& name : Rcpp::as>(dict_columns)) { - fmt->reader_options.dict_columns.insert(std::move(name)); - } + auto dict_columns_vector = Rcpp::as>(dict_columns); + auto& d = fmt->reader_options.dict_columns; + std::move(dict_columns_vector.begin(), dict_columns_vector.end(), + std::inserter(d, d.end())); return fmt; }