From 55bdce477a7025d3341c6f83686331a8ce7cc3c2 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 17 Feb 2020 12:53:39 -0500 Subject: [PATCH] namespace reader_options, use strings(names) for dict_columns --- cpp/src/arrow/dataset/file_parquet.cc | 19 +++---- cpp/src/arrow/dataset/file_parquet.h | 34 ++++++------ cpp/src/arrow/dataset/file_parquet_test.cc | 4 +- python/pyarrow/_dataset.pyx | 57 ++++++++++++-------- python/pyarrow/includes/libarrow_dataset.pxd | 11 ++-- python/pyarrow/tests/test_dataset.py | 3 +- 6 files changed, 72 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 11da0e412a402..b5423a64b2865 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -92,23 +92,24 @@ static Result> OpenReader( static parquet::ReaderProperties MakeReaderProperties( const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) { parquet::ReaderProperties properties(pool); - if (format.use_buffered_stream) { + if (format.reader_options.use_buffered_stream) { properties.enable_buffered_stream(); } else { properties.disable_buffered_stream(); } - properties.set_buffer_size(format.buffer_size); - properties.file_decryption_properties(format.file_decryption_properties); + properties.set_buffer_size(format.reader_options.buffer_size); + properties.file_decryption_properties(format.reader_options.file_decryption_properties); return properties; } static parquet::ArrowReaderProperties MakeArrowReaderProperties( - const ParquetFileFormat& format) { - parquet::ArrowReaderProperties properties(/* use_threads= */ false); - for (int column_index : format.read_dict_indices) { + const ParquetFileFormat& format, const parquet::ParquetFileReader& reader) { + parquet::ArrowReaderProperties properties(/* use_threads = */ false); + for (const std::string& name : format.reader_options.dict_columns) { + auto column_index = reader.metadata()->schema()->ColumnIndex(name); properties.set_read_dictionary(column_index, true); } - properties.set_batch_size(format.batch_size); + properties.set_batch_size(format.reader_options.batch_size); return properties; } @@ -350,7 +351,7 @@ Result> ParquetFileFormat::Inspect( auto properties = MakeReaderProperties(*this); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); - auto arrow_properties = MakeArrowReaderProperties(*this); + auto arrow_properties = MakeArrowReaderProperties(*this, *reader); std::unique_ptr arrow_reader; RETURN_NOT_OK(parquet::arrow::FileReader::Make(default_memory_pool(), std::move(reader), std::move(arrow_properties), @@ -367,7 +368,7 @@ Result ParquetFileFormat::ScanFile( auto properties = MakeReaderProperties(*this, context->pool); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties))); - auto arrow_properties = MakeArrowReaderProperties(*this); + auto arrow_properties = MakeArrowReaderProperties(*this, *reader); return ParquetScanTaskIterator::Make(options, context, std::move(reader), std::move(arrow_properties)); } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 9ad7d50d9f67d..752d01e7e4217 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -45,22 +45,24 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { return out; } - /// \defgroup parquet-file-format-reader-properties properties which correspond to - /// members of parquet::ReaderProperties. - /// - /// @{ - bool use_buffered_stream = parquet::DEFAULT_USE_BUFFERED_STREAM; - int64_t buffer_size = parquet::DEFAULT_BUFFER_SIZE; - std::shared_ptr file_decryption_properties; - /// @} - - /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond to - /// members of parquet::ArrowReaderProperties. - /// - /// @{ - std::unordered_set read_dict_indices; - int64_t batch_size = parquet::kArrowDefaultBatchSize; - /// @} + struct ReaderOptions { + /// \defgroup parquet-file-format-reader-properties properties which correspond to + /// members of parquet::ReaderProperties. + /// + /// @{ + bool use_buffered_stream = parquet::DEFAULT_USE_BUFFERED_STREAM; + int64_t buffer_size = parquet::DEFAULT_BUFFER_SIZE; + std::shared_ptr file_decryption_properties; + /// @} + + /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond + /// to members of parquet::ArrowReaderProperties. + /// + /// @{ + std::unordered_set dict_columns; + int64_t batch_size = parquet::kArrowDefaultBatchSize; + /// @} + } reader_options; std::string type_name() const override { return "parquet"; } diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index d0c4963c744d4..383475fc6284e 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -194,7 +194,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { opts_ = ScanOptions::Make(reader->schema()); - format_->read_dict_indices.insert(0); + format_->reader_options.dict_columns = {"utf8"}; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); @@ -316,7 +316,7 @@ TEST_F(TestParquetFileFormat, InspectDictEncoded) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - format_->read_dict_indices.insert(0); + format_->reader_options.dict_columns = {"utf8"}; ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get())); Schema expected_schema({field("utf8", dictionary(int32(), utf8()))}); diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 3b891984134b2..69bb96cf92caf 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -73,56 +73,67 @@ cdef class FileFormat: return self.wrapped -cdef class ParquetFileFormat(FileFormat): - +cdef class ParquetFileFormatReaderOptions: cdef: - CParquetFileFormat* parquet_format + CParquetFileFormatReaderOptions* options - def __init__(self): - self.init( CParquetFileFormat.Make()) - self.parquet_format = self.wrapped.get() + def __init__(self, ParquetFileFormat fmt): + self.options = &fmt.parquet_format.reader_options @property def use_buffered_stream(self): """Read files through buffered input streams rather than loading entire chunks at a time.""" - return self.parquet_format.use_buffered_stream + return self.options.use_buffered_stream @use_buffered_stream.setter def use_buffered_stream(self, bint value): - self.parquet_format.use_buffered_stream = value + self.options.use_buffered_stream = value @property def buffer_size(self): """Size of buffered stream, if enabled.""" - return self.parquet_format.buffer_size + return self.options.buffer_size @buffer_size.setter def buffer_size(self, int value): - self.parquet_format.buffer_size = value + self.options.buffer_size = value @property - def read_dict_indices(self): - """Indices of columns which should be read as dictionaries.""" - return self.parquet_format.read_dict_indices - - @read_dict_indices.setter - def read_dict_indices(self, set values): - self.parquet_format.read_dict_indices.clear() - for value in values: - self.read_dict_index(int(value)) + def dict_columns(self): + """Names of columns which should be read as dictionaries.""" + return self.options.dict_columns - def read_dict_index(self, int value): - self.parquet_format.read_dict_indices.insert(value) + @dict_columns.setter + def dict_columns(self, values): + self.options.dict_columns.clear() + for value in set(values): + self.options.dict_columns.insert(tobytes(value)) @property def batch_size(self): """Maximum number of rows in read record batches.""" - return self.parquet_format.batch_size + return self.options.batch_size @batch_size.setter def batch_size(self, int value): - self.parquet_format.batch_size = value + self.options.batch_size = value + + +cdef class ParquetFileFormat(FileFormat): + + cdef: + CParquetFileFormat* parquet_format + + def __init__(self, dict reader_options=dict()): + self.init( CParquetFileFormat.Make()) + self.parquet_format = self.wrapped.get() + for name, value in reader_options.items(): + setattr(self.reader_options, name, value) + + @property + def reader_options(self): + return ParquetFileFormatReaderOptions(self) cdef class IpcFileFormat(FileFormat): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 0bf7b39a808f4..ae9e2870d38a1 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -283,14 +283,17 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CFileWriteOptions): c_string file_type() + cdef cppclass CParquetFileFormatReaderOptions "arrow::dataset::ParquetFileFormat::ReaderOptions": + c_bool use_buffered_stream + int64_t buffer_size + unordered_set[c_string] dict_columns + int64_t batch_size + cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"( CFileFormat): @staticmethod shared_ptr[CParquetFileFormat] Make() - c_bool use_buffered_stream - int64_t buffer_size - unordered_set[int] read_dict_indices - int64_t batch_size + CParquetFileFormatReaderOptions reader_options cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"( CFileFragment): diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index a10e3c879ba98..6bf68a84f6d5a 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -433,8 +433,7 @@ def test_expression_ergonomics(): ] ]) def test_file_system_factory(mockfs, paths_or_selector): - format = ds.ParquetFileFormat() - format.read_dict_indices = {2} + format = ds.ParquetFileFormat(reader_options=dict(dict_columns={"str"})) options = ds.FileSystemFactoryOptions('subdir') options.partitioning = ds.DirectoryPartitioning(