-
Notifications
You must be signed in to change notification settings - Fork 803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Parquet Reader's Arrow Schema Inference #1682
Conversation
@tustvold please let me know when you would like any substantial review for this |
Don't treat embedded arrow schema as authoritative (apache#1663) Fix projection of nested parquet files (apache#1652) (apache#1654) Fix schema inference for repeated fields (apache#1681) Support reading alternative list representations from parquet (apache#1680)
6eb932c
to
e2f12de
Compare
} | ||
} | ||
|
||
impl ParquetTypeConverter<'_> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is copied largely wholesale into schema/primitive.rs
@@ -1261,7 +746,7 @@ mod tests { | |||
{ | |||
arrow_fields.push(Field::new( | |||
"my_list", | |||
DataType::List(Box::new(Field::new("element", DataType::Utf8, true))), | |||
DataType::List(Box::new(Field::new("str", DataType::Utf8, false))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we can see this fixing #1681
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comments need to be updated for the changes in code
// // List<String> (list nullable, elements non-null)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
// };
// }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That comment is still correct, this is a nullable list with non-nullable elements, as described by that parquet schema.
The test was previously wrong
@@ -1679,7 +1168,7 @@ mod tests { | |||
|
|||
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); | |||
let converted_arrow_schema = | |||
parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None) | |||
parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of order column projection previously would misbehave as parquet_to_arrow_schema_by_columns
supported it, but the actual reader logic did not. This makes it consistently not supported, it will error, as it is hard to reason what the correct semantics are in the event of nested schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see a test for the(new) error case -- I suggest adding one so we don't get accidental regressions
column_mask: Vec<bool>, | ||
} | ||
|
||
impl Visitor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is logic extracted from builder.rs, it wasn't possible to reuse the existing TypeVisitor as its handling of lists interfered with #1680
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this structure to encapsulate the logic for using the embedded schema. 👍
}; | ||
|
||
Ok(Some(match repetition { | ||
Repetition::REPEATED => primitive_field.into_list(primitive_type.name()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the logic to now support #1680, there is comprehensive test coverage of this in schema.rs, in particular test_arrow_schema_roundtrip.
I'm actually quite pleased with this, despite the underlying list representation in parquet being fundamentally different, the ArrayBuilder can be completely oblivious to this fact 😄
|
||
/// Uses an type hint from the embedded arrow schema to aid in faithfully | ||
/// reproducing the data as it was written into parquet | ||
fn apply_hint(parquet: DataType, hint: DataType) -> DataType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the change that fixes #1663 - we only use the arrow schema to hint types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this centralization of hinting logic -- it makes it easy to understand where arrow and parquet type systems aren't compatible
rep_level: i16, | ||
def_level: i16, | ||
/// An optional [`DataType`] sourced from the embedded arrow schema | ||
data_type: Option<DataType>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what fixes #1654 - we carry the DataType as we walk the tree, which prevents it from misbehaving
} | ||
|
||
/// Representation of a parquet file, in terms of arrow schema elements | ||
pub struct ParquetField { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the new structure as described in #1655
@@ -1050,6 +1050,41 @@ mod tests { | |||
for batch in record_batch_reader { | |||
batch.unwrap(); | |||
} | |||
|
|||
let projected_reader = arrow_reader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let projected_reader = arrow_reader | |
// Test for https://github.com/apache/arrow-rs/issues/1654 and | |
// https://github.com/apache/arrow-rs/issues/1652 | |
let projected_reader = arrow_reader |
Codecov Report
@@ Coverage Diff @@
## master #1682 +/- ##
==========================================
+ Coverage 83.15% 83.25% +0.10%
==========================================
Files 193 195 +2
Lines 56007 56049 +42
==========================================
+ Hits 46572 46665 +93
+ Misses 9435 9384 -51
Continue to review full report at Codecov.
|
I will review this later today |
Sorry I don't think i will get to this today -- will do first thing tommorow. Sorry @tustvold 😞 I just need to find enough contiguous time to do the review and that is hard to come by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for taking on this challenge -- @tustvold 🏅 🏆
I won't say I totally grok all the changes in this PR, but I did read it carefully and it makes sense to me and seems to allow for easier improvements going forward. I like how the type logic is now encapsulated more.
Not sure if anyone who uses structured types in parquet (@bjchambers ? @TimDiekmann @jhorstmann ?) might be interested in testing their code with this PR.
I am not sure if anyone else wants a chance to review or if we should merge and include in arrow 14.0.0 (which I am starting to prepare for).
@@ -1050,6 +1050,41 @@ mod tests { | |||
for batch in record_batch_reader { | |||
batch.unwrap(); | |||
} | |||
|
|||
let projected_reader = arrow_reader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let projected_reader = arrow_reader | |
// Test for https://github.com/apache/arrow-rs/issues/1654 and | |
// https://github.com/apache/arrow-rs/issues/1652 | |
let projected_reader = arrow_reader |
DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32) | ||
.with_logical_type(Some(LogicalType::Date)) | ||
.with_repetition(repetition) | ||
.build(), | ||
// date64 is cast to date32 | ||
// date64 is cast to date32 (#1666) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 #1666
DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32) | ||
.with_logical_type(Some(LogicalType::Date)) | ||
.with_repetition(repetition) | ||
.build(), | ||
DataType::Time32(_) => Type::primitive_type_builder(name, PhysicalType::INT32) | ||
DataType::Time32(TimeUnit::Second) => { | ||
// Cannot represent seconds in LogicalType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirming this is a (seemingly better) change in behavior, right -- now no logical type is stored for arrow Time32(seconds)
but previously the logical type of Time(millis)
was stored,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, TBC this wouldn't be wrong if the writer coerced the types to match. The problem is it does not
@@ -1261,7 +746,7 @@ mod tests { | |||
{ | |||
arrow_fields.push(Field::new( | |||
"my_list", | |||
DataType::List(Box::new(Field::new("element", DataType::Utf8, true))), | |||
DataType::List(Box::new(Field::new("str", DataType::Utf8, false))), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comments need to be updated for the changes in code
// // List<String> (list nullable, elements non-null)
// optional group my_list (LIST) {
// repeated group element {
// required binary str (UTF8);
// };
// }
column_mask: Vec<bool>, | ||
} | ||
|
||
impl Visitor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this structure to encapsulate the logic for using the embedded schema. 👍
}; | ||
|
||
Ok(visitor.dispatch(parquet_type, context)?.unwrap()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be awesome to add tests specifically for this logic that enumerated parquet types and their expected conversions to arrow.
Maybe that could be done as a follow on PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is fairly good coverage of the type conversion already in schema.rs, but there is definitely scope for testing repetition levels in addition. Filed #1698
|
||
/// Uses an type hint from the embedded arrow schema to aid in faithfully | ||
/// reproducing the data as it was written into parquet | ||
fn apply_hint(parquet: DataType, hint: DataType) -> DataType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this centralization of hinting logic -- it makes it easy to understand where arrow and parquet type systems aren't compatible
} | ||
_ => Ok(DataType::FixedSizeBinary(type_length)), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also here, some explicit tests showing conversions as a way to document expected behavior would be really nice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These type conversions are very well covered by the unit tests in schema.rs
I think this is now ready, if I've missed anything let me know. I think it is worth highlighting as a breaking change in the changelog, so that on the off chance it does break something someone was relying on, even if it likely was a bug, they know where to look and we can hopefully quickly unblock them. |
Looking into test failures |
I didn't review this in detail, but did run our test suite against this branch and did not notice any issues. |
.unwrap_err() | ||
.to_string(); | ||
|
||
assert!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Merging (is the first PR in what will be released as arrow 15.0.0) 🎉 |
😅 glad I didn't try to include this in |
@alamb that's unfortunately expected... DataFusion has a bug... Will provide context on ticket |
Which issue does this PR close?
Closes #1655
Closes #1663
Closes #1652
Closes #1654
Closes #1681
Closes #1680
Closes #1484
Rationale for this change
See tickets, in particular #1655
What changes are included in this PR?
This separates the schema inference logic from the logic that reads the parquet file, this makes the logic clearer, easier to test, and hopefully less buggy.
Are there any user-facing changes?
Yes, schema inference may change. It will be more correct, but this is still a change.
We also explicitly no longer support out-of-order column projection, whereas previously it would be silently ignored in some code paths.