diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 63b8ed80915ee..d9291aeadf8d2 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -805,10 +805,13 @@ if(NOT WIN32 AND ARROW_PLASMA)
add_subdirectory(src/plasma)
endif()
+if(ARROW_PARQUET)
+ add_definitions(-DARROW_PARQUET)
+endif()
+
add_subdirectory(src/arrow)
if(ARROW_PARQUET)
- add_definitions(-DARROW_PARQUET)
add_subdirectory(src/parquet)
add_subdirectory(tools/parquet)
if(PARQUET_BUILD_EXAMPLES)
diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index 666a2e1d88f86..597c0a256c098 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -23,6 +23,7 @@
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
+#include "arrow/util/bit_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/make_unique.h"
@@ -125,20 +126,29 @@ InMemorySource::InMemorySource(std::shared_ptr<Table> table)
FragmentIterator InMemorySource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
- auto create_batch =
- [scan_options](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<Fragment> {
+ auto schema = this->schema();
+
+ auto create_fragment =
+ [scan_options,
+ schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
+ if (!batch->schema()->Equals(schema)) {
+ return Status::TypeError("yielded batch had schema ", *batch->schema(),
+ " which did not match InMemorySource's: ", *schema);
+ }
+
std::vector<std::shared_ptr<RecordBatch>> batches;
- while (batch->num_rows() > scan_options->batch_size) {
- batches.push_back(batch->Slice(0, scan_options->batch_size));
- batch = batch->Slice(scan_options->batch_size);
+ auto batch_size = scan_options->batch_size;
+ auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
+
+ for (int i = 0; i < n_batches; i++) {
+ batches.push_back(batch->Slice(batch_size * i, batch_size));
}
- batches.push_back(std::move(batch));
return std::make_shared<InMemoryFragment>(std::move(batches), scan_options);
};
- return MakeMapIterator(std::move(create_batch), get_batches_());
+ return MakeMaybeMapIterator(std::move(create_fragment), get_batches_());
}
FragmentIterator TreeSource::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
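The rewritten lambda replaces the old shrink-and-loop slicing with a fixed-stride split: a batch of N rows yields CeilDiv(N, batch_size) slices starting at batch_size * i, and RecordBatch::Slice clamps the final, shorter slice. A rough Python analogue using pyarrow's RecordBatch.slice (the helper name split_batch is illustrative only):

```python
import pyarrow as pa

def split_batch(batch, batch_size):
    # Mirror of the new create_fragment slicing: ceil-divide the row count,
    # then take fixed-stride slices; slice() clamps the final short chunk.
    n_batches = -(-batch.num_rows // batch_size)  # ceiling division
    return [batch.slice(i * batch_size, batch_size) for i in range(n_batches)]

batch = pa.record_batch([pa.array(range(100))], names=["x"])
print([b.num_rows for b in split_batch(batch, 32)])  # [32, 32, 32, 4]
```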
diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index 6b8fabe623ec6..e4f155be754c2 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -128,7 +128,9 @@ class ARROW_DS_EXPORT Source {
std::shared_ptr<Expression> partition_expression_;
};
-/// \brief A Source consisting of a flat sequence of Fragments
+/// \brief A Source which yields fragments wrapping a stream of record batches.
+///
+/// The record batches must match the schema provided to the source at construction.
class ARROW_DS_EXPORT InMemorySource : public Source {
public:
using RecordBatchGenerator = std::function<RecordBatchIterator()>;
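The new docstring makes the contract explicit: every batch produced by the generator must carry the exact schema the InMemorySource was constructed with, and a mismatch now surfaces as Status::TypeError rather than being silently wrapped. A minimal pyarrow illustration of the underlying schema check (plain Schema.equals, not the dataset API itself):

```python
import pyarrow as pa

schema = pa.schema([("x", pa.int64())])
good = pa.record_batch([pa.array([1, 2, 3], type=pa.int64())], schema=schema)
bad = pa.record_batch([pa.array([1.0, 2.0])], names=["x"])  # float64 column

print(good.schema.equals(schema))  # True  -> wrapped into a fragment
print(bad.schema.equals(schema))   # False -> would now yield a TypeError
```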
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 1a68fe238f10e..9ca978fb43de3 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -31,6 +31,7 @@
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/file_reader.h"
+#include "parquet/properties.h"
#include "parquet/statistics.h"
namespace arrow {
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 471650d4959ad..0373859545c10 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -25,12 +25,14 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
-#include "parquet/properties.h"
namespace parquet {
class ParquetFileReader;
class RowGroupMetaData;
class FileMetaData;
+class FileDecryptionProperties;
+class ReaderProperties;
+class ArrowReaderProperties;
} // namespace parquet
namespace arrow {
@@ -49,15 +51,23 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
/// \defgroup parquet-file-format-reader-properties properties which correspond to
/// members of parquet::ReaderProperties.
///
+ /// We don't embed parquet::ReaderProperties directly because we get memory_pool from
+ /// ScanContext at scan time and provide differing defaults.
+ ///
/// @{
- bool use_buffered_stream = parquet::DEFAULT_USE_BUFFERED_STREAM;
- int64_t buffer_size = parquet::DEFAULT_BUFFER_SIZE;
+ bool use_buffered_stream = false;
+ int64_t buffer_size = 1 << 13;
std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
/// @}
/// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
/// to members of parquet::ArrowReaderProperties.
///
+ /// We don't embed parquet::ArrowReaderProperties directly because we get batch_size
+ /// from ScanOptions at scan time, and we will never pass use_threads == true (since
+ /// we defer parallelization of the scan). Additionally, column names (rather than
+ /// indices) are used to indicate dictionary columns.
+ ///
/// @{
std::unordered_set<std::string> dict_columns;
/// @}
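For orientation, the same three knobs already exist on pyarrow's standalone Parquet reader, where buffered-stream reads and name-based dictionary columns are exposed directly; this sketch is only an analogy to parquet.ParquetFile, not the dataset-level options added here, and the file path is hypothetical:

```python
import pyarrow.parquet as pq

# buffer_size > 0 enables buffered-stream reads of column chunks (1 << 13
# is the 8192-byte default chosen above); read_dictionary takes column
# names, matching dict_columns' by-name semantics.
pf = pq.ParquetFile("example.parquet",
                    buffer_size=1 << 13,
                    read_dictionary=["category"])
table = pf.read()
```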
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index fc2df07d3f267..cd564494056c5 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -134,6 +134,9 @@ Status ScannerBuilder::UseThreads(bool use_threads) {
}
Status ScannerBuilder::BatchSize(int64_t batch_size) {
+ if (batch_size <= 0) {
+ return Status::Invalid("BatchSize must be greater than 0, got ", batch_size);
+ }
options_->batch_size = batch_size;
return Status::OK();
}
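With this guard a non-positive batch size is rejected when the scanner is built instead of producing a degenerate slicing loop later. Through the Python bindings the Status::Invalid is expected to surface as ArrowInvalid; a sketch, assuming `dataset` is a pyarrow.dataset.Dataset constructed elsewhere:

```python
import pyarrow as pa

try:
    next(dataset.to_batches(batch_size=0))
except pa.ArrowInvalid as exc:
    print(exc)  # BatchSize must be greater than 0, got 0
```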
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 664605e49f625..389de1690db0b 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -75,7 +75,7 @@ class ARROW_DS_EXPORT ScanOptions {
RecordBatchProjector projector;
// Maximum row count for scanned batches.
- int64_t batch_size = 64 << 10;
+ int64_t batch_size = 1 << 15;
// Return a vector of fields that requires materialization.
//
@@ -215,7 +215,12 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// ThreadPool found in ScanContext;
Status UseThreads(bool use_threads = true);
- /// \brief Set the maximum row count for scanned batches
+ /// \brief Set the maximum number of rows per RecordBatch.
+ ///
+ /// \param[in] batch_size the maximum number of rows.
+ /// \returns An error if batch_size is not strictly positive.
+ ///
+ /// This option provides a way to limit the memory consumed by any single RecordBatch.
Status BatchSize(int64_t batch_size);
/// \brief Return the constructed now-immutable Scanner object
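For a rough sense of what the new 1 << 15 default means in memory terms: batch_size caps rows, not bytes, so the footprint of one batch is roughly the row cap times the in-memory row width. Back-of-the-envelope arithmetic (the ten 8-byte columns are an assumed workload, not anything fixed by the API):

```python
batch_size = 1 << 15          # 32768 rows, the new default
row_width = 10 * 8            # e.g. ten fixed-width 8-byte columns, in bytes
print(batch_size * row_width / 2**20)  # 2.5 -> ~2.5 MiB per RecordBatch
```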
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index da107cb29b84d..f1244d13f4f67 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -93,7 +93,7 @@ cdef class ParquetFileFormatReaderOptions:
@property
def buffer_size(self):
- """Size of buffered stream, if enabled. Default is 1024 bytes."""
+ """Size of buffered stream, if enabled. Default is 8KB."""
return self.options.buffer_size
@buffer_size.setter
@@ -890,7 +890,7 @@ cdef class Dataset:
return scanner.scan()
def to_batches(self, columns=None, filter=None,
- batch_size=64*2**10, MemoryPool memory_pool=None):
+ batch_size=32*2**10, MemoryPool memory_pool=None):
"""Read the dataset as materialized record batches.
Builds a scan operation against the dataset and sequentially executes
@@ -912,8 +912,10 @@ cdef class Dataset:
partition information or internal metadata found in the data
source, e.g. Parquet statistics. Otherwise filters the loaded
RecordBatches before yielding them.
- batch_size : int, default 64*2**10
- The maximum row count for scanned record batches.
+ batch_size : int, default 32K
+ The maximum row count for scanned record batches. If scanned
+ record batches are overflowing memory then this parameter can be
+ lowered to reduce their size.
memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the
default pool.
@@ -929,7 +931,7 @@ cdef class Dataset:
yield batch
def to_table(self, columns=None, filter=None, use_threads=True,
- batch_size=64*2**10, MemoryPool memory_pool=None):
+ batch_size=32*2**10, MemoryPool memory_pool=None):
"""Read the dataset to an arrow table.
Note that this method reads all the selected data from the dataset
@@ -954,8 +956,10 @@ cdef class Dataset:
use_threads : boolean, default True
If enabled, then maximum parallelism will be used, determined by
the number of available CPU cores.
- batch_size : int, default 64*2**10
- The maximum row count for scanned record batches.
+ batch_size : int, default 32K
+ The maximum row count for scanned record batches. If scanned
+ record batches are overflowing memory then this parameter can be
+ lowered to reduce their size.
memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the
default pool.
@@ -1060,8 +1064,10 @@ cdef class Scanner:
use_threads : boolean, default True
If enabled, then maximum parallelism will be used, determined by
the number of available CPU cores.
- batch_size : int, default 64*2**10
- The maximum row count for scanned record batches.
+ batch_size : int, default 32K
+ The maximum row count for scanned record batches. If scanned
+ record batches are overflowing memory then this parameter can be
+ lowered to reduce their size.
memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the
default pool.
@@ -1073,7 +1079,7 @@ cdef class Scanner:
def __init__(self, Dataset dataset, list columns=None,
Expression filter=None, bint use_threads=True,
- int batch_size=64*2**10, MemoryPool memory_pool=None):
+ int batch_size=32*2**10, MemoryPool memory_pool=None):
cdef:
shared_ptr[CScanContext] context
shared_ptr[CScannerBuilder] builder
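Putting the Python-side changes together: the default drops to 32*2**10 rows per batch, and a smaller value can be passed when even that overflows memory. A usage sketch; the ds.dataset(...) factory call and path are assumptions about how the dataset is constructed, while to_batches/to_table and their batch_size keyword come from the signatures above:

```python
import pyarrow.dataset as ds

dataset = ds.dataset("/data/events", format="parquet")  # hypothetical source

# Default: at most 32*2**10 rows per yielded RecordBatch.
for batch in dataset.to_batches(columns=["x", "y"]):
    ...  # process each batch incrementally

# Halve the row cap if the default batches are still too large.
table = dataset.to_table(batch_size=16 * 2**10)
```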
diff --git a/r/R/dataset.R b/r/R/dataset.R
index 377a98527007e..078f48246ecc0 100644
--- a/r/R/dataset.R
+++ b/r/R/dataset.R
@@ -359,7 +359,7 @@ FileSystemSourceFactory$create <- function(filesystem,
#' * `use_buffered_stream`: Read files through buffered input streams rather than
#' loading entire row groups at once. This may be enabled
#' to reduce memory overhead. Disabled by default.
-#' * `buffer_size`: Size of buffered stream, if enabled. Default is 1024 bytes.
+#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
#' * `dict_columns`: Names of columns which should be read as dictionaries.
#'
#' It returns the appropriate subclass of `FileFormat` (e.g. `ParquetFileFormat`)
@@ -401,7 +401,7 @@ FileFormat$create <- function(format, ...) {
#' @export
ParquetFileFormat <- R6Class("ParquetFileFormat", inherit = FileFormat)
ParquetFileFormat$create <- function(use_buffered_stream = FALSE,
- buffer_size = 1024,
+ buffer_size = 8192,
dict_columns = character(0)) {
shared_ptr(ParquetFileFormat, dataset___ParquetFileFormat__Make(
use_buffered_stream, buffer_size, dict_columns))
@@ -430,7 +430,7 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
#' The method's default input is `TRUE`, but you must call the method to enable
#' multithreading because the scanner default is `FALSE`.
#' - `$BatchSize(batch_size)`: integer: Maximum row count of scanned record
-#' batches, default is 64K. If scanned record batches are overflowing memory
+#' batches, default is 32K. If scanned record batches are overflowing memory
#' then this method can be called to reduce their size.
#' - `$schema`: Active binding, returns the [Schema] of the Dataset
#' - `$Finish()`: Returns a `Scanner`