From 008d2777ea3bdb6cf5f62144ace42ff725bc6255 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 20 Sep 2023 20:08:20 +0800 Subject: [PATCH] GH-37111: [C++][Parquet] Dataset: Fixing Schema Cast (#37793) ### Rationale for this change Parquet and Arrow have two schemas: 1. Parquet has a SchemaElement[1], which is language- and implementation-independent. Parquet Arrow extracts this schema and deduces the Arrow types from it. 2. Parquet Arrow stores the schema and possible `field_id` in `key_value_metadata`[2] when `store_schema` is enabled. When deserializing, it first parses `SchemaElement`[1], and if the self-defined `key_value_metadata` exists, it uses `key_value_metadata` to override the result of [1]. [1] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L356 [2] https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1033 The bug arises because, when the dataset builds the `SchemaManifest`, it doesn't use the `key_value_metadata` from `Metadata`. For duration, when `store_schema` is enabled, the writer stores `Int64` as the physical type and adds an `::arrow::Duration` in `key_value_metadata`. There is no `equal(Duration, i64)`, so a not-implemented error is raised. ### What changes are included in this PR? Pass the `key_value_metadata` from `Metadata` when building the `SchemaManifest`. ### Are these changes tested? Yes ### Are there any user-facing changes? 
bugfix * Closes: #37111 Authored-by: mwish Signed-off-by: Benjamin Kietzman --- cpp/src/arrow/dataset/file_parquet.cc | 7 ++--- cpp/src/arrow/dataset/file_parquet_test.cc | 31 ++++++++++++++++++++-- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 9d0e8a6515878..751937e93b937 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -104,11 +104,12 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties( return arrow_properties; } -template Result> GetSchemaManifest( - const M& metadata, const parquet::ArrowReaderProperties& properties) { + const parquet::FileMetaData& metadata, + const parquet::ArrowReaderProperties& properties) { auto manifest = std::make_shared(); - const std::shared_ptr& key_value_metadata = nullptr; + const std::shared_ptr& key_value_metadata = + metadata.key_value_metadata(); RETURN_NOT_OK(SchemaManifest::Make(metadata.schema(), key_value_metadata, properties, manifest.get())); return manifest; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 8527c3af64c83..177ca824179a8 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -65,11 +65,15 @@ class ParquetFormatHelper { public: using FormatType = ParquetFileFormat; - static Result> Write(RecordBatchReader* reader) { + static Result> Write( + RecordBatchReader* reader, + const std::shared_ptr& arrow_properties = + default_arrow_writer_properties()) { auto pool = ::arrow::default_memory_pool(); std::shared_ptr out; auto sink = CreateOutputStream(pool); - RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink)); + RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink, default_writer_properties(), + arrow_properties)); return sink->Finish(); } static std::shared_ptr MakeFormat() { @@ -703,6 +707,29 @@ TEST_P(TestParquetFileFormatScan, 
PredicatePushdownRowGroupFragmentsUsingStringC CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), literal("a"))); } +TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingDurationColumn) { + // GH-37111: Parquet arrow stores writer schema and possible field_id in + // key_value_metadata when store_schema enabled. When storing `arrow::duration`, it will + // be stored as int64. This test ensures that dataset can parse the writer schema + // correctly. + auto table = TableFromJSON(schema({field("t", duration(TimeUnit::NANO))}), + { + R"([{"t": 1}])", + R"([{"t": 2}, {"t": 3}])", + }); + TableBatchReader table_reader(*table); + ASSERT_OK_AND_ASSIGN( + auto buffer, + ParquetFormatHelper::Write( + &table_reader, ArrowWriterProperties::Builder().store_schema()->build())); + auto source = std::make_shared(buffer); + SetSchema({field("t", duration(TimeUnit::NANO))}); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + + auto expr = equal(field_ref("t"), literal(::arrow::DurationScalar(1, TimeUnit::NANO))); + CountRowGroupsInFragment(fragment, {0}, expr); +} + // Tests projection with nested/indexed FieldRefs. // https://github.com/apache/arrow/issues/35579 TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {