From a22495c027f056825df1111785911162aa82d854 Mon Sep 17 00:00:00 2001
From: kikkon
Date: Sun, 10 Mar 2024 21:30:40 +0800
Subject: [PATCH 01/10] Add DataType::ListView and DataType::LargeListView

---
 arrow-data/src/data.rs                 |  13 ++
 arrow-data/src/equal/mod.rs            |   3 +
 arrow-data/src/transform/mod.rs        |   9 ++
 arrow-integration-test/src/datatype.rs |   3 +
 arrow-ipc/src/convert.rs               |   1 +
 arrow-schema/src/datatype.rs           |  26 +++-
 arrow-schema/src/field.rs              |   2 +
 parquet/src/arrow/schema/mod.rs        | 190 ++++++++-----------------
 parquet/src/compression.rs             |  39 ++---
 9 files changed, 125 insertions(+), 161 deletions(-)

diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 2ddc2d845b0..0b207218c76 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -119,12 +119,22 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
             buffer.push(0i32);
             [buffer, empty_buffer]
         }
+        DataType::ListView(_) => {
+            let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<i32>());
+            buffer.push(0i32);
+            [buffer, empty_buffer]
+        }
         DataType::LargeList(_) => {
             // offset buffer always starts with a zero
             let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<i64>());
             buffer.push(0i64);
             [buffer, empty_buffer]
         }
+        DataType(_) => {
+            let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<i64>());
+            buffer.push(0i64);
+            [buffer, empty_buffer]
+        }
         DataType::FixedSizeBinary(size) => {
             [MutableBuffer::new(capacity * *size as usize), empty_buffer]
         }
@@ -1550,6 +1560,9 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
         }
         DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
         DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
+        DataType::ListView(_) | DataType::LargeListView(_) => {
+            unimplemented!("ListView/LargeListView not implemented")
+        }
         DataType::LargeList(_) => DataTypeLayout::new_fixed_width::<i64>(),
         DataType::Map(_, _) => DataTypeLayout::new_fixed_width::<i32>(),
         DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index 1255ff39e09..0987fd4c563 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -100,6 +100,9 @@ fn equal_values(
             unimplemented!("BinaryView/Utf8View not yet implemented")
         }
         DataType::List(_) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
+        DataType::ListView(_) | DataType::LargeListView(_) => {
+            unimplemented!("ListView/LargeListView not yet implemented")
+        }
         DataType::LargeList(_) => list_equal::<i64>(lhs, rhs, lhs_start, rhs_start, len),
         DataType::FixedSizeList(_, _) => fixed_list_equal(lhs, rhs, lhs_start, rhs_start, len),
         DataType::Struct(_) => struct_equal(lhs, rhs, lhs_start, rhs_start, len),
diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs
index ef53efac237..b14f6e77103 100644
--- a/arrow-data/src/transform/mod.rs
+++ b/arrow-data/src/transform/mod.rs
@@ -228,6 +228,9 @@ fn build_extend(array: &ArrayData) -> Extend {
             unimplemented!("BinaryView/Utf8View not implemented")
         }
         DataType::Map(_, _) | DataType::List(_) => list::build_extend::<i32>(array),
+        DataType::ListView(_) | DataType::LargeListView(_) => {
+            unimplemented!("ListView/LargeListView not implemented")
+        }
         DataType::LargeList(_) => list::build_extend::<i64>(array),
         DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"),
         DataType::Struct(_) => structure::build_extend(array),
@@ -273,6 +276,9 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
unimplemented!("BinaryView/Utf8View not implemented") } DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::, + DataType::ListView(_) | DataType::LargeListView(_) => { + unimplemented!("ListView/LargeListView not implemented") + } DataType::LargeList(_) => list::extend_nulls::, DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() { DataType::UInt8 => primitive::extend_nulls::, @@ -428,6 +434,9 @@ impl<'a> MutableArrayData<'a> { DataType::BinaryView | DataType::Utf8View => { unimplemented!("BinaryView/Utf8View not implemented") } + DataType::ListView(_) | DataType::LargeListView(_) => { + unimplemented!("ListView/LargeListView not implemented") + } DataType::Map(_, _) | DataType::List(_) | DataType::LargeList(_) => { let children = arrays .iter() diff --git a/arrow-integration-test/src/datatype.rs b/arrow-integration-test/src/datatype.rs index a04db1cf353..e45e94c24e0 100644 --- a/arrow-integration-test/src/datatype.rs +++ b/arrow-integration-test/src/datatype.rs @@ -281,6 +281,9 @@ pub fn data_type_to_json(data_type: &DataType) -> serde_json::Value { DataType::Union(_, _) => json!({"name": "union"}), DataType::List(_) => json!({ "name": "list"}), DataType::LargeList(_) => json!({ "name": "largelist"}), + DataType::ListView(_) | DataType::LargeListView(_) => { + unimplemented!("ListView/LargeListView not implemented") + } DataType::FixedSizeList(_, length) => { json!({"name":"fixedsizelist", "listSize": length}) } diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs index a2ffd438020..a821008d89a 100644 --- a/arrow-ipc/src/convert.rs +++ b/arrow-ipc/src/convert.rs @@ -664,6 +664,7 @@ pub(crate) fn get_fb_field_type<'a>( children: Some(fbb.create_vector(&[child])), } } + ListView(_) | LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"), LargeList(ref list_type) => { let child = build_field(fbb, list_type); FBFieldType { diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs index b3d89b011e6..f875abceeb1 100644 --- a/arrow-schema/src/datatype.rs +++ b/arrow-schema/src/datatype.rs @@ -228,12 +228,30 @@ pub enum DataType { /// /// A single List array can store up to [`i32::MAX`] elements in total. List(FieldRef), + + /// (NOT YET FULLY SUPPORTED) A list of some logical data type with variable length. + /// + /// Note this data type is not yet fully supported. Using it with arrow APIs may result in `panic`s. + /// + /// The ListView layout is defined by three buffers: + /// a validity bitmap, an offsets buffer, and an additional sizes buffer. + /// Sizes and offsets have the identical bit width and both 32-bit signed integer options is supported. + ListView(FieldRef), /// A list of some logical data type with fixed length. FixedSizeList(FieldRef, i32), /// A list of some logical data type with variable length and 64-bit offsets. /// /// A single LargeList array can store up to [`i64::MAX`] elements in total. LargeList(FieldRef), + + /// (NOT YET FULLY SUPPORTED) A list of some logical data type with variable length and 64-bit offsets. + /// + /// Note this data type is not yet fully supported. Using it with arrow APIs may result in `panic`s. + /// + /// The LargeListView layout is defined by three buffers: + /// a validity bitmap, an offsets buffer, and an additional sizes buffer. + /// Sizes and offsets have the identical bit width and both 64-bit signed integer options is supported. + LargeListView(FieldRef), /// A nested datatype that contains a number of sub-fields. 
Struct(Fields), /// A nested datatype that can represent slots of differing types. Components: @@ -536,7 +554,11 @@ impl DataType { DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => None, DataType::Binary | DataType::LargeBinary | DataType::BinaryView => None, DataType::FixedSizeBinary(_) => None, - DataType::List(_) | DataType::LargeList(_) | DataType::Map(_, _) => None, + DataType::List(_) + | DataType::ListView(_) + | DataType::LargeList(_) + | DataType::LargeListView(_) + | DataType::Map(_, _) => None, DataType::FixedSizeList(_, _) => None, DataType::Struct(_) => None, DataType::Union(_, _) => None, @@ -581,8 +603,10 @@ impl DataType { | DataType::Decimal256(_, _) => 0, DataType::Timestamp(_, s) => s.as_ref().map(|s| s.len()).unwrap_or_default(), DataType::List(field) + | DataType::ListView(field) | DataType::FixedSizeList(field, _) | DataType::LargeList(field) + | DataType::LargeListView(field) | DataType::Map(field, _) => field.size(), DataType::Struct(fields) => fields.size(), DataType::Union(fields, _) => fields.size(), diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs index 70a3e2b21a3..c9120010593 100644 --- a/arrow-schema/src/field.rs +++ b/arrow-schema/src/field.rs @@ -510,7 +510,9 @@ impl Field { | DataType::BinaryView | DataType::Interval(_) | DataType::LargeList(_) + | DataType::LargeListView(_) | DataType::List(_) + | DataType::ListView(_) | DataType::Map(_, _) | DataType::Dictionary(_, _) | DataType::RunEndEncoded(_, _) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index a8bef98d9e8..300a21c4f13 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -32,8 +32,7 @@ use arrow_ipc::writer; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use crate::basic::{ - ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, - Type as PhysicalType, + ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType, }; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::KeyValue, properties::WriterProperties}; @@ -55,11 +54,7 @@ pub fn parquet_to_arrow_schema( parquet_schema: &SchemaDescriptor, key_value_metadata: Option<&Vec>, ) -> Result { - parquet_to_arrow_schema_by_columns( - parquet_schema, - ProjectionMask::all(), - key_value_metadata, - ) + parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata) } /// Convert parquet schema to arrow schema including optional metadata, @@ -199,10 +194,7 @@ fn encode_arrow_schema(schema: &Schema) -> String { /// Mutates writer metadata by storing the encoded Arrow schema. /// If there is an existing Arrow schema metadata, it is replaced. -pub(crate) fn add_encoded_arrow_schema_to_metadata( - schema: &Schema, - props: &mut WriterProperties, -) { +pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) { let encoded = encode_arrow_schema(schema); let schema_kv = KeyValue { @@ -270,16 +262,15 @@ fn parse_key_value_metadata( /// Convert parquet column schema to arrow field. 
pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result { let field = complex::convert_type(&parquet_column.self_type_ptr())?; - let mut ret = Field::new( - parquet_column.name(), - field.arrow_type, - field.nullable, - ); + let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable); let basic_info = parquet_column.self_type().get_basic_info(); if basic_info.has_id() { let mut meta = HashMap::with_capacity(1); - meta.insert(PARQUET_FIELD_ID_META_KEY.to_string(), basic_info.id().to_string()); + meta.insert( + PARQUET_FIELD_ID_META_KEY.to_string(), + basic_info.id().to_string(), + ); ret.set_metadata(meta); } @@ -401,15 +392,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()), unit: match time_unit { TimeUnit::Second => unreachable!(), - TimeUnit::Millisecond => { - ParquetTimeUnit::MILLIS(Default::default()) - } - TimeUnit::Microsecond => { - ParquetTimeUnit::MICROS(Default::default()) - } - TimeUnit::Nanosecond => { - ParquetTimeUnit::NANOS(Default::default()) - } + TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), + TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), + TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), }, })) .with_repetition(repetition) @@ -457,9 +442,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(repetition) .with_id(id) .build(), - DataType::Duration(_) => { - Err(arrow_err!("Converting Duration to parquet not supported",)) - } + DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)), DataType::Interval(_) => { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_converted_type(ConvertedType::INTERVAL) @@ -481,9 +464,10 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_length(*length) .build() } - DataType::BinaryView | DataType::Utf8View => unimplemented!("BinaryView/Utf8View not implemented"), - DataType::Decimal128(precision, scale) - | DataType::Decimal256(precision, scale) => { + DataType::BinaryView | DataType::Utf8View => { + unimplemented!("BinaryView/Utf8View not implemented") + } + DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { // Decimal precision determines the Parquet physical type to use. 
// Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal let (physical_type, length) = if *precision > 1 && *precision <= 9 { @@ -528,11 +512,12 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_id(id) .build() } + DataType::ListView(_) | DataType::LargeListView(_) => { + unimplemented!("ListView/LargeListView not implemented") + } DataType::Struct(fields) => { if fields.is_empty() { - return Err( - arrow_err!("Parquet does not support writing empty structs",), - ); + return Err(arrow_err!("Parquet does not support writing empty structs",)); } // recursively convert children to types/nodes let fields = fields @@ -622,8 +607,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("boolean", DataType::Boolean, false), @@ -661,8 +645,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("decimal1", DataType::Decimal128(4, 2), false), @@ -688,8 +671,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("binary", DataType::Binary, false), @@ -710,8 +692,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("boolean", DataType::Boolean, false), @@ -719,12 +700,9 @@ mod tests { ]); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); - let converted_arrow_schema = parquet_to_arrow_schema_by_columns( - &parquet_schema, - ProjectionMask::all(), - None, - ) - .unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None) + .unwrap(); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); } @@ -922,8 +900,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1001,8 +978,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - 
parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1096,8 +1072,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1114,8 +1089,7 @@ mod tests { Field::new("leaf1", DataType::Boolean, false), Field::new("leaf2", DataType::Int32, false), ]); - let group1_struct = - Field::new("group1", DataType::Struct(group1_fields), false); + let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false); arrow_fields.push(group1_struct); let leaf3_field = Field::new("leaf3", DataType::Int64, false); @@ -1134,8 +1108,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1288,8 +1261,7 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = - parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1514,20 +1486,11 @@ mod tests { vec![ Field::new("bools", DataType::Boolean, false), Field::new("uint32", DataType::UInt32, false), - Field::new_list( - "int32", - Field::new("element", DataType::Int32, true), - false, - ), + Field::new_list("int32", Field::new("element", DataType::Int32, true), false), ], false, ), - Field::new_dictionary( - "dictionary_strings", - DataType::Int32, - DataType::Utf8, - false, - ), + Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false), Field::new("decimal_int32", DataType::Decimal128(8, 2), false), Field::new("decimal_int64", DataType::Decimal128(16, 2), false), Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false), @@ -1612,10 +1575,8 @@ mod tests { let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[ - ("Key", "Foo"), - (PARQUET_FIELD_ID_META_KEY, "2"), - ])), + Field::new("c1", DataType::Utf8, false) + .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])), Field::new("c2", DataType::Binary, false), Field::new("c3", DataType::FixedSizeBinary(3), false), Field::new("c4", DataType::Boolean, false), @@ -1633,10 +1594,7 @@ mod tests { ), Field::new( "c17", - DataType::Timestamp( - TimeUnit::Microsecond, - Some("Africa/Johannesburg".into()), - ), + DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())), false, ), Field::new( @@ -1648,10 +1606,8 @@ mod tests { Field::new("c20", 
DataType::Interval(IntervalUnit::YearMonth), false), Field::new_list( "c21", - Field::new("item", DataType::Boolean, true).with_metadata(meta(&[ - ("Key", "Bar"), - (PARQUET_FIELD_ID_META_KEY, "5"), - ])), + Field::new("item", DataType::Boolean, true) + .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])), false, ) .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), @@ -1701,10 +1657,7 @@ mod tests { // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), Field::new_dict( "c31", - DataType::Dictionary( - Box::new(DataType::Int32), - Box::new(DataType::Utf8), - ), + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, 123, true, @@ -1739,11 +1692,7 @@ mod tests { "c39", "key_value", Field::new("key", DataType::Utf8, false), - Field::new_list( - "value", - Field::new("element", DataType::Utf8, true), - true, - ), + Field::new_list("value", Field::new("element", DataType::Utf8, true), true), false, // fails to roundtrip keys_sorted true, ), @@ -1782,11 +1731,8 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(schema.clone()), - None, - )?; + let writer = + ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; writer.close()?; // read file back @@ -1845,33 +1791,23 @@ mod tests { }; let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, true).with_metadata(meta(&[ - (PARQUET_FIELD_ID_META_KEY, "1"), - ])), - Field::new("c2", DataType::Utf8, true).with_metadata(meta(&[ - (PARQUET_FIELD_ID_META_KEY, "2"), - ])), + Field::new("c1", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])), + Field::new("c2", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])), ], HashMap::new(), ); - let writer = ArrowWriter::try_new( - vec![], - Arc::new(schema.clone()), - None, - )?; + let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?; let parquet_bytes = writer.into_inner()?; - let reader = crate::file::reader::SerializedFileReader::new( - bytes::Bytes::from(parquet_bytes), - )?; + let reader = + crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?; let schema_descriptor = reader.metadata().file_metadata().schema_descr_ptr(); // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema - let arrow_schema = crate::arrow::parquet_to_arrow_schema( - &schema_descriptor, - None, - )?; + let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?; let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; let parq_fields = parq_schema_descr.root_schema().get_fields(); @@ -1884,19 +1820,14 @@ mod tests { #[test] fn test_arrow_schema_roundtrip_lists() -> Result<()> { - let metadata: HashMap = - [("Key".to_string(), "Value".to_string())] - .iter() - .cloned() - .collect(); + let metadata: HashMap = [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); let schema = Schema::new_with_metadata( vec![ - Field::new_list( - "c21", - Field::new("array", DataType::Boolean, true), - false, - ), + Field::new_list("c21", Field::new("array", DataType::Boolean, true), false), Field::new( "c22", DataType::FixedSizeList( @@ -1927,11 +1858,8 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - 
let writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(schema.clone()), - None, - )?; + let writer = + ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; writer.close()?; // read file back diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index a9a1afbbf21..cae00da3b24 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -144,10 +144,7 @@ pub(crate) trait CompressionLevel { /// Given the compression type `codec`, returns a codec used to compress and decompress /// bytes for the compression type. /// This returns `None` if the codec type is `UNCOMPRESSED`. -pub fn create_codec( - codec: CodecType, - _options: &CodecOptions, -) -> Result>> { +pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result>> { match codec { #[cfg(any(feature = "brotli", test))] CodecType::BROTLI(level) => Ok(Some(Box::new(BrotliCodec::new(level)))), @@ -260,8 +257,7 @@ mod gzip_codec { } fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = - write::GzEncoder::new(output_buf, Compression::new(self.level.0)); + let mut encoder = write::GzEncoder::new(output_buf, Compression::new(self.level.0)); encoder.write_all(input_buf)?; encoder.try_finish().map_err(|e| e.into()) } @@ -619,10 +615,7 @@ mod lz4_hadoop_codec { /// Adapted from pola-rs [compression.rs:try_decompress_hadoop](https://pola-rs.github.io/polars/src/parquet2/compression.rs.html#225) /// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474). /// Returns error if decompression failed. - fn try_decompress_hadoop( - input_buf: &[u8], - output_buf: &mut [u8], - ) -> io::Result { + fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> io::Result { // Parquet files written with the Hadoop Lz4Codec use their own framing. // The input buffer can contain an arbitrary number of "frames", each // with the following structure: @@ -660,11 +653,9 @@ mod lz4_hadoop_codec { "Not enough bytes to hold advertised output", )); } - let decompressed_size = lz4_flex::decompress_into( - &input[..expected_compressed_size as usize], - output, - ) - .map_err(|e| ParquetError::External(Box::new(e)))?; + let decompressed_size = + lz4_flex::decompress_into(&input[..expected_compressed_size as usize], output) + .map_err(|e| ParquetError::External(Box::new(e)))?; if decompressed_size != expected_decompressed_size as usize { return Err(io::Error::new( io::ErrorKind::Other, @@ -712,8 +703,7 @@ mod lz4_hadoop_codec { Ok(n) => { if n != required_len { return Err(ParquetError::General( - "LZ4HadoopCodec uncompress_size is not the expected one" - .into(), + "LZ4HadoopCodec uncompress_size is not the expected one".into(), )); } Ok(n) @@ -724,20 +714,12 @@ mod lz4_hadoop_codec { Err(_) => { // Truncate any inserted element before tryingg next algorithm. output_buf.truncate(output_len); - match LZ4Codec::new().decompress( - input_buf, - output_buf, - uncompress_size, - ) { + match LZ4Codec::new().decompress(input_buf, output_buf, uncompress_size) { Ok(n) => Ok(n), Err(_) => { // Truncate any inserted element before tryingg next algorithm. 
output_buf.truncate(output_len); - LZ4RawCodec::new().decompress( - input_buf, - output_buf, - uncompress_size, - ) + LZ4RawCodec::new().decompress(input_buf, output_buf, uncompress_size) } } } @@ -759,8 +741,7 @@ mod lz4_hadoop_codec { let compressed_size = compressed_size as u32; let uncompressed_size = input_buf.len() as u32; output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes()); - output_buf[SIZE_U32..PREFIX_LEN] - .copy_from_slice(&compressed_size.to_be_bytes()); + output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes()); Ok(()) } From 79bbdcd203255a55c26b4f369f86226b9009d7aa Mon Sep 17 00:00:00 2001 From: kikkon Date: Sun, 10 Mar 2024 21:52:25 +0800 Subject: [PATCH 02/10] revert some file to main --- parquet/src/compression.rs | 39 ++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs index cae00da3b24..a9a1afbbf21 100644 --- a/parquet/src/compression.rs +++ b/parquet/src/compression.rs @@ -144,7 +144,10 @@ pub(crate) trait CompressionLevel { /// Given the compression type `codec`, returns a codec used to compress and decompress /// bytes for the compression type. /// This returns `None` if the codec type is `UNCOMPRESSED`. -pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result>> { +pub fn create_codec( + codec: CodecType, + _options: &CodecOptions, +) -> Result>> { match codec { #[cfg(any(feature = "brotli", test))] CodecType::BROTLI(level) => Ok(Some(Box::new(BrotliCodec::new(level)))), @@ -257,7 +260,8 @@ mod gzip_codec { } fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - let mut encoder = write::GzEncoder::new(output_buf, Compression::new(self.level.0)); + let mut encoder = + write::GzEncoder::new(output_buf, Compression::new(self.level.0)); encoder.write_all(input_buf)?; encoder.try_finish().map_err(|e| e.into()) } @@ -615,7 +619,10 @@ mod lz4_hadoop_codec { /// Adapted from pola-rs [compression.rs:try_decompress_hadoop](https://pola-rs.github.io/polars/src/parquet2/compression.rs.html#225) /// Translated from the apache arrow c++ function [TryDecompressHadoop](https://github.com/apache/arrow/blob/bf18e6e4b5bb6180706b1ba0d597a65a4ce5ca48/cpp/src/arrow/util/compression_lz4.cc#L474). /// Returns error if decompression failed. - fn try_decompress_hadoop(input_buf: &[u8], output_buf: &mut [u8]) -> io::Result { + fn try_decompress_hadoop( + input_buf: &[u8], + output_buf: &mut [u8], + ) -> io::Result { // Parquet files written with the Hadoop Lz4Codec use their own framing. 
// The input buffer can contain an arbitrary number of "frames", each // with the following structure: @@ -653,9 +660,11 @@ mod lz4_hadoop_codec { "Not enough bytes to hold advertised output", )); } - let decompressed_size = - lz4_flex::decompress_into(&input[..expected_compressed_size as usize], output) - .map_err(|e| ParquetError::External(Box::new(e)))?; + let decompressed_size = lz4_flex::decompress_into( + &input[..expected_compressed_size as usize], + output, + ) + .map_err(|e| ParquetError::External(Box::new(e)))?; if decompressed_size != expected_decompressed_size as usize { return Err(io::Error::new( io::ErrorKind::Other, @@ -703,7 +712,8 @@ mod lz4_hadoop_codec { Ok(n) => { if n != required_len { return Err(ParquetError::General( - "LZ4HadoopCodec uncompress_size is not the expected one".into(), + "LZ4HadoopCodec uncompress_size is not the expected one" + .into(), )); } Ok(n) @@ -714,12 +724,20 @@ mod lz4_hadoop_codec { Err(_) => { // Truncate any inserted element before tryingg next algorithm. output_buf.truncate(output_len); - match LZ4Codec::new().decompress(input_buf, output_buf, uncompress_size) { + match LZ4Codec::new().decompress( + input_buf, + output_buf, + uncompress_size, + ) { Ok(n) => Ok(n), Err(_) => { // Truncate any inserted element before tryingg next algorithm. output_buf.truncate(output_len); - LZ4RawCodec::new().decompress(input_buf, output_buf, uncompress_size) + LZ4RawCodec::new().decompress( + input_buf, + output_buf, + uncompress_size, + ) } } } @@ -741,7 +759,8 @@ mod lz4_hadoop_codec { let compressed_size = compressed_size as u32; let uncompressed_size = input_buf.len() as u32; output_buf[..SIZE_U32].copy_from_slice(&uncompressed_size.to_be_bytes()); - output_buf[SIZE_U32..PREFIX_LEN].copy_from_slice(&compressed_size.to_be_bytes()); + output_buf[SIZE_U32..PREFIX_LEN] + .copy_from_slice(&compressed_size.to_be_bytes()); Ok(()) } From 70c9a2ffb3c866d30851b55e6625b6e10176864d Mon Sep 17 00:00:00 2001 From: kikkon Date: Sun, 10 Mar 2024 21:58:32 +0800 Subject: [PATCH 03/10] revert some file to main --- arrow-data/src/data.rs | 2 +- parquet/src/arrow/schema/mod.rs | 203 ++++++++++++++++++++++---------- 2 files changed, 139 insertions(+), 66 deletions(-) diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 0b207218c76..8f024b2da78 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -130,7 +130,7 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff buffer.push(0i64); [buffer, empty_buffer] } - DataType(_) => { + DataType::LargeListView(_) => { let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); buffer.push(0i64); [buffer, empty_buffer] diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 300a21c4f13..04f631f4944 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -32,7 +32,8 @@ use arrow_ipc::writer; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; use crate::basic::{ - ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType, + ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, + Type as PhysicalType, }; use crate::errors::{ParquetError, Result}; use crate::file::{metadata::KeyValue, properties::WriterProperties}; @@ -54,7 +55,11 @@ pub fn parquet_to_arrow_schema( parquet_schema: &SchemaDescriptor, key_value_metadata: Option<&Vec>, ) -> Result { - parquet_to_arrow_schema_by_columns(parquet_schema, ProjectionMask::all(), key_value_metadata) + 
parquet_to_arrow_schema_by_columns( + parquet_schema, + ProjectionMask::all(), + key_value_metadata, + ) } /// Convert parquet schema to arrow schema including optional metadata, @@ -194,7 +199,10 @@ fn encode_arrow_schema(schema: &Schema) -> String { /// Mutates writer metadata by storing the encoded Arrow schema. /// If there is an existing Arrow schema metadata, it is replaced. -pub(crate) fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) { +pub(crate) fn add_encoded_arrow_schema_to_metadata( + schema: &Schema, + props: &mut WriterProperties, +) { let encoded = encode_arrow_schema(schema); let schema_kv = KeyValue { @@ -262,15 +270,16 @@ fn parse_key_value_metadata( /// Convert parquet column schema to arrow field. pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result { let field = complex::convert_type(&parquet_column.self_type_ptr())?; - let mut ret = Field::new(parquet_column.name(), field.arrow_type, field.nullable); + let mut ret = Field::new( + parquet_column.name(), + field.arrow_type, + field.nullable, + ); let basic_info = parquet_column.self_type().get_basic_info(); if basic_info.has_id() { let mut meta = HashMap::with_capacity(1); - meta.insert( - PARQUET_FIELD_ID_META_KEY.to_string(), - basic_info.id().to_string(), - ); + meta.insert(PARQUET_FIELD_ID_META_KEY.to_string(), basic_info.id().to_string()); ret.set_metadata(meta); } @@ -392,9 +401,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_adjusted_to_u_t_c: matches!(tz, Some(z) if !z.as_ref().is_empty()), unit: match time_unit { TimeUnit::Second => unreachable!(), - TimeUnit::Millisecond => ParquetTimeUnit::MILLIS(Default::default()), - TimeUnit::Microsecond => ParquetTimeUnit::MICROS(Default::default()), - TimeUnit::Nanosecond => ParquetTimeUnit::NANOS(Default::default()), + TimeUnit::Millisecond => { + ParquetTimeUnit::MILLIS(Default::default()) + } + TimeUnit::Microsecond => { + ParquetTimeUnit::MICROS(Default::default()) + } + TimeUnit::Nanosecond => { + ParquetTimeUnit::NANOS(Default::default()) + } }, })) .with_repetition(repetition) @@ -442,7 +457,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_repetition(repetition) .with_id(id) .build(), - DataType::Duration(_) => Err(arrow_err!("Converting Duration to parquet not supported",)), + DataType::Duration(_) => { + Err(arrow_err!("Converting Duration to parquet not supported",)) + } DataType::Interval(_) => { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_converted_type(ConvertedType::INTERVAL) @@ -464,10 +481,9 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_length(*length) .build() } - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } - DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => { + DataType::BinaryView | DataType::Utf8View => unimplemented!("BinaryView/Utf8View not implemented"), + DataType::Decimal128(precision, scale) + | DataType::Decimal256(precision, scale) => { // Decimal precision determines the Parquet physical type to use. 
// Following the: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal let (physical_type, length) = if *precision > 1 && *precision <= 9 { @@ -512,12 +528,12 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_id(id) .build() } - DataType::ListView(_) | DataType::LargeListView(_) => { - unimplemented!("ListView/LargeListView not implemented") - } + DataType::ListView(_) | DataType::LargeListView(_) => unimplemented!("ListView/LargeListView not implemented"), DataType::Struct(fields) => { if fields.is_empty() { - return Err(arrow_err!("Parquet does not support writing empty structs",)); + return Err( + arrow_err!("Parquet does not support writing empty structs",), + ); } // recursively convert children to types/nodes let fields = fields @@ -607,7 +623,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("boolean", DataType::Boolean, false), @@ -645,7 +662,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("decimal1", DataType::Decimal128(4, 2), false), @@ -671,7 +689,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("binary", DataType::Binary, false), @@ -692,7 +711,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let arrow_fields = Fields::from(vec![ Field::new("boolean", DataType::Boolean, false), @@ -700,9 +720,12 @@ mod tests { ]); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); - let converted_arrow_schema = - parquet_to_arrow_schema_by_columns(&parquet_schema, ProjectionMask::all(), None) - .unwrap(); + let converted_arrow_schema = parquet_to_arrow_schema_by_columns( + &parquet_schema, + ProjectionMask::all(), + None, + ) + .unwrap(); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); } @@ -900,7 +923,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -978,7 +1002,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = 
SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1072,7 +1097,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1089,7 +1115,8 @@ mod tests { Field::new("leaf1", DataType::Boolean, false), Field::new("leaf2", DataType::Int32, false), ]); - let group1_struct = Field::new("group1", DataType::Struct(group1_fields), false); + let group1_struct = + Field::new("group1", DataType::Struct(group1_fields), false); arrow_fields.push(group1_struct); let leaf3_field = Field::new("leaf3", DataType::Int64, false); @@ -1108,7 +1135,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1261,7 +1289,8 @@ mod tests { let parquet_group_type = parse_message_type(message_type).unwrap(); let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); - let converted_arrow_schema = parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); let converted_fields = converted_arrow_schema.fields(); assert_eq!(arrow_fields.len(), converted_fields.len()); @@ -1486,11 +1515,20 @@ mod tests { vec![ Field::new("bools", DataType::Boolean, false), Field::new("uint32", DataType::UInt32, false), - Field::new_list("int32", Field::new("element", DataType::Int32, true), false), + Field::new_list( + "int32", + Field::new("element", DataType::Int32, true), + false, + ), ], false, ), - Field::new_dictionary("dictionary_strings", DataType::Int32, DataType::Utf8, false), + Field::new_dictionary( + "dictionary_strings", + DataType::Int32, + DataType::Utf8, + false, + ), Field::new("decimal_int32", DataType::Decimal128(8, 2), false), Field::new("decimal_int64", DataType::Decimal128(16, 2), false), Field::new("decimal_fix_length", DataType::Decimal128(30, 2), false), @@ -1575,8 +1613,10 @@ mod tests { let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, false) - .with_metadata(meta(&[("Key", "Foo"), (PARQUET_FIELD_ID_META_KEY, "2")])), + Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[ + ("Key", "Foo"), + (PARQUET_FIELD_ID_META_KEY, "2"), + ])), Field::new("c2", DataType::Binary, false), Field::new("c3", DataType::FixedSizeBinary(3), false), Field::new("c4", DataType::Boolean, false), @@ -1594,7 +1634,10 @@ mod tests { ), Field::new( "c17", - DataType::Timestamp(TimeUnit::Microsecond, Some("Africa/Johannesburg".into())), + DataType::Timestamp( + TimeUnit::Microsecond, + Some("Africa/Johannesburg".into()), + ), false, ), 
Field::new( @@ -1606,11 +1649,13 @@ mod tests { Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), Field::new_list( "c21", - Field::new("item", DataType::Boolean, true) - .with_metadata(meta(&[("Key", "Bar"), (PARQUET_FIELD_ID_META_KEY, "5")])), + Field::new("item", DataType::Boolean, true).with_metadata(meta(&[ + ("Key", "Bar"), + (PARQUET_FIELD_ID_META_KEY, "5"), + ])), false, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), Field::new( "c22", DataType::FixedSizeList( @@ -1632,7 +1677,7 @@ mod tests { Field::new("c", DataType::Float32, false), Field::new("d", DataType::Float16, false), ] - .into(), + .into(), ), false, ), @@ -1657,12 +1702,15 @@ mod tests { // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false), Field::new_dict( "c31", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), true, 123, true, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])), Field::new("c32", DataType::LargeBinary, true), Field::new("c33", DataType::LargeUtf8, true), Field::new_large_list( @@ -1676,7 +1724,7 @@ mod tests { Field::new("a", DataType::Int16, true), Field::new("b", DataType::Float64, true), ] - .into(), + .into(), ), true, ), @@ -1692,7 +1740,11 @@ mod tests { "c39", "key_value", Field::new("key", DataType::Utf8, false), - Field::new_list("value", Field::new("element", DataType::Utf8, true), true), + Field::new_list( + "value", + Field::new("element", DataType::Utf8, true), + true, + ), false, // fails to roundtrip keys_sorted true, ), @@ -1707,11 +1759,11 @@ mod tests { .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])), true, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])), false, // fails to roundtrip keys_sorted true, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])), Field::new_map( "c41", "my_entries", @@ -1731,8 +1783,11 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let writer = - ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; + let writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; writer.close()?; // read file back @@ -1791,23 +1846,33 @@ mod tests { }; let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, true) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "1")])), - Field::new("c2", DataType::Utf8, true) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "2")])), + Field::new("c1", DataType::Utf8, true).with_metadata(meta(&[ + (PARQUET_FIELD_ID_META_KEY, "1"), + ])), + Field::new("c2", DataType::Utf8, true).with_metadata(meta(&[ + (PARQUET_FIELD_ID_META_KEY, "2"), + ])), ], HashMap::new(), ); - let writer = ArrowWriter::try_new(vec![], Arc::new(schema.clone()), None)?; + let writer = ArrowWriter::try_new( + vec![], + Arc::new(schema.clone()), + None, + )?; let parquet_bytes = writer.into_inner()?; - let reader = - crate::file::reader::SerializedFileReader::new(bytes::Bytes::from(parquet_bytes))?; + let reader = crate::file::reader::SerializedFileReader::new( + bytes::Bytes::from(parquet_bytes), + )?; let schema_descriptor = 
reader.metadata().file_metadata().schema_descr_ptr(); // don't pass metadata so field ids are read from Parquet and not from serialized Arrow schema - let arrow_schema = crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?; + let arrow_schema = crate::arrow::parquet_to_arrow_schema( + &schema_descriptor, + None, + )?; let parq_schema_descr = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?; let parq_fields = parq_schema_descr.root_schema().get_fields(); @@ -1820,14 +1885,19 @@ mod tests { #[test] fn test_arrow_schema_roundtrip_lists() -> Result<()> { - let metadata: HashMap = [("Key".to_string(), "Value".to_string())] - .iter() - .cloned() - .collect(); + let metadata: HashMap = + [("Key".to_string(), "Value".to_string())] + .iter() + .cloned() + .collect(); let schema = Schema::new_with_metadata( vec![ - Field::new_list("c21", Field::new("array", DataType::Boolean, true), false), + Field::new_list( + "c21", + Field::new("array", DataType::Boolean, true), + false, + ), Field::new( "c22", DataType::FixedSizeList( @@ -1858,8 +1928,11 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let writer = - ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema.clone()), None)?; + let writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(schema.clone()), + None, + )?; writer.close()?; // read file back From d8e34a328a683d4fd773f9307beb5b7efb94cdce Mon Sep 17 00:00:00 2001 From: kikkon Date: Sun, 10 Mar 2024 22:11:15 +0800 Subject: [PATCH 04/10] revert some file to main --- parquet/src/arrow/schema/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 04f631f4944..4a78db05ed2 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -725,7 +725,7 @@ mod tests { ProjectionMask::all(), None, ) - .unwrap(); + .unwrap(); assert_eq!(&arrow_fields, converted_arrow_schema.fields()); } @@ -1655,7 +1655,7 @@ mod tests { ])), false, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), Field::new( "c22", DataType::FixedSizeList( @@ -1677,7 +1677,7 @@ mod tests { Field::new("c", DataType::Float32, false), Field::new("d", DataType::Float16, false), ] - .into(), + .into(), ), false, ), @@ -1710,7 +1710,7 @@ mod tests { 123, true, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])), Field::new("c32", DataType::LargeBinary, true), Field::new("c33", DataType::LargeUtf8, true), Field::new_large_list( @@ -1724,7 +1724,7 @@ mod tests { Field::new("a", DataType::Int16, true), Field::new("b", DataType::Float64, true), ] - .into(), + .into(), ), true, ), @@ -1759,11 +1759,11 @@ mod tests { .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])), true, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])), false, // fails to roundtrip keys_sorted true, ) - .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])), + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])), Field::new_map( "c41", "my_entries", From 8121ef9a055912b2a9dcb21d4179cc15dce25f62 Mon Sep 17 00:00:00 2001 From: kikkon Date: Mon, 11 Mar 2024 23:47:29 +0800 Subject: [PATCH 05/10] fix: listview buffer init --- arrow-data/src/data.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff 
--git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 8f024b2da78..c63b3cf7860 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -120,8 +120,8 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff [buffer, empty_buffer] } DataType::ListView(_) => { - let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); - buffer.push(0i32); + // init offset buffer and size buffer + let mut buffer = MutableBuffer::new(2 * capacity * mem::size_of::()); [buffer, empty_buffer] } DataType::LargeList(_) => { @@ -131,8 +131,8 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff [buffer, empty_buffer] } DataType::LargeListView(_) => { - let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); - buffer.push(0i64); + // init offset buffer and size buffer + let mut buffer = MutableBuffer::new(2 * capacity * mem::size_of::()); [buffer, empty_buffer] } DataType::FixedSizeBinary(size) => { From cfe100687ef3881cda729270aeebd2eefe9761ce Mon Sep 17 00:00:00 2001 From: kikkon Date: Tue, 12 Mar 2024 00:29:38 +0800 Subject: [PATCH 06/10] cargo clippy --- arrow-data/src/data.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index c63b3cf7860..6025309022e 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -121,8 +121,7 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff } DataType::ListView(_) => { // init offset buffer and size buffer - let mut buffer = MutableBuffer::new(2 * capacity * mem::size_of::()); - [buffer, empty_buffer] + [MutableBuffer::new(2 * capacity * mem::size_of::()), empty_buffer] } DataType::LargeList(_) => { // offset buffer always starts with a zero @@ -132,8 +131,7 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff } DataType::LargeListView(_) => { // init offset buffer and size buffer - let mut buffer = MutableBuffer::new(2 * capacity * mem::size_of::()); - [buffer, empty_buffer] + [MutableBuffer::new(2 * capacity * mem::size_of::()), empty_buffer] } DataType::FixedSizeBinary(size) => { [MutableBuffer::new(capacity * *size as usize), empty_buffer] From a6f2d296bbee5e5355b279fe714958298234495a Mon Sep 17 00:00:00 2001 From: kikkon Date: Tue, 12 Mar 2024 00:35:44 +0800 Subject: [PATCH 07/10] cargo fmt --- arrow-data/src/data.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 6025309022e..89b558bff67 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -119,20 +119,20 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff buffer.push(0i32); [buffer, empty_buffer] } - DataType::ListView(_) => { - // init offset buffer and size buffer - [MutableBuffer::new(2 * capacity * mem::size_of::()), empty_buffer] - } + DataType::ListView(_) => [ + MutableBuffer::new(2 * capacity * mem::size_of::()), + empty_buffer, + ], DataType::LargeList(_) => { // offset buffer always starts with a zero let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); buffer.push(0i64); [buffer, empty_buffer] } - DataType::LargeListView(_) => { - // init offset buffer and size buffer - [MutableBuffer::new(2 * capacity * mem::size_of::()), empty_buffer] - } + DataType::LargeListView(_) => [ + MutableBuffer::new(2 * capacity * mem::size_of::()), + empty_buffer, + ], DataType::FixedSizeBinary(size) => { 
             [MutableBuffer::new(capacity * *size as usize), empty_buffer]
         }

From cd7ad2450da6349c484bcfb8c61648b6e578cc1c Mon Sep 17 00:00:00 2001
From: kikkon
Date: Tue, 12 Mar 2024 10:21:28 +0800
Subject: [PATCH 08/10] fix buffer init

---
 arrow-data/src/data.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index 89b558bff67..914e90d96c2 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -120,8 +120,8 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
             [buffer, empty_buffer]
         }
         DataType::ListView(_) => [
-            MutableBuffer::new(2 * capacity * mem::size_of::<i32>()),
-            empty_buffer,
+            MutableBuffer::new(capacity * mem::size_of::<i32>()),
+            MutableBuffer::new(capacity * mem::size_of::<i32>()),
         ],
         DataType::LargeList(_) => {
             // offset buffer always starts with a zero
@@ -130,8 +130,8 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
             [buffer, empty_buffer]
         }
         DataType::LargeListView(_) => [
-            MutableBuffer::new(2 * capacity * mem::size_of::<i64>()),
-            empty_buffer,
+            MutableBuffer::new(capacity * mem::size_of::<i64>()),
+            MutableBuffer::new(capacity * mem::size_of::<i64>()),
         ],
         DataType::FixedSizeBinary(size) => {
             [MutableBuffer::new(capacity * *size as usize), empty_buffer]
         }

From 04f8abb3feec3dff30fad80da5377bebd7d78dd7 Mon Sep 17 00:00:00 2001
From: Kikkon <19528375+Kikkon@users.noreply.github.com>
Date: Tue, 12 Mar 2024 19:48:43 +0800
Subject: [PATCH 09/10] Update arrow-schema/src/datatype.rs

Co-authored-by: Andrew Lamb
---
 arrow-schema/src/datatype.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index f875abceeb1..0186c2e2cdb 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -235,7 +235,7 @@ pub enum DataType {
     ///
     /// The ListView layout is defined by three buffers:
     /// a validity bitmap, an offsets buffer, and an additional sizes buffer.
-    /// Sizes and offsets have the identical bit width and both 32-bit signed integer options is supported.
+    /// Sizes and offsets are both 32 bits for this type
     ListView(FieldRef),
     /// A list of some logical data type with fixed length.
     FixedSizeList(FieldRef, i32),

From 56faa7c45061c80796f3dd7ef3d55d160bd32534 Mon Sep 17 00:00:00 2001
From: Kikkon <19528375+Kikkon@users.noreply.github.com>
Date: Tue, 12 Mar 2024 19:48:51 +0800
Subject: [PATCH 10/10] Update arrow-schema/src/datatype.rs

Co-authored-by: Andrew Lamb
---
 arrow-schema/src/datatype.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index 0186c2e2cdb..6472495697b 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -250,7 +250,7 @@ pub enum DataType {
     ///
     /// The LargeListView layout is defined by three buffers:
     /// a validity bitmap, an offsets buffer, and an additional sizes buffer.
-    /// Sizes and offsets have the identical bit width and both 64-bit signed integer options is supported.
+    /// Sizes and offsets are both 64 bits for this type
     LargeListView(FieldRef),
     /// A nested datatype that contains a number of sub-fields.
     Struct(Fields),
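
The ListView/LargeListView doc comments added above describe the layout only in prose. A minimal sketch of that layout follows, using plain Vecs instead of Arrow buffers; all names below are illustrative and none of them are arrow-rs APIs. Element i of a ListView is values[offsets[i] .. offsets[i] + sizes[i]], and because every list slot carries its own offset and size there is no trailing offset.

fn list_view_get(
    values: &[i32],
    offsets: &[i32],
    sizes: &[i32],
    validity: &[bool],
    i: usize,
) -> Option<Vec<i32>> {
    // A null slot is indicated by the validity bitmap; offsets[i]/sizes[i] are ignored for it.
    if !validity[i] {
        return None;
    }
    let start = offsets[i] as usize;
    let len = sizes[i] as usize;
    // Unlike List/LargeList there is no offsets[i + 1]; the length comes from the sizes buffer,
    // so offsets do not have to be monotonically increasing and child ranges may overlap.
    Some(values[start..start + len].to_vec())
}

fn main() {
    // Logical array: [[1, 2], null, [3], []]
    let values = vec![1, 2, 3];
    let offsets = vec![0, 0, 2, 3];
    let sizes = vec![2, 0, 1, 0];
    let validity = vec![true, false, true, true];
    for i in 0..sizes.len() {
        println!("{:?}", list_view_get(&values, &offsets, &sizes, &validity, i));
    }
}

LargeListView is the same shape with i64 offsets and sizes. This one-offset-and-one-size-per-slot layout also appears to be the motivation for the new_buffers changes in patches 05-08 above: a ListView needs exactly `capacity` offsets and `capacity` sizes, so the two buffers are each sized `capacity * size_of::<i32>()` (i64 for LargeListView) and no leading zero is pushed, unlike List/LargeList which need `capacity + 1` offsets starting at zero.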