Commit ed53f97

automatically configure use_buffered_stream for threaded scans

bkietz committed Feb 20, 2020
1 parent 50dfadb
Showing 8 changed files with 31 additions and 41 deletions.
17 changes: 10 additions & 7 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -90,15 +90,18 @@ static Result<std::unique_ptr<parquet::ParquetFileReader>> OpenReader(
 }
 
 static parquet::ReaderProperties MakeReaderProperties(
-    const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) {
+    const ParquetFileFormat& format, bool use_threads = false,
+    MemoryPool* pool = default_memory_pool()) {
   parquet::ReaderProperties properties(pool);
-  if (format.reader_options.use_buffered_stream) {
+  if (use_threads) {
+    // When using a single reader from multiple threads, a buffered stream must be
+    // used to avoid clobbering the cursor of the underlying file.
     properties.enable_buffered_stream();
   } else {
     properties.disable_buffered_stream();
   }
-  properties.file_decryption_properties(format.reader_options.file_decryption_properties);
-  properties.set_buffer_size(format.reader_options.buffer_size);
+  properties.set_buffer_size(format.reader_options.buffer_size);
+  properties.file_decryption_properties(format.reader_options.file_decryption_properties);
 
   return properties;
 }

@@ -367,7 +370,7 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
 Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
     const FileSource& source, std::shared_ptr<ScanOptions> options,
     std::shared_ptr<ScanContext> context) const {
-  auto properties = MakeReaderProperties(*this, context->pool);
+  auto properties = MakeReaderProperties(*this, options->use_threads, context->pool);
   ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
 
   auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
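For context, here is a minimal standalone sketch (not part of this commit) of what the threaded configuration amounts to at the parquet-cpp level: buffered streams are enabled so each column reader buffers its own reads instead of sharing the underlying file's cursor. The helper name, file path, and buffer size below are illustrative assumptions.

#include <memory>
#include <string>
#include <utility>

#include "arrow/io/file.h"
#include "arrow/memory_pool.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/properties.h"

// Open a Parquet file the way MakeReaderProperties(format, /*use_threads=*/true)
// now configures it for a threaded scan.
std::unique_ptr<parquet::ParquetFileReader> OpenForThreadedScan(
    const std::string& path) {
  parquet::ReaderProperties properties(arrow::default_memory_pool());
  properties.enable_buffered_stream();
  properties.set_buffer_size(1024);  // same as ReaderOptions' default buffer_size

  PARQUET_ASSIGN_OR_THROW(auto file, arrow::io::ReadableFile::Open(path));
  return parquet::ParquetFileReader::Open(std::move(file), properties);
}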
1 change: 0 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.h
@@ -50,7 +50,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   /// members of parquet::ReaderProperties.
   ///
   /// @{
-  bool use_buffered_stream = parquet::DEFAULT_USE_BUFFERED_STREAM;
   int64_t buffer_size = parquet::DEFAULT_BUFFER_SIZE;
   std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
   /// @}
15 changes: 4 additions & 11 deletions python/pyarrow/_dataset.pyx
@@ -80,19 +80,12 @@ cdef class ParquetFileFormatReaderOptions:
     def __init__(self, ParquetFileFormat fmt):
         self.options = &fmt.parquet_format.reader_options
 
-    @property
-    def use_buffered_stream(self):
-        """Read files through buffered input streams rather than
-        loading entire chunks at a time."""
-        return self.options.use_buffered_stream
-
-    @use_buffered_stream.setter
-    def use_buffered_stream(self, bint value):
-        self.options.use_buffered_stream = value
-
     @property
     def buffer_size(self):
-        """Size of buffered stream, if enabled."""
+        """During a threaded scan, files must be read through buffered input
+        streams to guard against collisions. By default their buffer size is
+        1024 bytes. A larger buffer size may yield more rapid threaded scans
+        at the cost of greater memory overhead."""
        return self.options.buffer_size
 
     @buffer_size.setter
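With use_buffered_stream gone from the user-facing options, buffer_size is the remaining knob. A hedged sketch of tuning it through the C++ dataset API follows; the helper name and the 64 KiB value are assumptions for illustration, not recommendations from this commit.

#include <memory>

#include "arrow/dataset/file_parquet.h"

std::shared_ptr<arrow::dataset::ParquetFileFormat> MakeTunedParquetFormat() {
  auto format = arrow::dataset::ParquetFileFormat::Make();
  // A larger buffer means fewer, bigger reads per column chunk during a
  // threaded scan, trading memory overhead for throughput. Buffering itself
  // is enabled automatically whenever the scan runs with use_threads.
  format->reader_options.buffer_size = 64 * 1024;  // assumption: 64 KiB
  return format;
}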
1 change: 0 additions & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
@@ -268,7 +268,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CParquetFileFormatReaderOptions \
         "arrow::dataset::ParquetFileFormat::ReaderOptions":
-        c_bool use_buffered_stream
         int64_t buffer_size
         unordered_set[c_string] dict_columns
 
4 changes: 2 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

13 changes: 6 additions & 7 deletions r/R/dataset.R
@@ -319,9 +319,10 @@ FileSystemSourceFactory$create <- function(filesystem,
 #'   the Arrow file format)
 #' * `...`: Additional format-specific options
 #'   format="parquet":
-#' * `use_buffered_stream`: Read files through buffered input streams rather than
-#'   loading entire chunks at a time. Disabled by default.
-#' * `buffer_size`: Size of buffered stream, if enabled. Default is 1K.
+#' * `buffer_size`: During a threaded scan, files must be read through buffered input
+#'   streams to guard against collisions. By default their buffer size
+#'   is 1024 bytes. A larger buffer size may yield more rapid threaded
+#'   scans at the cost of greater memory overhead.
 #' * `dict_columns`: Names of columns which should be read as dictionaries.
 #'
 #' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`)
@@ -344,11 +345,9 @@ FileFormat$create <- function(format, ...) {
 #' @rdname FileFormat
 #' @export
 ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat)
-ParquetFileFormat$create <- function(use_buffered_stream = FALSE,
-                                     buffer_size = 1024,
+ParquetFileFormat$create <- function(buffer_size = 1024,
                                      dict_columns = character(0)) {
-  shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make(
-    use_buffered_stream, buffer_size, dict_columns))
+  shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make(buffer_size, dict_columns))
 }
 
 #' @usage NULL
11 changes: 5 additions & 6 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default.

10 changes: 4 additions & 6 deletions r/src/dataset.cpp
@@ -62,16 +62,14 @@ std::shared_ptr<ds::SourceFactory> dataset___FSSFactory__Make3(
 
 // [[arrow::export]]
 std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(
-    bool use_buffered_stream, int64_t buffer_size, CharacterVector dict_columns) {
+    int64_t buffer_size, CharacterVector dict_columns) {
   auto fmt = ds::ParquetFileFormat::Make();
 
-  fmt->reader_options.use_buffered_stream = use_buffered_stream;
   fmt->reader_options.buffer_size = buffer_size;
 
-  auto dict_columns_vector = Rcpp::as<std::vector<std::string>>(dict_columns);
-  auto& d = fmt->reader_options.dict_columns;
-  std::move(dict_columns_vector.begin(), dict_columns_vector.end(),
-            std::inserter(d, d.end()));
+  for (auto& name : Rcpp::as<std::vector<std::string>>(dict_columns)) {
+    fmt->reader_options.dict_columns.insert(std::move(name));
+  }
 
   return fmt;
 }
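The rewritten loop keeps the move semantics of the old std::move/std::inserter combination while reading more directly. In isolation the pattern looks like this (a self-contained sketch with placeholder column names):

#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

int main() {
  std::vector<std::string> names = {"x", "y", "z"};
  std::unordered_set<std::string> dict_columns;
  // Moving each element out of the temporary vector transfers the string
  // buffers into the set instead of copying them.
  for (auto& name : names) {
    dict_columns.insert(std::move(name));
  }
  return dict_columns.size() == 3 ? 0 : 1;
}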
