move batch_size to ScanOptions
bkietz committed Feb 23, 2020
1 parent 2b4a37e commit d90fd19
Showing 7 changed files with 17 additions and 18 deletions.
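
In user-facing terms, the change moves the batch size knob from the Parquet-specific reader options to the format-agnostic scan options. A minimal before/after sketch (illustrative variable names, not taken from this commit):

// Before: batch size was a Parquet-only reader option.
format->reader_options.batch_size = 1 << 15;

// After: batch size is a per-scan option, available to any format that
// honors ScanOptions::batch_size.
scan_options->batch_size = 1 << 15;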
cpp/src/arrow/dataset/file_parquet.cc (6 additions, 4 deletions)

@@ -103,13 +103,14 @@ static parquet::ReaderProperties MakeReaderProperties(
 }
 
 static parquet::ArrowReaderProperties MakeArrowReaderProperties(
-    const ParquetFileFormat& format, const parquet::ParquetFileReader& reader) {
+    const ParquetFileFormat& format, int64_t batch_size,
+    const parquet::ParquetFileReader& reader) {
   parquet::ArrowReaderProperties properties(/* use_threads = */ false);
   for (const std::string& name : format.reader_options.dict_columns) {
     auto column_index = reader.metadata()->schema()->ColumnIndex(name);
     properties.set_read_dictionary(column_index, true);
   }
-  properties.set_batch_size(format.reader_options.batch_size);
+  properties.set_batch_size(batch_size);
   return properties;
 }
 
@@ -351,7 +352,8 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
   auto properties = MakeReaderProperties(*this);
   ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, *reader);
+  auto arrow_properties =
+      MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
   std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
   RETURN_NOT_OK(parquet::arrow::FileReader::Make(default_memory_pool(), std::move(reader),
                                                  std::move(arrow_properties),

@@ -368,7 +370,7 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
   auto properties = MakeReaderProperties(*this, context->pool);
   ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));
 
-  auto arrow_properties = MakeArrowReaderProperties(*this, *reader);
+  auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
   return ParquetScanTaskIterator::Make(options, context, std::move(reader),
                                        std::move(arrow_properties));
 }
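
For context, set_batch_size is a pre-existing knob on parquet::ArrowReaderProperties; the commit only changes where its value comes from (ScanOptions instead of the format's reader options). A minimal sketch of what it controls, using a hypothetical helper name:

#include "parquet/properties.h"

// Each RecordBatch produced by the Arrow Parquet reader holds at most
// batch_size rows; the trailing batch of a row group may be shorter.
parquet::ArrowReaderProperties CappedProperties(int64_t batch_size) {
  parquet::ArrowReaderProperties properties(/* use_threads = */ false);
  properties.set_batch_size(batch_size);
  return properties;
}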
cpp/src/arrow/dataset/file_parquet.h (0 additions, 1 deletion)

@@ -60,7 +60,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
     ///
     /// @{
     std::unordered_set<std::string> dict_columns;
-    int64_t batch_size = parquet::kArrowDefaultBatchSize;
     /// @}
   } reader_options;
 
cpp/src/arrow/dataset/scanner.h (3 additions, 0 deletions)

@@ -90,6 +90,9 @@ class ARROW_DS_EXPORT ScanOptions {
   // sub-selection optimization.
   std::vector<std::string> MaterializedFields() const;
 
+  // Maximum row count for scanned batches.
+  int64_t batch_size = 64 << 10;
+
  private:
  explicit ScanOptions(std::shared_ptr<Schema> schema);
 };
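
The default is unchanged in effect: 64 << 10 is 64 * 1024 = 65536 rows, the same value as parquet::kArrowDefaultBatchSize, which Inspect above still passes explicitly. A one-line sanity check:

static_assert((64 << 10) == 65536, "ScanOptions::batch_size defaults to 64Ki rows");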
cpp/src/arrow/dataset/scanner_test.cc (8 additions, 0 deletions)

@@ -70,6 +70,14 @@ TEST_F(TestScanner, Scan) {
   AssertScannerEqualsRepetitionsOf(MakeScanner(batch), batch);
 }
 
+TEST_F(TestScanner, DISABLED_ScanWithCappedBatchSize) {
+  // TODO(bkietz) enable when InMemory* respects ScanOptions::batch_size
+  SetSchema({field("i32", int32()), field("f64", float64())});
+  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
+  options_->batch_size = kBatchSize / 2;
+  AssertScannerEqualsRepetitionsOf(MakeScanner(batch), batch->Slice(kBatchSize / 2));
+}
+
 TEST_F(TestScanner, FilteredScan) {
   SetSchema({field("f64", float64())});
 
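
The expected output batch->Slice(kBatchSize / 2) relies on the generated data being all zeroes: with batch_size capped at half the input length, the scanner should emit twice as many half-length batches, each value-equal to that half-length slice.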
python/pyarrow/_dataset.pyx (0 additions, 9 deletions)

@@ -110,15 +110,6 @@ cdef class ParquetFileFormatReaderOptions:
         for value in set(values):
             self.options.dict_columns.insert(tobytes(value))
 
-    @property
-    def batch_size(self):
-        """Maximum number of rows in read record batches."""
-        return self.options.batch_size
-
-    @batch_size.setter
-    def batch_size(self, int value):
-        self.options.batch_size = value
-
 
 cdef class ParquetFileFormat(FileFormat):
 
python/pyarrow/includes/libarrow_dataset.pxd (0 additions, 1 deletion)

@@ -287,7 +287,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         c_bool use_buffered_stream
         int64_t buffer_size
         unordered_set[c_string] dict_columns
-        int64_t batch_size
 
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
r/src/dataset.cpp (0 additions, 3 deletions)

@@ -83,9 +83,6 @@ std::shared_ptr<ds::ParquetFileFormat> dataset___ParquetFileFormat__Make(List_ o
         fmt->reader_options.dict_columns.insert(name);
       }
     }
-    if (reader_options.containsElementNamed("batch_size")) {
-      fmt->reader_options.batch_size = reader_options["batch_size"];
-    }
   }
   return fmt;
 }
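
Note that the Python and R bindings for the old reader option are removed here without a replacement binding for ScanOptions::batch_size, so exposing the relocated option to those languages appears to be left for follow-up work.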
