diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index c400cc182543..b4915a3111d3 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -91,7 +91,7 @@ impl ArrowWriter {
         let batch_level = LevelInfo::new_from_batch(batch);
         let mut row_group_writer = self.writer.next_row_group()?;
         for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
-            let mut levels = batch_level.calculate_array_levels(array, field);
+            let mut levels = batch_level.calculate_array_levels(array, field, false);
             // Reverse levels as we pop() them when writing arrays
             levels.reverse();
             write_leaves(&mut row_group_writer, array, &mut levels)?;
@@ -760,10 +760,15 @@ mod tests {
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, true),
+            // Note: when the struct below is set to non-nullable, this test fails,
+            // but the output data written is correct.
+            // Interestingly, pyarrow will read it correctly, but pyspark fails to.
+            // This might be a compatibility quirk between arrow and parquet.
+            // We have opened https://github.com/apache/arrow-rs/issues/245 to investigate.
             Field::new(
                 "c",
                 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
-                true, // NB: this test fails if value is false. Why?
+                true,
             ),
         ]);
 
@@ -809,6 +814,48 @@ mod tests {
         roundtrip("test_arrow_writer_complex.parquet", batch);
     }
 
+    #[test]
+    fn arrow_writer_complex_mixed() {
+        // This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
+        // Only writing the "offset_field" column works when "some_nested_object" is non-nullable.
+        // This indicates that a non-nullable struct should not have nullable children (with null values).
+        // One observation is that Spark doesn't consider the parent struct's nullness,
+        // so we should investigate the impact of always treating structs as nullable.
+        // See https://github.com/apache/arrow-rs/issues/245.
+
+        // define schema
+        let offset_field = Field::new("offset", DataType::Int32, true);
+        let partition_field = Field::new("partition", DataType::Int64, true);
+        let topic_field = Field::new("topic", DataType::Utf8, true);
+        let schema = Schema::new(vec![Field::new(
+            "some_nested_object",
+            DataType::Struct(vec![
+                offset_field.clone(),
+                partition_field.clone(),
+                topic_field.clone(),
+            ]),
+            true,
+        )]);
+
+        // create some data
+        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let partition = Int64Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
+        let topic = StringArray::from(vec![Some("A"), None, Some("A"), Some(""), None]);
+
+        let some_nested_object = StructArray::from(vec![
+            (offset_field, Arc::new(offset) as ArrayRef),
+            (partition_field, Arc::new(partition) as ArrayRef),
+            (topic_field, Arc::new(topic) as ArrayRef),
+        ]);
+
+        // build a record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
+                .unwrap();
+
+        roundtrip("test_arrow_writer_complex_mixed.parquet", batch);
+    }
+
     #[test]
     fn arrow_writer_2_level_struct() {
         // tests writing <struct<struct<int32>>
@@ -872,7 +919,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "The levels generated are correct, but because of field_a being non-nullable, we cannot write record"]
     fn arrow_writer_2_level_struct_mixed_null() {
         // tests writing <struct<struct<int32>>
         let field_c = Field::new("c", DataType::Int32, false);
@@ -881,10 +927,14 @@ mod tests {
         let schema = Schema::new(vec![field_a.clone()]);
 
         // create data
+        // When the null buffer of the struct is created, this test fails.
+        // It appears that the nullness of the struct is ignored when the
+        // struct is read back.
+        // See https://github.com/apache/arrow-rs/issues/245
         let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
         let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
             .len(6)
-            .null_bit_buffer(Buffer::from(vec![0b00100111]))
+            // .null_bit_buffer(Buffer::from(vec![0b00100111]))
             .add_child_data(c.data().clone())
             .build();
         let b = StructArray::from(b_data);
@@ -896,7 +946,7 @@ mod tests {
         let a = StructArray::from(a_data);
 
         assert_eq!(a.null_count(), 0);
-        assert_eq!(a.column(0).null_count(), 2);
+        assert_eq!(a.column(0).null_count(), 0);
 
         // build a racord batch
         let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 2168670bb591..2669581c8712 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -97,10 +97,20 @@ impl LevelInfo {
     /// Compute nested levels of the Arrow array, recursing into lists and structs.
     ///
     /// Returns a list of `LevelInfo`, where each level is for nested primitive arrays.
+    ///
+    /// The parent struct's nullness is tracked, as it determines whether the child
+    /// max_definition should be incremented.
+    /// The `is_parent_struct` parameter asks "is this field's parent a struct?".
+    /// * If we are starting at a [RecordBatch], this is `false`.
+    /// * If we are calculating a list's child, this is `false`.
+    /// * If we are calculating a struct (i.e. `field.data_type() == Struct`),
+    ///   this depends on whether the struct is a child of a struct.
+    /// * If we are calculating a field inside a [StructArray], this is `true`.
     pub(crate) fn calculate_array_levels(
         &self,
         array: &ArrayRef,
         field: &Field,
+        is_parent_struct: bool,
     ) -> Vec<Self> {
         let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array);
         match array.data_type() {
@@ -143,6 +153,7 @@ impl LevelInfo {
                     array_offsets,
                     array_mask,
                     false,
+                    is_parent_struct,
                     field.is_nullable(),
                 )]
             }
@@ -152,6 +163,8 @@ impl LevelInfo {
                     array_offsets,
                     array_mask,
                     true,
+                    // the list could come from a struct, but its children will all be false
+                    is_parent_struct,
                     field.is_nullable(),
                 );
 
@@ -195,11 +208,12 @@ impl LevelInfo {
                         child_offsets,
                         child_mask,
                         false,
+                        false, // always false
                         list_field.is_nullable(),
                     )]
                 }
                 DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
-                    list_level.calculate_array_levels(&child_array, list_field)
+                    list_level.calculate_array_levels(&child_array, list_field, false)
                 }
                 DataType::FixedSizeList(_, _) => unimplemented!(),
                 DataType::Union(_) => unimplemented!(),
@@ -215,6 +229,8 @@ impl LevelInfo {
                     array_offsets,
                     array_mask,
                     false,
+                    // struct's own parent could be a struct
+                    is_parent_struct,
                     field.is_nullable(),
                 );
                 let mut struct_levels = vec![];
@@ -223,8 +239,12 @@ impl LevelInfo {
                     .into_iter()
                     .zip(struct_fields)
                     .for_each(|(child_array, child_field)| {
-                        let mut levels =
-                            struct_level.calculate_array_levels(child_array, child_field);
+                        let mut levels = struct_level.calculate_array_levels(
+                            child_array,
+                            child_field,
+                            // this is the only place where this is always true
+                            true,
+                        );
                         struct_levels.append(&mut levels);
                     });
                 struct_levels
@@ -239,6 +259,7 @@ impl LevelInfo {
                     array_offsets,
                     array_mask,
                     false,
+                    is_parent_struct,
                     field.is_nullable(),
                 )]
             }
@@ -295,6 +316,7 @@ impl LevelInfo {
         array_offsets: Vec<i64>,
         array_mask: Vec<bool>,
         is_list: bool,
+        is_parent_struct: bool,
         is_nullable: bool,
     ) -> Self {
         let min_len = *(array_offsets.last().unwrap()) as usize;
@@ -311,11 +333,20 @@ impl LevelInfo {
                 } else if self.is_list {
                     // second exception, always increment after a list
                     self.max_definition + 1
+                } else if is_parent_struct && !self.is_nullable {
+                    // if the parent is a non-null struct, don't increment
+                    self.max_definition
                 } else {
                     self.max_definition + is_nullable as i16
                 }
             }
-            true => self.max_definition + 1 + is_nullable as i16,
+            true => {
+                if is_parent_struct && !self.is_nullable {
+                    self.max_definition + is_nullable as i16
+                } else {
+                    self.max_definition + 1 + is_nullable as i16
+                }
+            }
         };
 
         match (self.is_list, is_list) {
@@ -728,6 +759,7 @@ mod tests {
             array_mask,
             true,
             false,
+            false,
         );
 
         // let expected_levels = LevelInfo {
@@ -759,6 +791,7 @@ mod tests {
            array_mask,
             true,
             false,
+            false,
         );
         let expected_levels = LevelInfo {
             definition: vec![2, 2, 2, 2, 2, 2, 2, 2, 2, 2],
@@ -799,6 +832,7 @@ mod tests {
             array_mask.clone(),
             false,
             false,
+            false,
         );
         let expected_levels = LevelInfo {
             definition: vec![1; 10],
@@ -831,6 +865,7 @@ mod tests {
             array_offsets.clone(),
             array_mask.clone(),
             false,
+            false,
             true,
         );
         let expected_levels = LevelInfo {
@@ -865,6 +900,7 @@ mod tests {
             array_offsets.clone(),
             array_mask,
             true,
+            false,
             true,
         );
         // array: [[0, 0], _1_, [2, 2], [3, 3, 3, 3], [4, 4, 4]]
@@ -926,6 +962,7 @@ mod tests {
             array_offsets.clone(),
             array_mask,
             true,
+            false,
             true,
         );
         let expected_levels = LevelInfo {
@@ -963,6 +1000,7 @@ mod tests {
             array_offsets.clone(),
             array_mask,
             true,
+            false,
             true,
         );
         let expected_levels = LevelInfo {
@@ -1043,6 +1081,7 @@ mod tests {
             array_offsets.clone(),
             array_mask,
             true,
+            false,
             true,
         );
         // 0: [null], level 1 is defined, but not 2
@@ -1083,6 +1122,7 @@ mod tests {
             array_offsets.clone(),
             array_mask,
             true,
+            false,
             true,
         );
         // We have 7 array values, and at least 15 primitives (from array_offsets)
@@ -1151,7 +1191,7 @@ mod tests {
             is_nullable: true,
         };
         let b_levels =
-            a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true);
+            a_levels.calculate_child_levels(b_offsets.clone(), b_mask, false, true, true);
         assert_eq!(&b_expected_levels, &b_levels);
 
         // c's offset and mask
@@ -1167,7 +1207,8 @@ mod tests {
             is_list: false,
             is_nullable: true,
         };
-        let c_levels = b_levels.calculate_child_levels(c_offsets, c_mask, false, true);
+        let c_levels =
+            b_levels.calculate_child_levels(c_offsets, c_mask, false, true, true);
         assert_eq!(&c_expected_levels, &c_levels);
     }
 
@@ -1216,7 +1257,8 @@ mod tests {
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels = batch_level.calculate_array_levels(array, field);
+                let mut array_levels =
+                    batch_level.calculate_array_levels(array, field, false);
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 1);
@@ -1248,6 +1290,8 @@ mod tests {
     fn mixed_struct_list() {
         // this tests the level generation from the equivalent arrow_writer_complex test
 
+        // TODO: Investigate failure if struct is null. See https://github.com/apache/arrow-rs/issues/245
+
         // define schema
         let struct_field_d = Field::new("d", DataType::Float64, true);
         let struct_field_f = Field::new("f", DataType::Float32, true);
@@ -1267,7 +1311,7 @@ mod tests {
             Field::new(
                 "c",
                 DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
-                false,
+                true, // https://github.com/apache/arrow-rs/issues/245
             ),
         ]);
 
@@ -1330,7 +1374,8 @@ mod tests {
             .iter()
             .zip(batch.schema().fields())
             .for_each(|(array, field)| {
-                let mut array_levels = batch_level.calculate_array_levels(array, field);
+                let mut array_levels =
+                    batch_level.calculate_array_levels(array, field, false);
                 levels.append(&mut array_levels);
             });
         assert_eq!(levels.len(), 5);
@@ -1408,4 +1453,65 @@ mod tests {
         let filter = level.filter_array_indices();
         assert_eq!(expected, filter);
     }
+
+    #[test]
+    fn test_null_vs_nonnull_struct() {
+        // define schema
+        let offset_field = Field::new("offset", DataType::Int32, true);
+        let schema = Schema::new(vec![Field::new(
+            "some_nested_object",
+            DataType::Struct(vec![offset_field.clone()]),
+            false,
+        )]);
+
+        // create some data
+        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
+
+        let some_nested_object =
+            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
+
+        // build a record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
+                .unwrap();
+
+        let batch_level = LevelInfo::new_from_batch(&batch);
+        let struct_null_level = batch_level.calculate_array_levels(
+            batch.column(0),
+            batch.schema().field(0),
+            false,
+        );
+
+        // create second batch
+        // define schema
+        let offset_field = Field::new("offset", DataType::Int32, true);
+        let schema = Schema::new(vec![Field::new(
+            "some_nested_object",
+            DataType::Struct(vec![offset_field.clone()]),
+            true,
+        )]);
+
+        // create some data
+        let offset = Int32Array::from(vec![1, 2, 3, 4, 5]);
+
+        let some_nested_object =
+            StructArray::from(vec![(offset_field, Arc::new(offset) as ArrayRef)]);
+
+        // build a record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)])
+                .unwrap();
+
+        let batch_level = LevelInfo::new_from_batch(&batch);
+        let struct_non_null_level = batch_level.calculate_array_levels(
+            batch.column(0),
+            batch.schema().field(0),
+            false,
+        );
+
+        // The 2 levels should not be the same
+        if struct_non_null_level == struct_null_level {
+            panic!("Levels should not be equal, to reflect the difference in struct nullness");
+        }
+    }
 }
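For context, the rule the new `is_parent_struct` handling encodes can be sketched independently of the crate: each optional (nullable) field along the path from the root to a leaf adds one definition level, while required fields, including required structs, add none. The sketch below is illustrative only; `max_definition_level` is an assumed helper, not an API of the parquet crate, and list/repetition levels are ignored.

// Illustrative sketch (assumed helper, not part of the parquet crate):
// a leaf's max definition level is the number of nullable fields on its path;
// required fields, including required structs, contribute nothing.
// Repetition levels for lists are ignored here.
fn max_definition_level(path_nullability: &[bool]) -> i16 {
    path_nullability.iter().map(|&nullable| nullable as i16).sum()
}

fn main() {
    // nullable struct "some_nested_object" -> nullable Int32 "offset"
    assert_eq!(max_definition_level(&[true, true]), 2);
    // the same path with a non-nullable struct parent: the struct no longer
    // adds a level, so the leaf's max definition drops to 1. This is why
    // test_null_vs_nonnull_struct expects the two LevelInfo results to differ.
    assert_eq!(max_definition_level(&[false, true]), 1);
}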