From 22caaa1ba2e0838df228df767ac4c07313a9f619 Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Thu, 23 Jan 2020 13:21:21 -0500
Subject: [PATCH] refactoring

---
 cpp/src/arrow/dataset/file_base.h            |  2 +-
 cpp/src/arrow/dataset/file_ipc.cc            |  4 +-
 cpp/src/arrow/dataset/file_ipc.h             |  6 +--
 cpp/src/arrow/dataset/file_parquet.cc        |  5 +-
 cpp/src/arrow/dataset/file_parquet.h         | 19 +++--
 cpp/src/arrow/dataset/file_parquet_test.cc   | 54 ++++++++++----------
 cpp/src/arrow/dataset/test_util.h            |  8 +--
 python/pyarrow/_dataset.pyx                  |  8 +++
 python/pyarrow/includes/libarrow_dataset.pxd |  5 +-
 9 files changed, 68 insertions(+), 43 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index bcae386b08a2a..331a089d33fbc 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -130,7 +130,7 @@ class ARROW_DS_EXPORT FileFormat {
 
   /// \brief Open a fragment
   virtual Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& location, std::shared_ptr<ScanOptions> options) = 0;
+      FileSource location, std::shared_ptr<ScanOptions> options) = 0;
 };
 
 /// \brief A Fragment that is stored in a file with a known format
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index 30474fc4c968d..d4e95700558aa 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -130,8 +130,8 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
 }
 
 Result<std::shared_ptr<Fragment>> IpcFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
-  return std::make_shared<IpcFragment>(source, options);
+    FileSource source, std::shared_ptr<ScanOptions> options) {
+  return std::make_shared<IpcFragment>(std::move(source), std::move(options));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index 716524a509ede..827394e35d7ec 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -43,13 +43,13 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
                                std::shared_ptr<ScanContext> context) const override;
 
   Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& source, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
 };
 
 class ARROW_DS_EXPORT IpcFragment : public FileFragment {
  public:
-  IpcFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
-      : FileFragment(source, std::make_shared<IpcFileFormat>(), options) {}
+  IpcFragment(FileSource source, std::shared_ptr<ScanOptions> options)
+      : FileFragment(std::move(source), std::make_shared<IpcFileFormat>(), options) {}
 
   bool splittable() const override { return true; }
 };
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 2cf63323a9a26..11da0e412a402 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -373,8 +373,9 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
 }
 
 Result<std::shared_ptr<Fragment>> ParquetFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
-  return std::make_shared<ParquetFragment>(source, options);
+    FileSource source, std::shared_ptr<ScanOptions> options) {
+  auto format = weak_this_.lock();
+  return std::make_shared<ParquetFragment>(std::move(source), format, std::move(options));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 151c7be541da7..613dd7f98f014 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -19,6 +19,7 @@
 
 #include
 #include
+#include
 
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/type_fwd.h"
@@ -37,6 +38,12 @@ namespace dataset {
 
 /// \brief A FileFormat implementation that reads from Parquet files
 class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
  public:
+  static std::shared_ptr<ParquetFileFormat> Make() {
+    std::shared_ptr<ParquetFileFormat> out{new ParquetFileFormat};
+    out->weak_this_ = out;
+    return out;
+  }
+
   /// \defgroup parquet-file-format-reader-properties properties which correspond to
   /// members of parquet::ReaderProperties.
   ///
@@ -67,13 +74,19 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
                                std::shared_ptr<ScanContext> context) const override;
 
   Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& source, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
+
+ private:
+  ParquetFileFormat() = default;
+
+  std::weak_ptr<ParquetFileFormat> weak_this_;
 };
 
 class ARROW_DS_EXPORT ParquetFragment : public FileFragment {
  public:
-  ParquetFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
-      : FileFragment(source, std::make_shared<ParquetFileFormat>(), options) {}
+  ParquetFragment(FileSource source, std::shared_ptr<ParquetFileFormat> format,
+                  std::shared_ptr<ScanOptions> options)
+      : FileFragment(std::move(source), std::move(format), std::move(options)) {}
 
   bool splittable() const override { return true; }
 };
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index daa35713422b9..453fa81719c4f 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -159,7 +159,7 @@ class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin {
 
 class TestParquetFileFormat : public ParquetBufferFixtureMixin {
  protected:
-  ParquetFileFormat format_;
+  std::shared_ptr<ParquetFileFormat> format_ = ParquetFileFormat::Make();
   std::shared_ptr<ScanOptions> opts_;
   std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
 };
@@ -169,7 +169,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) {
   auto source = GetFileSource(reader.get());
 
   opts_ = ScanOptions::Make(reader->schema());
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -192,8 +192,8 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) {
 
   opts_ = ScanOptions::Make(reader->schema());
 
-  format_.read_dict_indices.insert(0);
-  ASSERT_OK_AND_ASSIGN(auto fragment, format_.MakeFragment(*source, opts_));
+  format_->read_dict_indices.insert(0);
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -215,14 +215,14 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) {
 TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) {
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
-  auto result = format_.Inspect(FileSource(buf));
+  auto result = format_->Inspect(FileSource(buf));
   EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr("<Buffer>"),
                                   result.status());
 
   constexpr auto file_name = "herp/derp";
   ASSERT_OK_AND_ASSIGN(
       auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
-  result = format_.Inspect({file_name, fs.get()});
+  result = format_->Inspect({file_name, fs.get()});
   EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr(file_name),
                                   result.status());
 }
@@ -241,7 +241,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
 
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -283,7 +283,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
   auto source = GetFileSource({reader.get(), reader_without_i32.get(), reader_without_f64.get()});
 
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -304,7 +304,7 @@ TEST_F(TestParquetFileFormat, Inspect) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
 
-  ASSERT_OK_AND_ASSIGN(auto actual, format_.Inspect(*source.get()));
+  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
 
   AssertSchemaEqual(*actual, *schema_, /*check_metadata=*/false);
 }
@@ -312,8 +312,8 @@ TEST_F(TestParquetFileFormat, InspectDictEncoded) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
 
-  format_.read_dict_indices.insert(0);
-  ASSERT_OK_AND_ASSIGN(auto actual, format_.Inspect(*source.get()));
+  format_->read_dict_indices.insert(0);
+  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
 
   Schema expected_schema({field("f64", dictionary(int32(), float64()))});
   EXPECT_EQ(*actual, expected_schema);
@@ -326,14 +326,14 @@ TEST_F(TestParquetFileFormat, IsSupported) {
   bool supported = false;
 
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
-  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(FileSource(buf)));
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf)));
   ASSERT_EQ(supported, false);
 
   buf = std::make_shared<Buffer>(util::string_view("corrupted"));
-  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(FileSource(buf)));
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf)));
   ASSERT_EQ(supported, false);
 
-  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(*source));
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source));
   EXPECT_EQ(supported, true);
 }
@@ -358,12 +358,12 @@ void CountRowsInScan(ScanTaskIterator& it, int64_t expected_rows,
 
 class TestParquetFileFormatPushDown : public TestParquetFileFormat {
  public:
-  void CountRowsAndBatchesInScan(Fragment& fragment, int64_t expected_rows,
-                                 int64_t expected_batches) {
+  void CountRowsAndBatchesInScan(const std::shared_ptr<Fragment>& fragment,
+                                 int64_t expected_rows, int64_t expected_batches) {
     int64_t actual_rows = 0;
     int64_t actual_batches = 0;
 
-    ASSERT_OK_AND_ASSIGN(auto it, fragment.Scan(ctx_));
+    ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(ctx_));
     for (auto maybe_scan_task : it) {
       ASSERT_OK_AND_ASSIGN(auto scan_task, std::move(maybe_scan_task));
       ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute());
@@ -399,35 +399,35 @@ TEST_F(TestParquetFileFormatPushDown, Basic) {
   auto source = GetFileSource(reader.get());
 
   opts_ = ScanOptions::Make(reader->schema());
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   opts_->filter = scalar(true);
-  CountRowsAndBatchesInScan(*fragment, kTotalNumRows, kNumRowGroups);
+  CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups);
 
   for (int64_t i = 1; i <= kNumRowGroups; i++) {
     opts_->filter = ("i64"_ == int64_t(i)).Copy();
-    CountRowsAndBatchesInScan(*fragment, i, 1);
+    CountRowsAndBatchesInScan(fragment, i, 1);
   }
 
   /* Out of bound filters should skip all RowGroups. */
   opts_->filter = scalar(false);
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   opts_->filter = ("i64"_ == int64_t(kNumRowGroups + 1)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   opts_->filter = ("i64"_ == int64_t(-1)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
 
   // No rows match 1 and 2.
   opts_->filter = ("i64"_ == int64_t(1) and "u8"_ == uint8_t(2)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
 
   opts_->filter = ("i64"_ == int64_t(2) or "i64"_ == int64_t(4)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 2 + 4, 2);
+  CountRowsAndBatchesInScan(fragment, 2 + 4, 2);
 
   opts_->filter = ("i64"_ < int64_t(6)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 5 * (5 + 1) / 2, 5);
+  CountRowsAndBatchesInScan(fragment, 5 * (5 + 1) / 2, 5);
 
   opts_->filter = ("i64"_ >= int64_t(6)).Copy();
-  CountRowsAndBatchesInScan(*fragment, kTotalNumRows - (5 * (5 + 1) / 2),
+  CountRowsAndBatchesInScan(fragment, kTotalNumRows - (5 * (5 + 1) / 2),
                             kNumRowGroups - 5);
 }
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index bb7355aa3f87b..ef1f76758e835 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -196,7 +196,7 @@ class DummyFileFormat : public FileFormat {
   }
 
   inline Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& location, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
 
  protected:
   std::shared_ptr<Schema> schema_;
@@ -211,7 +211,7 @@ class DummyFragment : public FileFragment {
 };
 
 Result<std::shared_ptr<Fragment>> DummyFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
+    FileSource source, std::shared_ptr<ScanOptions> options) {
   return std::make_shared<DummyFragment>(source, options);
 }
@@ -251,7 +251,7 @@ class JSONRecordBatchFileFormat : public FileFormat {
   }
 
   inline Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& location, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
 
  protected:
   SchemaResolver resolver_;
@@ -268,7 +268,7 @@ class JSONRecordBatchFragment : public FileFragment {
 };
 
 Result<std::shared_ptr<Fragment>> JSONRecordBatchFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
+    FileSource source, std::shared_ptr<ScanOptions> options) {
   return std::make_shared<JSONRecordBatchFragment>(source, resolver_(source), options);
 }
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 39089c9c54407..9f858a0c58559 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -75,9 +75,17 @@ cdef class FileFormat:
 
 
 cdef class ParquetFileFormat(FileFormat):
 
+    cdef:
+        CParquetFileFormat* parquet_format
+
     def __init__(self):
         self.init(shared_ptr[CFileFormat](new CParquetFileFormat()))
 
+    @property
+    def use_buffered_stream(self):
+        """The arrow Schema describing the partition scheme."""
+        return self.wrapped.
+
 
 
 cdef class IpcFileFormat(FileFormat):
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index e9952f4aec5d3..08d9af2899db0 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -285,7 +285,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
-        pass
+        c_bool use_buffered_stream
+        int64_t buffer_size
+        unordered_set[c_int] read_dict_indices
+        int64_t batch_size
 
     cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"(
             CFileFragment):
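
Not part of the patch: below is a minimal caller-side sketch of how the refactored interfaces fit together, assuming the arrow::dataset headers as of this commit. The helper name ScanOneParquetFile, the chosen includes, and the read_dict_indices example are illustrative only; ParquetFileFormat::Make(), the by-value FileSource in MakeFragment, and Fragment::Scan() are the interfaces touched above.

// Hypothetical caller (not part of the patch); error propagation via ARROW_ASSIGN_OR_RAISE.
#include <memory>
#include <utility>

#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"
#include "arrow/result.h"
#include "arrow/status.h"

namespace ds = arrow::dataset;

arrow::Status ScanOneParquetFile(ds::FileSource source,
                                 std::shared_ptr<arrow::Schema> schema) {
  // The format is no longer constructed directly; Make() seeds weak_this_ so
  // fragments created from it can hold a reference back to their format.
  std::shared_ptr<ds::ParquetFileFormat> format = ds::ParquetFileFormat::Make();
  format->read_dict_indices.insert(0);  // optional: read column 0 as dictionary

  auto options = ds::ScanOptions::Make(schema);
  auto context = std::make_shared<ds::ScanContext>();

  // MakeFragment now takes the FileSource by value and moves it into the
  // ParquetFragment, which also receives the owning format.
  ARROW_ASSIGN_OR_RAISE(auto fragment,
                        format->MakeFragment(std::move(source), options));

  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(context));
  for (auto maybe_task : scan_task_it) {
    ARROW_ASSIGN_OR_RAISE(auto task, std::move(maybe_task));
    ARROW_ASSIGN_OR_RAISE(auto batch_it, task->Execute());
    (void)batch_it;  // ... consume record batches here ...
  }
  return arrow::Status::OK();
}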