diff --git a/Cargo.toml b/Cargo.toml index 381bf2199..cde0af3c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ version = "0.1.1" arrow = { version = "^52.0" } arrow-arith = { version = "^52.0" } arrow-array = { version = "^52.0" } +arrow-cast = { version = "^52.0" } arrow-data = { version = "^52.0" } arrow-ord = { version = "^52.0" } arrow-json = { version = "^52.0" } diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index 871879ddc..87a774c92 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -60,11 +60,12 @@ pub fn sort_record_batch(batch: RecordBatch) -> DeltaResult { Ok(RecordBatch::try_new(batch.schema(), columns)?) } -static SKIPPED_TESTS: &[&str; 1] = &[ - // For multi_partitioned_2: The golden table stores the timestamp as an INT96 (which is - // nanosecond precision), while the spec says we should read partition columns as - // microseconds. This means the read and golden data don't line up. When this is released in - // `dat` upstream, we can stop skipping this test +static SKIPPED_TESTS: &[&str; 2] = &[ + // For all_primitive_types and multi_partitioned_2: The golden table stores the timestamp as an + // INT96 (which is nanosecond precision), while the spec says we should read partition columns + // as microseconds. This means the read and golden data don't line up. When this is released in + // `dat` upstream, we can stop skipping these tests + "all_primitive_types", "multi_partitioned_2", ]; diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index cf3685896..0942056f3 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -263,6 +263,7 @@ pub enum KernelError { InvalidTableLocationError, InvalidDecimalError, InvalidStructDataError, + InternalError, } impl From for KernelError { @@ -303,6 +304,7 @@ impl From for KernelError { Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError, Error::InvalidDecimal(_) => KernelError::InvalidDecimalError, Error::InvalidStructData(_) => KernelError::InvalidStructDataError, + Error::InternalError(_) => KernelError::InternalError, Error::Backtraced { source, backtrace: _, diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index e4e9c5867..cbee3ee3f 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -5,7 +5,7 @@ use std::ffi::c_void; use std::sync::{Arc, Mutex}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState}; -use delta_kernel::scan::{Scan, ScanBuilder, ScanData}; +use delta_kernel::scan::{Scan, ScanData}; use delta_kernel::schema::Schema; use delta_kernel::snapshot::Snapshot; use delta_kernel::{DeltaResult, EngineData, Error}; diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 601feb0ed..82fd64c08 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -37,9 +37,10 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.1.1" } visibility = "0.1.0" # Used in default engine -arrow-array = { workspace = true, optional = true } +arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] } arrow-select = { workspace = true, optional = true } arrow-arith = { workspace = true, optional = true } +arrow-cast = { workspace = true, optional = true } arrow-json = { workspace = true, optional = true } arrow-ord = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } @@ -67,6 +68,7 @@ default-engine = [ "arrow-conversion", "arrow-expression", "arrow-array", + "arrow-cast", "arrow-json", "arrow-schema", "arrow-select", @@ -80,6 +82,7 @@ default-engine = [ developer-visibility = [] sync-engine = [ + "arrow-cast", 
"arrow-conversion", "arrow-expression", "arrow-array", diff --git a/kernel/src/engine/arrow_expression.rs b/kernel/src/engine/arrow_expression.rs index bd11fdb3c..61d2a3234 100644 --- a/kernel/src/engine/arrow_expression.rs +++ b/kernel/src/engine/arrow_expression.rs @@ -17,10 +17,10 @@ use itertools::Itertools; use super::arrow_conversion::LIST_ARRAY_ROOT; use crate::engine::arrow_data::ArrowEngineData; +use crate::engine::arrow_utils::ensure_data_types; use crate::error::{DeltaResult, Error}; use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator}; use crate::schema::{DataType, PrimitiveType, SchemaRef}; -use crate::utils::require; use crate::{EngineData, ExpressionEvaluator, ExpressionHandler}; // TODO leverage scalars / Datum @@ -161,93 +161,6 @@ fn column_as_struct<'a>( .ok_or(ArrowError::SchemaError(format!("{} is not a struct", name))) } -fn make_arrow_error(s: String) -> Error { - Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s)) -} - -/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type" -/// is the same, but does so recursively into structs, and ensures lists and maps have the correct -/// associated types as well. This returns an `Ok(())` if the types are compatible, or an error if -/// the types do not match. If there is a `struct` type included, we only ensure that the named -/// fields that the kernel is asking for exist, and that for those fields the types -/// match. Un-selected fields are ignored. -fn ensure_data_types(kernel_type: &DataType, arrow_type: &ArrowDataType) -> DeltaResult<()> { - match (kernel_type, arrow_type) { - (DataType::Primitive(_), _) if arrow_type.is_primitive() => Ok(()), - (DataType::Primitive(PrimitiveType::Boolean), ArrowDataType::Boolean) - | (DataType::Primitive(PrimitiveType::String), ArrowDataType::Utf8) - | (DataType::Primitive(PrimitiveType::Binary), ArrowDataType::Binary) => { - // strings, bools, and binary aren't primitive in arrow - Ok(()) - } - ( - DataType::Primitive(PrimitiveType::Decimal(kernel_prec, kernel_scale)), - ArrowDataType::Decimal128(arrow_prec, arrow_scale), - ) if arrow_prec == kernel_prec && *arrow_scale == *kernel_scale as i8 => { - // decimal isn't primitive in arrow. 
cast above is okay as we limit range - Ok(()) - } - (DataType::Array(inner_type), ArrowDataType::List(arrow_list_type)) => { - let kernel_array_type = &inner_type.element_type; - let arrow_list_type = arrow_list_type.data_type(); - ensure_data_types(kernel_array_type, arrow_list_type) - } - (DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => { - if let ArrowDataType::Struct(fields) = arrow_map_type.data_type() { - let mut fiter = fields.iter(); - if let Some(key_type) = fiter.next() { - ensure_data_types(&kernel_map_type.key_type, key_type.data_type())?; - } else { - return Err(make_arrow_error( - "Arrow map struct didn't have a key type".to_string(), - )); - } - if let Some(value_type) = fiter.next() { - ensure_data_types(&kernel_map_type.value_type, value_type.data_type())?; - } else { - return Err(make_arrow_error( - "Arrow map struct didn't have a value type".to_string(), - )); - } - Ok(()) - } else { - Err(make_arrow_error( - "Arrow map type wasn't a struct.".to_string(), - )) - } - } - (DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => { - // build a list of kernel fields that matches the order of the arrow fields - let mapped_fields = arrow_fields - .iter() - .flat_map(|f| kernel_fields.fields.get(f.name())); - - // keep track of how many fields we matched up - let mut found_fields = 0; - // ensure that for the fields that we found, the types match - for (kernel_field, arrow_field) in mapped_fields.zip(arrow_fields) { - ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?; - found_fields += 1; - } - - // require that we found the number of fields that we requested. - require!(kernel_fields.fields.len() == found_fields, { - let kernel_field_names = kernel_fields.fields.keys().join(", "); - let arrow_field_names = arrow_fields.iter().map(|f| f.name()).join(", "); - make_arrow_error(format!( - "Missing Struct fields. Requested: {}, found: {}", - kernel_field_names, arrow_field_names, - )) - }); - Ok(()) - } - _ => Err(make_arrow_error(format!( - "Incorrect datatype. Expected {}, got {}", - kernel_type, arrow_type - ))), - } -} - fn evaluate_expression( expression: &Expression, batch: &RecordBatch, diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index 1e15199e4..522454d19 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -1,89 +1,1369 @@ //! Some utilities for working with arrow data types -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; -use crate::{schema::SchemaRef, utils::require, DeltaResult, Error}; +use crate::{ + schema::{DataType, PrimitiveType, Schema, SchemaRef, StructField, StructType}, + utils::require, + DeltaResult, Error, +}; -use arrow_array::RecordBatch; -use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_array::{ + cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait, + StructArray, +}; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields, + SchemaRef as ArrowSchemaRef, +}; +use itertools::Itertools; use parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor}; +use tracing::debug; -/// Get the indicies in `parquet_schema` of the specified columns in `requested_schema`. This -/// returns a tuples of (mask_indicies: Vec, reorder_indicies: -/// Vec). 
`mask_indicies` is used for generating the mask for reading from the +fn make_arrow_error(s: String) -> Error { + Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s)) +} + +/// Capture the compatibility between two data-types, as passed to [`ensure_data_types`] +pub(crate) enum DataTypeCompat { + /// The two types are the same + Identical, + /// What is read from parquet needs to be cast to the associated type + NeedsCast(ArrowDataType), + /// Types are compatible, but are nested types. This is used when comparing types where casting + /// is not desired (i.e. in the expression evaluator) + Nested, +} + +// Check if two types can be cast +fn check_cast_compat( + target_type: ArrowDataType, + source_type: &ArrowDataType, +) -> DeltaResult { + match (source_type, &target_type) { + (source_type, target_type) if source_type == target_type => Ok(DataTypeCompat::Identical), + (&ArrowDataType::Timestamp(_, _), &ArrowDataType::Timestamp(_, _)) => { + // timestamps are able to be cast between each other + Ok(DataTypeCompat::NeedsCast(target_type)) + } + _ => Err(make_arrow_error(format!( + "Incorrect datatype. Expected {}, got {}", + target_type, source_type + ))), + } +} + +/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type" +/// is the same, but does so recursively into structs, and ensures lists and maps have the correct +/// associated types as well. This returns an `Ok(DataTypeCompat)` if the types are compatible, and +/// will indicate what kind of compatibility they have, or an error if the types do not match. If +/// there is a `struct` type included, we only ensure that the named fields that the kernel is +/// asking for exist, and that for those fields the types match. Un-selected fields are ignored. +pub(crate) fn ensure_data_types( + kernel_type: &DataType, + arrow_type: &ArrowDataType, +) -> DeltaResult { + match (kernel_type, arrow_type) { + (DataType::Primitive(_), _) if arrow_type.is_primitive() => { + check_cast_compat(kernel_type.try_into()?, arrow_type) + } + (DataType::Primitive(PrimitiveType::Boolean), ArrowDataType::Boolean) + | (DataType::Primitive(PrimitiveType::String), ArrowDataType::Utf8) + | (DataType::Primitive(PrimitiveType::Binary), ArrowDataType::Binary) => { + // strings, bools, and binary aren't primitive in arrow + Ok(DataTypeCompat::Identical) + } + ( + DataType::Primitive(PrimitiveType::Decimal(kernel_prec, kernel_scale)), + ArrowDataType::Decimal128(arrow_prec, arrow_scale), + ) if arrow_prec == kernel_prec && *arrow_scale == *kernel_scale as i8 => { + // decimal isn't primitive in arrow. 
cast above is okay as we limit range + Ok(DataTypeCompat::Identical) + } + (DataType::Array(inner_type), ArrowDataType::List(arrow_list_type)) => { + let kernel_array_type = &inner_type.element_type; + let arrow_list_type = arrow_list_type.data_type(); + ensure_data_types(kernel_array_type, arrow_list_type) + } + (DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => { + if let ArrowDataType::Struct(fields) = arrow_map_type.data_type() { + let mut fields = fields.iter(); + if let Some(key_type) = fields.next() { + ensure_data_types(&kernel_map_type.key_type, key_type.data_type())?; + } else { + return Err(make_arrow_error( + "Arrow map struct didn't have a key type".to_string(), + )); + } + if let Some(value_type) = fields.next() { + ensure_data_types(&kernel_map_type.value_type, value_type.data_type())?; + } else { + return Err(make_arrow_error( + "Arrow map struct didn't have a value type".to_string(), + )); + } + if fields.next().is_some() { + return Err(Error::generic("map fields had more than 2 members")); + } + Ok(DataTypeCompat::Nested) + } else { + Err(make_arrow_error( + "Arrow map type wasn't a struct.".to_string(), + )) + } + } + (DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => { + // build a list of kernel fields that matches the order of the arrow fields + let mapped_fields = arrow_fields + .iter() + .filter_map(|f| kernel_fields.fields.get(f.name())); + + // keep track of how many fields we matched up + let mut found_fields = 0; + // ensure that for the fields that we found, the types match + for (kernel_field, arrow_field) in mapped_fields.zip(arrow_fields) { + ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?; + found_fields += 1; + } + + // require that we found the number of fields that we requested. + require!(kernel_fields.fields.len() == found_fields, { + let arrow_field_map: HashSet<&String> = + HashSet::from_iter(arrow_fields.iter().map(|f| f.name())); + let missing_field_names = kernel_fields + .fields + .keys() + .filter(|kernel_field| !arrow_field_map.contains(kernel_field)) + .take(5) + .join(", "); + make_arrow_error(format!( + "Missing Struct fields {} (Up to five missing fields shown)", + missing_field_names + )) + }); + Ok(DataTypeCompat::Nested) + } + _ => Err(make_arrow_error(format!( + "Incorrect datatype. Expected {}, got {}", + kernel_type, arrow_type + ))), + } +} + +/* +* The code below implements proper pruning of columns when reading parquet, reordering of columns to +* match the specified schema, and insertion of null columns if the requested schema includes a +* nullable column that isn't included in the parquet file. +* +* At a high level there are three schemas/concepts to worry about: +* - The parquet file's physical schema (= the columns that are actually available), called +* "parquet_schema" below +* - The requested logical schema from the engine (= the columns we actually want), called +* "requested_schema" below +* - The Read schema (and intersection of 1. and 2., in logical schema order). This is never +* materialized, but is useful to be able to refer to here +* - A `ProjectionMask` that goes to the parquet reader which specifies which subset of columns from +* the file schema to actually read. (See "Example" below) +* +* In other words, the ProjectionMask is the intersection of the parquet schema and logical schema, +* and then mapped to indices in the parquet file. 
Columns unique to the file schema need to be +* masked out (= ignored), while columns unique to the logical schema need to be backfilled with +* nulls. +* +* We also have to worry about field ordering differences between the read schema and logical +* schema. We represent any reordering needed as a tree. Each level of the tree is a vec of +* `ReorderIndex`s. Each element's index represents a column that will be in the read parquet data +* (as an arrow StructArray) at that level and index. The `ReorderIndex::index` field of the element +* is the position that the column should appear in the final output. + +* The algorithm has three parts, handled by `get_requested_indices`, `generate_mask` and +* `reorder_struct_array` respectively. + +* `get_requested_indices` generates indices to select, along with reordering information: +* 1. Loop over each field in parquet_schema, keeping track of how many physical fields (i.e. leaf +* columns) we have seen so far +* 2. If a requested field matches the physical field, push the index of the field onto the mask. + +* 3. Also push a ReorderIndex element that indicates where this item should be in the final output, +* and if it needs any transformation (i.e. casting, create null column) +* 4. If a nested element (struct/map/list) is encountered, recurse into it, pushing indices onto +* the same vector, but producing a new reorder level, which is added to the parent with a `Nested` +* transform +* +* `generate_mask` is simple, and just calls `ProjectionMask::leaves` in the parquet crate with the +* indices computed by `get_requested_indices` +* +* `reorder_struct_array` handles reordering and data transforms: +* 1. First check if we need to do any transformations (see doc comment for +* `ordering_needs_transform`) +* 2. If nothing is required we're done (return); otherwise: +* 3. Create a Vec[None, ..., None] of placeholders that will hold the correctly ordered columns +* 4. Deconstruct the existing struct array and then loop over the `ReorderIndex` list +* 5. Use the `ReorderIndex::index` value to put the column at the correct location +* 6. Additionally, if `ReorderIndex::transform` is not `Identity`, then if it is: +* - `Cast`: cast the column to the specified type +* - `Missing`: put a column of `null` at the correct location +* - `Nested([child_order])` and the data is a `StructArray`: recursively call +* `reorder_struct_array` on the column with `child_order` to correctly ordered the child +* array +* - `Nested` and the data is a `List`: get the inner struct array out of the list, +* reorder it recursively as above, rebuild the list, and the put the column at the correct +* location +* +* Example: +* The parquet crate `ProjectionMask::leaves` method only considers leaf columns -- a "flat" schema -- +* so a struct column is purely a schema level thing and doesn't "count" wrt. column indices. +* +* So if we have the following file physical schema: +* +* a +* d +* x +* b +* y +* z +* e +* f +* c +* +* and a logical requested schema of: +* +* b +* f +* e +* a +* x +* c +* +* The mask is [1, 3, 4, 5] because a, b, and y don't contribute to the column indices. 
+* +* The reorder tree is: +* [ +* // col a is at position 0 in the struct array, and should be moved to position 1 +* { index: 1, Nested([{ index: 0 }]) }, +* // col b is at position 1 in the struct array, and should be moved to position 0 +* // also, the inner struct array needs to be reordered to swap 'f' and 'e' +* { index: 0, Nested([{ index: 1 }, {index: 0}]) }, +* // col c is at position 2 in the struct array, and should stay there +* { index: 2 } +* ] +*/ + +/// Reordering is specified as a tree. Each level is a vec of `ReorderIndex`s. Each element's +/// position represents a column that will be in the read parquet data at that level and +/// position. The `index` of the element is the position that the column should appear in the final +/// output. The `transform` indicates what, if any, transforms are needed. See the docs for +/// [`ReorderIndexTransform`] for the meaning. +#[derive(Debug, PartialEq)] +pub(crate) struct ReorderIndex { + pub(crate) index: usize, + transform: ReorderIndexTransform, +} + +#[derive(Debug, PartialEq)] +pub(crate) enum ReorderIndexTransform { + /// For a non-nested type, indicates that we need to cast to the contained type + Cast(ArrowDataType), + /// Used for struct/list/map. Potentially transform child fields using contained reordering + Nested(Vec), + /// No work needed to transform this data + Identity, + /// Data is missing, fill in with a null column + Missing(ArrowFieldRef), +} + +impl ReorderIndex { + fn new(index: usize, transform: ReorderIndexTransform) -> Self { + ReorderIndex { index, transform } + } + + fn cast(index: usize, target: ArrowDataType) -> Self { + ReorderIndex::new(index, ReorderIndexTransform::Cast(target)) + } + + fn nested(index: usize, children: Vec) -> Self { + ReorderIndex::new(index, ReorderIndexTransform::Nested(children)) + } + + fn identity(index: usize) -> Self { + ReorderIndex::new(index, ReorderIndexTransform::Identity) + } + + fn missing(index: usize, field: ArrowFieldRef) -> Self { + ReorderIndex::new(index, ReorderIndexTransform::Missing(field)) + } + + /// Check if this reordering requires a transformation anywhere. See comment below on + /// [`ordering_needs_transform`] to understand why this is needed. + fn needs_transform(&self) -> bool { + match self.transform { + // if we're casting or inserting null, we need to transform + ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true, + // if our nested ordering needs a transform, we need a transform + ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children), + // no transform needed + ReorderIndexTransform::Identity => false, + } + } +} + +// count the number of physical columns, including nested ones in an `ArrowField` +fn count_cols(field: &ArrowField) -> usize { + _count_cols(field.data_type()) +} + +fn _count_cols(dt: &ArrowDataType) -> usize { + match dt { + ArrowDataType::Struct(fields) => fields.iter().map(|f| count_cols(f)).sum(), + ArrowDataType::Union(fields, _) => fields.iter().map(|(_, f)| count_cols(f)).sum(), + ArrowDataType::List(field) + | ArrowDataType::LargeList(field) + | ArrowDataType::FixedSizeList(field, _) + | ArrowDataType::Map(field, _) => count_cols(field), + ArrowDataType::Dictionary(_, value_field) => _count_cols(value_field.as_ref()), + _ => 1, // other types are "real" fields, so count + } +} + +/// helper function, does the same as `get_requested_indices` but at an offset. used to recurse into +/// structs, lists, and maps. 
`parquet_offset` is how many parquet fields exist before processing +/// this potentially nested schema. returns the number of parquet fields in `fields` (regardless of +/// if they are selected or not) and reordering information for the requested fields. +fn get_indices( + start_parquet_offset: usize, + requested_schema: &Schema, + fields: &Fields, + mask_indices: &mut Vec, +) -> DeltaResult<(usize, Vec)> { + let mut found_fields = HashSet::with_capacity(requested_schema.fields.len()); + let mut reorder_indices = Vec::with_capacity(requested_schema.fields.len()); + let mut parquet_offset = start_parquet_offset; + // for each field, get its position in the parquet (via enumerate), a reference to the arrow + // field, and info about where it appears in the requested_schema, or None if the field is not + // requested + let all_field_info = fields.iter().enumerate().map(|(parquet_index, field)| { + let field_info = requested_schema.fields.get_full(field.name()); + (parquet_index, field, field_info) + }); + for (parquet_index, field, field_info) in all_field_info { + debug!( + "Getting indices for field {} with offset {parquet_offset}, with index {parquet_index}", + field.name() + ); + if let Some((index, _, requested_field)) = field_info { + match field.data_type() { + ArrowDataType::Struct(fields) => { + if let DataType::Struct(ref requested_schema) = requested_field.data_type { + let (parquet_advance, children) = get_indices( + parquet_index + parquet_offset, + requested_schema.as_ref(), + fields, + mask_indices, + )?; + // advance the number of parquet fields, but subtract 1 because the + // struct will be counted by the `enumerate` call but doesn't count as + // an actual index. + parquet_offset += parquet_advance - 1; + // note that we found this field + found_fields.insert(requested_field.name()); + // push the child reorder on + reorder_indices.push(ReorderIndex::nested(index, children)); + } else { + return Err(Error::unexpected_column_type(field.name())); + } + } + ArrowDataType::List(list_field) + | ArrowDataType::LargeList(list_field) + | ArrowDataType::ListView(list_field) => { + // we just want to transparently recurse into lists, need to transform the kernel + // list data type into a schema + if let DataType::Array(array_type) = requested_field.data_type() { + let requested_schema = StructType::new(vec![StructField::new( + list_field.name().clone(), // so we find it in the inner call + array_type.element_type.clone(), + array_type.contains_null, + )]); + let (parquet_advance, mut children) = get_indices( + found_fields.len() + parquet_offset, + &requested_schema, + &[list_field.clone()].into(), + mask_indices, + )?; + // see comment above in struct match arm + parquet_offset += parquet_advance - 1; + found_fields.insert(requested_field.name()); + if children.len() != 1 { + return Err(Error::generic( + "List call should not have generated more than one reorder index", + )); + } + // safety, checked that we have 1 element + let mut children = children.swap_remove(0); + // the index is wrong, as it's the index from the inner schema. 
Adjust + // it to be our index + children.index = index; + reorder_indices.push(children); + } else { + return Err(Error::unexpected_column_type(list_field.name())); + } + } + ArrowDataType::Map(key_val_field, _) => { + match (key_val_field.data_type(), requested_field.data_type()) { + (ArrowDataType::Struct(inner_fields), DataType::Map(map_type)) => { + let mut key_val_names = + inner_fields.iter().map(|f| f.name().to_string()); + let key_name = key_val_names.next().ok_or_else(|| { + Error::generic("map fields didn't include a key col") + })?; + let val_name = key_val_names.next().ok_or_else(|| { + Error::generic("map fields didn't include a val col") + })?; + if key_val_names.next().is_some() { + return Err(Error::generic("map fields had more than 2 members")); + } + let inner_schema = map_type.as_struct_schema(key_name, val_name); + let (parquet_advance, _children) = get_indices( + parquet_index + parquet_offset, + &inner_schema, + inner_fields, + mask_indices, + )?; + // advance the number of parquet fields, but subtract 1 because the + // map will be counted by the `enumerate` call but doesn't count as + // an actual index. + parquet_offset += parquet_advance - 1; + // note that we found this field + found_fields.insert(requested_field.name()); + // push the child reorder on, currently no reordering for maps + reorder_indices.push(ReorderIndex::identity(index)); + } + _ => { + return Err(Error::unexpected_column_type(field.name())); + } + } + } + _ => { + match ensure_data_types(&requested_field.data_type, field.data_type())? { + DataTypeCompat::Identical => { + reorder_indices.push(ReorderIndex::identity(index)) + } + DataTypeCompat::NeedsCast(target) => { + reorder_indices.push(ReorderIndex::cast(index, target)) + } + DataTypeCompat::Nested => { + return Err(Error::internal_error( + "Comparing nested types in get_indices", + )) + } + } + found_fields.insert(requested_field.name()); + mask_indices.push(parquet_offset + parquet_index); + } + } + } else { + // We're NOT selecting this field, but we still need to track how many leaf columns we + // skipped over + debug!("Skipping over un-selected field: {}", field.name()); + // offset by number of inner fields. subtract one, because the enumerate still + // counts this logical "parent" field + parquet_offset += count_cols(field) - 1; + } + } + + if found_fields.len() != requested_schema.fields.len() { + // some fields are missing, but they might be nullable, need to insert them into the reorder_indices + for (requested_position, field) in requested_schema.fields().enumerate() { + if !found_fields.contains(field.name()) { + if field.nullable { + debug!("Inserting missing and nullable field: {}", field.name()); + reorder_indices.push(ReorderIndex::missing( + requested_position, + Arc::new(field.try_into()?), + )); + } else { + return Err(Error::Generic(format!( + "Requested field not found in parquet schema, and field is not nullable: {}", + field.name() + ))); + } + } + } + } + Ok(( + parquet_offset + fields.len() - start_parquet_offset, + reorder_indices, + )) +} + +/// Get the indices in `parquet_schema` of the specified columns in `requested_schema`. This returns +/// a tuple of (mask_indices: Vec, reorder_indices: +/// Vec). `mask_indices` is used for generating the mask for reading from the /// parquet file, and simply contains an entry for each index we wish to select from the parquet -/// file set to the index of the requested column in the parquet. 
`reorder_indicies` is used for -/// re-ordering and will be the same size as `requested_schema`. Each index in `reorder_indicies` -/// represents a column that will be in the read parquet data at that index. The value stored in -/// `reorder_indicies` is the position that the column should appear in the final output. For -/// example, if `reorder_indicies` is `[2,0,1]`, then the re-ordering code should take the third -/// column in the raw-read parquet data, and move it to the first column in the final output, the -/// first column to the second, and the second to the third. +/// file set to the index of the requested column in the parquet. `reorder_indices` is used for +/// re-ordering. See the documentation for [`ReorderIndex`] to understand what each element in the +/// returned array means pub(crate) fn get_requested_indices( requested_schema: &SchemaRef, parquet_schema: &ArrowSchemaRef, -) -> DeltaResult<(Vec, Vec)> { - let (mask_indicies, reorder_indicies): (Vec, Vec) = parquet_schema - .fields() - .iter() - .enumerate() - .filter_map(|(parquet_index, field)| { - requested_schema - .index_of(field.name()) - .map(|index| (parquet_index, index)) - }) - .unzip(); - require!( - mask_indicies.len() == requested_schema.fields.len(), - Error::generic("Didn't find all requested columns in parquet schema") - ); - Ok((mask_indicies, reorder_indicies)) -} - -/// Create a mask that will only select the specified indicies from the parquet. Currently we only -/// handle "root" level columns, and hence use `ProjectionMask::roots`, but will support leaf -/// selection in the future. See issues #86 and #96 as well. +) -> DeltaResult<(Vec, Vec)> { + let mut mask_indices = vec![]; + let (_, reorder_indexes) = get_indices( + 0, + requested_schema, + parquet_schema.fields(), + &mut mask_indices, + )?; + Ok((mask_indices, reorder_indexes)) +} + +/// Create a mask that will only select the specified indices from the parquet. `indices` can be +/// computed from a [`Schema`] using [`get_requested_indices`] pub(crate) fn generate_mask( - requested_schema: &SchemaRef, - parquet_schema: &ArrowSchemaRef, + _requested_schema: &SchemaRef, + _parquet_schema: &ArrowSchemaRef, parquet_physical_schema: &SchemaDescriptor, - indicies: &[usize], + indices: &[usize], ) -> Option { - if parquet_schema.fields.size() == requested_schema.fields.len() { - // we assume that in get_requested_indicies we will have caught any column name mismatches, - // so here we can just say that if we request the same # of columns as the parquet file - // actually has, we don't need to mask anything out - None - } else { - Some(ProjectionMask::roots( - parquet_physical_schema, - indicies.to_owned(), - )) + // TODO: Determine if it's worth checking if we're selecting everything and returning None in + // that case + Some(ProjectionMask::leaves( + parquet_physical_schema, + indices.to_owned(), + )) +} + +/// Check if an ordering requires transforming the data in any way. This is true if the indices are +/// NOT in ascending order (so we have to reorder things), or if we need to do any transformation on +/// the data read from parquet. We check the ordering here, and also call +/// `ReorderIndex::needs_transform` on each element to check for other transforms, and to check +/// `Nested` variants recursively. +fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool { + if requested_ordering.is_empty() { + return false; + } + // we have >=1 element. 
check that the first element doesn't need a transform + if requested_ordering[0].needs_transform() { + return true; } + // Check for all elements if we need a transform. This is true if any elements are not in order + // (i.e. element[i].index < element[i+1].index), or any element needs a transform + requested_ordering + .windows(2) + .any(|ri| (ri[0].index >= ri[1].index) || ri[1].needs_transform()) } +// we use this as a placeholder for an array and its associated field. We can fill in a Vec of None +// of this type and then set elements of the Vec to Some(FieldArrayOpt) for each column +type FieldArrayOpt = Option<(Arc, Arc)>; + /// Reorder a RecordBatch to match `requested_ordering`. For each non-zero value in /// `requested_ordering`, the column at that index will be added in order to returned batch -pub(crate) fn reorder_record_batch( - input_data: RecordBatch, - requested_ordering: &[usize], -) -> DeltaResult { - if requested_ordering.windows(2).all(|is| is[0] < is[1]) { - // indicies is already sorted, meaning we requested in the order that the columns were +pub(crate) fn reorder_struct_array( + input_data: StructArray, + requested_ordering: &[ReorderIndex], +) -> DeltaResult { + if !ordering_needs_transform(requested_ordering) { + // indices is already sorted, meaning we requested in the order that the columns were // stored in the parquet Ok(input_data) } else { // requested an order different from the parquet, reorder - let input_schema = input_data.schema(); - let mut fields = Vec::with_capacity(requested_ordering.len()); - let reordered_columns = requested_ordering - .iter() - .map(|index| { - fields.push(input_schema.field(*index).clone()); - input_data.column(*index).clone() // cheap Arc clone - }) - .collect(); - let schema = Arc::new(ArrowSchema::new(fields)); - Ok(RecordBatch::try_new(schema, reordered_columns)?) 
+ debug!("Have requested reorder {requested_ordering:#?} on {input_data:?}"); + let num_rows = input_data.len(); + let num_cols = requested_ordering.len(); + let (input_fields, input_cols, null_buffer) = input_data.into_parts(); + let mut final_fields_cols: Vec = vec![None; num_cols]; + for (parquet_position, reorder_index) in requested_ordering.iter().enumerate() { + // for each item, reorder_index.index() tells us where to put it, and its position in + // requested_ordering tells us where it is in the parquet data + match &reorder_index.transform { + ReorderIndexTransform::Cast(target) => { + let col = input_cols[parquet_position].as_ref(); + let col = Arc::new(arrow_cast::cast::cast(col, target)?); + let new_field = Arc::new( + input_fields[parquet_position] + .as_ref() + .clone() + .with_data_type(col.data_type().clone()), + ); + final_fields_cols[reorder_index.index] = Some((new_field, col)); + } + ReorderIndexTransform::Nested(children) => { + match input_cols[parquet_position].data_type() { + ArrowDataType::Struct(_) => { + let struct_array = input_cols[parquet_position].as_struct().clone(); + let result_array = + Arc::new(reorder_struct_array(struct_array, children)?); + // create the new field specifying the correct order for the struct + let new_field = Arc::new(ArrowField::new_struct( + input_fields[parquet_position].name(), + result_array.fields().clone(), + input_fields[parquet_position].is_nullable(), + )); + final_fields_cols[reorder_index.index] = + Some((new_field, result_array)); + } + ArrowDataType::List(_) => { + let list_array = input_cols[parquet_position].as_list::().clone(); + final_fields_cols[reorder_index.index] = reorder_list( + list_array, + input_fields[parquet_position].name(), + children, + )?; + } + ArrowDataType::LargeList(_) => { + let list_array = input_cols[parquet_position].as_list::().clone(); + final_fields_cols[reorder_index.index] = reorder_list( + list_array, + input_fields[parquet_position].name(), + children, + )?; + } + // TODO: MAP + _ => { + return Err(Error::internal_error( + "Nested reorder can only apply to struct/list/map.", + )); + } + } + } + ReorderIndexTransform::Identity => { + final_fields_cols[reorder_index.index] = Some(( + input_fields[parquet_position].clone(), // cheap Arc clone + input_cols[parquet_position].clone(), // cheap Arc clone + )); + } + ReorderIndexTransform::Missing(field) => { + let null_array = Arc::new(new_null_array(field.data_type(), num_rows)); + let field = field.clone(); // cheap Arc clone + final_fields_cols[reorder_index.index] = Some((field, null_array)); + } + } + } + let num_cols = final_fields_cols.len(); + let (field_vec, reordered_columns): (Vec>, _) = + final_fields_cols.into_iter().flatten().unzip(); + if field_vec.len() != num_cols { + Err(Error::internal_error("Found a None in final_fields_cols.")) + } else { + Ok(StructArray::try_new( + field_vec.into(), + reordered_columns, + null_buffer, + )?) 
+ } + } +} + +fn reorder_list( + list_array: GenericListArray, + input_field_name: &str, + children: &[ReorderIndex], +) -> DeltaResult { + let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts(); + if let Some(struct_array) = maybe_sa.as_struct_opt() { + let struct_array = struct_array.clone(); + let result_array = Arc::new(reorder_struct_array(struct_array, children)?); + let new_list_field = Arc::new(ArrowField::new_struct( + list_field.name(), + result_array.fields().clone(), + result_array.is_nullable(), + )); + let new_field = Arc::new(ArrowField::new_list( + input_field_name, + new_list_field.clone(), + list_field.is_nullable(), + )); + let list = Arc::new(GenericListArray::try_new( + new_list_field, + offset_buffer, + result_array, + null_buf, + )?); + Ok(Some((new_field, list))) + } else { + Err(Error::internal_error( + "Nested reorder of list should have had struct child.", + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::{ + array::AsArray, + buffer::{OffsetBuffer, ScalarBuffer}, + }; + use arrow_array::{ + Array, ArrayRef as ArrowArrayRef, BooleanArray, GenericListArray, Int32Array, StructArray, + }; + use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, + SchemaRef as ArrowSchemaRef, + }; + + use crate::schema::{ArrayType, DataType, MapType, StructField, StructType}; + + use super::{get_requested_indices, reorder_struct_array, ReorderIndex}; + + fn nested_parquet_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new( + "nested", + ArrowDataType::Struct( + vec![ + ArrowField::new("int32", ArrowDataType::Int32, false), + ArrowField::new("string", ArrowDataType::Utf8, false), + ] + .into(), + ), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), + ])) + } + + #[test] + fn simple_mask_indices() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("s", DataType::STRING, true), + StructField::new("i2", DataType::INTEGER, true), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("s", ArrowDataType::Utf8, true), + ArrowField::new("i2", ArrowDataType::Int32, true), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 2]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::identity(1), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn ensure_data_types_fails_correctly() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("s", DataType::INTEGER, true), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("s", ArrowDataType::Utf8, true), + ])); + let res = get_requested_indices(&requested_schema, &parquet_schema); + assert!(res.is_err()); + + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("s", DataType::STRING, true), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("s", ArrowDataType::Int32, true), + ])); + let res = 
get_requested_indices(&requested_schema, &parquet_schema); + println!("{res:#?}"); + assert!(res.is_err()); + } + + #[test] + fn mask_with_map() { + let requested_schema = Arc::new(StructType::new(vec![StructField::new( + "map", + DataType::Map(Box::new(MapType::new( + DataType::INTEGER, + DataType::STRING, + false, + ))), + false, + )])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new_map( + "map", + "entries", + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("s", ArrowDataType::Utf8, false), + false, + false, + )])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1]; + let expect_reorder = vec![ReorderIndex::identity(0)]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn simple_reorder_indices() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("s", DataType::STRING, true), + StructField::new("i2", DataType::INTEGER, true), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i2", ArrowDataType::Int32, true), + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("s", ArrowDataType::Utf8, true), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 2]; + let expect_reorder = vec![ + ReorderIndex::identity(2), + ReorderIndex::identity(0), + ReorderIndex::identity(1), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn simple_nullable_field_missing() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("s", DataType::STRING, true), + StructField::new("i2", DataType::INTEGER, true), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("i2", ArrowDataType::Int32, true), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::identity(2), + ReorderIndex::missing(1, Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true))), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new( + "nested", + StructType::new(vec![ + StructField::new("int32", DataType::INTEGER, false), + StructField::new("string", DataType::STRING, false), + ]), + false, + ), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = nested_parquet_schema(); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 2, 3]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::nested( + 1, + vec![ReorderIndex::identity(0), ReorderIndex::identity(1)], + ), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices_reorder() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new( + "nested", + 
StructType::new(vec![ + StructField::new("string", DataType::STRING, false), + StructField::new("int32", DataType::INTEGER, false), + ]), + false, + ), + StructField::new("j", DataType::INTEGER, false), + StructField::new("i", DataType::INTEGER, false), + ])); + let parquet_schema = nested_parquet_schema(); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 2, 3]; + let expect_reorder = vec![ + ReorderIndex::identity(2), + ReorderIndex::nested( + 0, + vec![ReorderIndex::identity(1), ReorderIndex::identity(0)], + ), + ReorderIndex::identity(1), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices_mask_inner() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new( + "nested", + StructType::new(vec![StructField::new("int32", DataType::INTEGER, false)]), + false, + ), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = nested_parquet_schema(); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 3]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn simple_list_mask() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("list", ArrayType::new(DataType::INTEGER, false), false), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new( + "list", + ArrowDataType::List(Arc::new(ArrowField::new( + "nested", + ArrowDataType::Int32, + false, + ))), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 2]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::identity(1), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices_list() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new( + "list", + ArrayType::new( + StructType::new(vec![ + StructField::new("int32", DataType::INTEGER, false), + StructField::new("string", DataType::STRING, false), + ]) + .into(), + false, + ), + false, + ), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new( + "list", + ArrowDataType::List(Arc::new(ArrowField::new( + "nested", + ArrowDataType::Struct( + vec![ + ArrowField::new("int32", ArrowDataType::Int32, false), + ArrowField::new("string", ArrowDataType::Utf8, false), + ] + .into(), + ), + false, + ))), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 2, 3]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + 
ReorderIndex::nested( + 1, + vec![ReorderIndex::identity(0), ReorderIndex::identity(1)], + ), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices_unselected_list() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new( + "list", + ArrowDataType::List(Arc::new(ArrowField::new( + "nested", + ArrowDataType::Struct( + vec![ + ArrowField::new("int32", ArrowDataType::Int32, false), + ArrowField::new("string", ArrowDataType::Utf8, false), + ] + .into(), + ), + false, + ))), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 3]; + let expect_reorder = vec![ReorderIndex::identity(0), ReorderIndex::identity(1)]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices_list_mask_inner() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new( + "list", + ArrayType::new( + StructType::new(vec![StructField::new("int32", DataType::INTEGER, false)]) + .into(), + false, + ), + false, + ), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new( + "list", + ArrowDataType::List(Arc::new(ArrowField::new( + "nested", + ArrowDataType::Struct( + vec![ + ArrowField::new("int32", ArrowDataType::Int32, false), + ArrowField::new("string", ArrowDataType::Utf8, false), + ] + .into(), + ), + false, + ))), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![0, 1, 3]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::nested(1, vec![ReorderIndex::identity(0)]), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn nested_indices_list_mask_inner_reorder() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new( + "list", + ArrayType::new( + StructType::new(vec![ + StructField::new("string", DataType::STRING, false), + StructField::new("int2", DataType::INTEGER, false), + ]) + .into(), + false, + ), + false, + ), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), // field 0 + ArrowField::new( + "list", + ArrowDataType::List(Arc::new(ArrowField::new( + "nested", + ArrowDataType::Struct( + vec![ + ArrowField::new("int1", ArrowDataType::Int32, false), // field 1 + ArrowField::new("int2", ArrowDataType::Int32, false), // field 2 + ArrowField::new("string", ArrowDataType::Utf8, false), // field 3 + ] + .into(), + ), + false, + ))), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), // field 4 + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, 
&parquet_schema).unwrap(); + let expect_mask = vec![0, 2, 3, 4]; + let expect_reorder = vec![ + ReorderIndex::identity(0), + ReorderIndex::nested( + 1, + vec![ReorderIndex::identity(1), ReorderIndex::identity(0)], + ), + ReorderIndex::identity(2), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn skipped_struct() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("i", DataType::INTEGER, false), + StructField::new( + "nested", + StructType::new(vec![ + StructField::new("int32", DataType::INTEGER, false), + StructField::new("string", DataType::STRING, false), + ]), + false, + ), + StructField::new("j", DataType::INTEGER, false), + ])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new( + "skipped", + ArrowDataType::Struct( + vec![ + ArrowField::new("int32", ArrowDataType::Int32, false), + ArrowField::new("string", ArrowDataType::Utf8, false), + ] + .into(), + ), + false, + ), + ArrowField::new("j", ArrowDataType::Int32, false), + ArrowField::new( + "nested", + ArrowDataType::Struct( + vec![ + ArrowField::new("int32", ArrowDataType::Int32, false), + ArrowField::new("string", ArrowDataType::Utf8, false), + ] + .into(), + ), + false, + ), + ArrowField::new("i", ArrowDataType::Int32, false), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask = vec![2, 3, 4, 5]; + let expect_reorder = vec![ + ReorderIndex::identity(2), + ReorderIndex::nested( + 1, + vec![ReorderIndex::identity(0), ReorderIndex::identity(1)], + ), + ReorderIndex::identity(0), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + fn make_struct_array() -> StructArray { + let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true])); + let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31])); + StructArray::from(vec![ + ( + Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)), + boolean.clone() as ArrowArrayRef, + ), + ( + Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)), + int.clone() as ArrowArrayRef, + ), + ]) + } + + #[test] + fn simple_reorder_struct() { + let arry = make_struct_array(); + let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)]; + let ordered = reorder_struct_array(arry, &reorder).unwrap(); + assert_eq!(ordered.column_names(), vec!["c", "b"]); + } + + #[test] + fn nested_reorder_struct() { + let arry1 = Arc::new(make_struct_array()); + let arry2 = Arc::new(make_struct_array()); + let fields: Fields = vec![ + Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)), + Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)), + ] + .into(); + let nested = StructArray::from(vec![ + ( + Arc::new(ArrowField::new( + "struct1", + ArrowDataType::Struct(fields.clone()), + false, + )), + arry1 as ArrowArrayRef, + ), + ( + Arc::new(ArrowField::new( + "struct2", + ArrowDataType::Struct(fields), + false, + )), + arry2 as ArrowArrayRef, + ), + ]); + let reorder = vec![ + ReorderIndex::nested( + 1, + vec![ReorderIndex::identity(1), ReorderIndex::identity(0)], + ), + ReorderIndex::nested( + 0, + vec![ + ReorderIndex::identity(0), + ReorderIndex::identity(1), + ReorderIndex::missing( + 2, + Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true)), + ), + ], + ), + ]; + let ordered = reorder_struct_array(nested, &reorder).unwrap(); + assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]); + let ordered_s2 = 
ordered.column(0).as_struct(); + assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]); + let ordered_s1 = ordered.column(1).as_struct(); + assert_eq!(ordered_s1.column_names(), vec!["c", "b"]); + } + + #[test] + fn reorder_list_of_struct() { + let boolean = Arc::new(BooleanArray::from(vec![ + false, false, true, true, false, true, + ])); + let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31, 0, 3])); + let list_sa = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)), + boolean.clone() as ArrowArrayRef, + ), + ( + Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)), + int.clone() as ArrowArrayRef, + ), + ]); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6])); + let list_field = ArrowField::new("item", list_sa.data_type().clone(), false); + let list = Arc::new(GenericListArray::new( + Arc::new(list_field), + offsets, + Arc::new(list_sa), + None, + )); + let fields: Fields = vec![ + Arc::new(ArrowField::new("b", ArrowDataType::Boolean, false)), + Arc::new(ArrowField::new("c", ArrowDataType::Int32, false)), + ] + .into(); + let list_dt = Arc::new(ArrowField::new( + "list", + ArrowDataType::new_list(ArrowDataType::Struct(fields), false), + false, + )); + let struct_array = StructArray::from(vec![(list_dt, list as ArrowArrayRef)]); + let reorder = vec![ReorderIndex::nested( + 0, + vec![ReorderIndex::identity(1), ReorderIndex::identity(0)], + )]; + let ordered = reorder_struct_array(struct_array, &reorder).unwrap(); + let ordered_list_col = ordered.column(0).as_list::(); + for i in 0..ordered_list_col.len() { + let array_item = ordered_list_col.value(i); + let struct_item = array_item.as_struct(); + assert_eq!(struct_item.column_names(), vec!["c", "b"]); + } + } + + #[test] + fn no_matches() { + let requested_schema = Arc::new(StructType::new(vec![ + StructField::new("s", DataType::STRING, true), + StructField::new("i2", DataType::INTEGER, true), + ])); + let nots_field = ArrowField::new("NOTs", ArrowDataType::Utf8, true); + let noti2_field = ArrowField::new("NOTi2", ArrowDataType::Int32, true); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + nots_field.clone(), + noti2_field.clone(), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask: Vec = vec![]; + let expect_reorder = vec![ + ReorderIndex::missing(0, nots_field.with_name("s").into()), + ReorderIndex::missing(1, noti2_field.with_name("i2").into()), + ]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); + } + + #[test] + fn empty_requested_schema() { + let requested_schema = Arc::new(StructType::new(vec![])); + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("i", ArrowDataType::Int32, false), + ArrowField::new("s", ArrowDataType::Utf8, true), + ArrowField::new("i2", ArrowDataType::Int32, true), + ])); + let (mask_indices, reorder_indices) = + get_requested_indices(&requested_schema, &parquet_schema).unwrap(); + let expect_mask: Vec = vec![]; + let expect_reorder = vec![]; + assert_eq!(mask_indices, expect_mask); + assert_eq!(reorder_indices, expect_reorder); } } diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index b51970e6c..6c6370863 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -220,6 +220,7 @@ impl FileOpener for JsonOpener { mod tests { use std::path::PathBuf; + use arrow::array::AsArray; use arrow_schema::{DataType, Field, Schema as 
ArrowSchema}; use itertools::Itertools; use object_store::{local::LocalFileSystem, ObjectStore}; @@ -256,6 +257,32 @@ mod tests { assert_eq!(batch.length(), 4); } + #[test] + fn test_parse_json_drop_field() { + let store = Arc::new(LocalFileSystem::new()); + let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); + let json_strings = StringArray::from(vec![ + r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#, + ]); + let output_schema = Arc::new(get_log_schema().clone()); + + let batch: RecordBatch = handler + .parse_json(string_array_to_engine_data(json_strings), output_schema) + .unwrap() + .into_any() + .downcast::<ArrowEngineData>() + .map(|sd| sd.into()) + .unwrap(); + assert_eq!(batch.column(0).len(), 1); + let add_array = batch.column_by_name("add").unwrap().as_struct(); + let dv_col = add_array + .column_by_name("deletionVector") + .unwrap() + .as_struct(); + assert!(dv_col.column_by_name("storageType").is_some()); + assert!(dv_col.column_by_name("maxRowId").is_none()); + } + #[tokio::test] async fn test_read_json_files() { let store = Arc::new(LocalFileSystem::new()); diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index fc66d5126..46bff22cb 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -12,7 +12,7 @@ use parquet::arrow::arrow_reader::{ use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; -use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_record_batch}; +use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array}; use crate::engine::default::executor::TaskExecutor; use crate::schema::SchemaRef; use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler}; @@ -141,8 +141,9 @@ impl FileOpener for ParquetOpener { let stream = stream.map(move |rbr| { // re-order each batch if needed - rbr.map_err(Error::Parquet) - .and_then(|rb| reorder_record_batch(rb, &requested_ordering)) + rbr.map_err(Error::Parquet).and_then(|rb| { + reorder_struct_array(rb.into(), &requested_ordering).map(Into::into) + }) }); Ok(stream.boxed()) })) @@ -204,8 +205,9 @@ impl FileOpener for PresignedUrlOpener { let stream = futures::stream::iter(reader); let stream = stream.map(move |rbr| { // re-order each batch if needed - rbr.map_err(Error::Arrow) - .and_then(|rb| reorder_record_batch(rb, &requested_ordering)) + rbr.map_err(Error::Arrow).and_then(|rb| { + reorder_struct_array(rb.into(), &requested_ordering).map(Into::into) + }) }); Ok(stream.boxed()) })) @@ -232,7 +234,7 @@ mod tests { ) -> DeltaResult<RecordBatch> { engine_data .and_then(ArrowEngineData::try_from_engine_data) - .map(|sd| sd.into()) + .map(Into::into) } #[tokio::test] diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index 828dbe104..cd25a926a 100644 ---
a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -5,7 +5,7 @@ use tracing::debug; use url::Url; use crate::engine::arrow_data::ArrowEngineData; -use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_record_batch}; +use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array}; use crate::schema::SchemaRef; use crate::{DeltaResult, Error, Expression, FileDataReadResultIterator, FileMeta, ParquetHandler}; @@ -29,10 +29,8 @@ fn try_create_from_parquet(schema: SchemaRef, location: Url) -> DeltaResult Self { + Self::InternalError(msg.to_string()).with_backtrace() + } + // Capture a backtrace when the error is constructed. #[must_use] pub fn with_backtrace(self) -> Self { diff --git a/kernel/src/schema.rs b/kernel/src/schema.rs index 8ce316e81..309badbca 100644 --- a/kernel/src/schema.rs +++ b/kernel/src/schema.rs @@ -332,6 +332,14 @@ impl MapType { pub const fn value_contains_null(&self) -> bool { self.value_contains_null } + + /// Create a schema assuming the map is stored as a struct with the specified key and value field names + pub fn as_struct_schema(&self, key_name: String, val_name: String) -> Schema { + StructType::new(vec![ + StructField::new(key_name, self.key_type.clone(), false), + StructField::new(val_name, self.value_type.clone(), self.value_contains_null), + ]) + } } fn default_true() -> bool { diff --git a/kernel/tests/data/basic-decimal-table/_delta_log/.00000000000000000000.json.crc b/kernel/tests/data/basic-decimal-table/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000..9e29d16b1 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/_delta_log/.00000000000000000000.json.crc differ diff --git a/kernel/tests/data/basic-decimal-table/_delta_log/00000000000000000000.json b/kernel/tests/data/basic-decimal-table/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..0aa97e4f4 --- /dev/null +++ b/kernel/tests/data/basic-decimal-table/_delta_log/00000000000000000000.json @@ -0,0 +1,7 @@ +{"commitInfo":{"timestamp":1690853005164,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"part\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"4","numOutputRows":"4","numOutputBytes":"4131"},"engineInfo":"Apache-Spark/3.4.0 Delta-Lake/3.0.0-SNAPSHOT","txnId":"451ba03f-e80c-4fda-9bba-8fdfda856925"}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"decimal(12,5)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col1\",\"type\":\"decimal(5,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col2\",\"type\":\"decimal(10,5)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col3\",\"type\":\"decimal(20,10)\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part"],"configuration":{},"createdTime":1690852998865}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part=-2342342.23423/part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet","partitionValues":{"part":"-2342342.23423"},"size":1032,"modificationTime":1690853004000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col1\":-999.99,\"col2\":-99999.99999,\"col3\":-9999999999.9999999999},\"maxValues\":{\"col1\":-999.99,\"col2\":-99999.99999,\"col3\":-9999999999.9999999999},\"nullCount\":{\"col1\":0,\"col2\":0,\"col3\":0}}"}} 
+{"add":{"path":"part=0.00004/part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet","partitionValues":{"part":"0.00004"},"size":1033,"modificationTime":1690853004000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col1\":0.00,\"col2\":0.00000,\"col3\":0E-10},\"maxValues\":{\"col1\":0.00,\"col2\":0.00000,\"col3\":0E-10},\"nullCount\":{\"col1\":0,\"col2\":0,\"col3\":0}}"}} +{"add":{"path":"part=234.00000/part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet","partitionValues":{"part":"234.00000"},"size":1033,"modificationTime":1690853004000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col1\":1.00,\"col2\":2.00000,\"col3\":3.0000000000},\"maxValues\":{\"col1\":1.00,\"col2\":2.00000,\"col3\":3.0000000000},\"nullCount\":{\"col1\":0,\"col2\":0,\"col3\":0}}"}} +{"add":{"path":"part=2342222.23454/part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet","partitionValues":{"part":"2342222.23454"},"size":1033,"modificationTime":1690853004000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col1\":111.11,\"col2\":22222.22222,\"col3\":3333333333.3333333333},\"maxValues\":{\"col1\":111.11,\"col2\":22222.22222,\"col3\":3333333333.3333333333},\"nullCount\":{\"col1\":0,\"col2\":0,\"col3\":0}}"}} diff --git a/kernel/tests/data/basic-decimal-table/part=-2342342.23423/.part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet.crc b/kernel/tests/data/basic-decimal-table/part=-2342342.23423/.part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet.crc new file mode 100644 index 000000000..aef64b938 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=-2342342.23423/.part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/basic-decimal-table/part=-2342342.23423/part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet b/kernel/tests/data/basic-decimal-table/part=-2342342.23423/part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet new file mode 100644 index 000000000..63820edc4 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=-2342342.23423/part-00000-8f850371-9b03-42c4-9d22-f83bc81c9b68.c000.snappy.parquet differ diff --git a/kernel/tests/data/basic-decimal-table/part=0.00004/.part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet.crc b/kernel/tests/data/basic-decimal-table/part=0.00004/.part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet.crc new file mode 100644 index 000000000..5646c5120 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=0.00004/.part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/basic-decimal-table/part=0.00004/part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet b/kernel/tests/data/basic-decimal-table/part=0.00004/part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet new file mode 100644 index 000000000..18db5c238 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=0.00004/part-00000-1cb60e36-6cd4-4191-a318-ae9355f877c3.c000.snappy.parquet differ diff --git a/kernel/tests/data/basic-decimal-table/part=234.00000/.part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet.crc b/kernel/tests/data/basic-decimal-table/part=234.00000/.part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet.crc new file mode 100644 index 000000000..6c1db4450 Binary files /dev/null and 
b/kernel/tests/data/basic-decimal-table/part=234.00000/.part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/basic-decimal-table/part=234.00000/part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet b/kernel/tests/data/basic-decimal-table/part=234.00000/part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet new file mode 100644 index 000000000..6645238db Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=234.00000/part-00000-ac109189-97e5-49af-947f-335a5e46ee5c.c000.snappy.parquet differ diff --git a/kernel/tests/data/basic-decimal-table/part=2342222.23454/.part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet.crc b/kernel/tests/data/basic-decimal-table/part=2342222.23454/.part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet.crc new file mode 100644 index 000000000..808f4b946 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=2342222.23454/.part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/basic-decimal-table/part=2342222.23454/part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet b/kernel/tests/data/basic-decimal-table/part=2342222.23454/part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet new file mode 100644 index 000000000..e9095a569 Binary files /dev/null and b/kernel/tests/data/basic-decimal-table/part=2342222.23454/part-00000-d5a0c70f-7cd3-4d32-a9c0-7171a06547c6.c000.snappy.parquet differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/.00000000000000000000.json.crc b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000..2f15b9f18 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/.00000000000000000000.json.crc differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/.00000000000000000001.json.crc b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/.00000000000000000001.json.crc new file mode 100644 index 000000000..429df3251 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/.00000000000000000001.json.crc differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/00000000000000000000.json b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..f17ec3568 --- /dev/null +++ b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1712333988110,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[\"tsNtzPartition\"]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0-SNAPSHOT","txnId":"fecbfd56-6849-421b-8439-070f0d694787"}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"tsNtz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}},{\"name\":\"tsNtzPartition\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["tsNtzPartition"],"configuration":{},"createdTime":1712333987987}} 
+{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}} diff --git a/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/00000000000000000001.json b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/00000000000000000001.json new file mode 100644 index 000000000..67749ceec --- /dev/null +++ b/kernel/tests/data/data-reader-timestamp_ntz/_delta_log/00000000000000000001.json @@ -0,0 +1,5 @@ +{"commitInfo":{"timestamp":1712333992682,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"4","numOutputRows":"9","numOutputBytes":"2940"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0-SNAPSHOT","txnId":"39f277cb-1414-419a-b634-f6a983ed9b37"}} +{"add":{"path":"tsNtzPartition=2013-07-05%2017%253A01%253A00.123456/part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet","partitionValues":{"tsNtzPartition":"2013-07-05 17:01:00.123456"},"size":726,"modificationTime":1712333992612,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"tsNtz\":\"2021-11-18T02:30:00.123\"},\"maxValues\":{\"id\":3,\"tsNtz\":\"2021-11-18T02:30:00.123\"},\"nullCount\":{\"id\":0,\"tsNtz\":0}}"}} +{"add":{"path":"tsNtzPartition=2021-11-18%2002%253A30%253A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet","partitionValues":{"tsNtzPartition":"2021-11-18 02:30:00.123456"},"size":742,"modificationTime":1712333992666,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"id\":0,\"tsNtz\":\"2013-07-05T17:01:00.123\"},\"maxValues\":{\"id\":2,\"tsNtz\":\"2021-11-18T02:30:00.123\"},\"nullCount\":{\"id\":0,\"tsNtz\":1}}"}} +{"add":{"path":"tsNtzPartition=__HIVE_DEFAULT_PARTITION__/part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet","partitionValues":{"tsNtzPartition":null},"size":742,"modificationTime":1712333992612,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"id\":6,\"tsNtz\":\"2013-07-05T17:01:00.123\"},\"maxValues\":{\"id\":8,\"tsNtz\":\"2021-11-18T02:30:00.123\"},\"nullCount\":{\"id\":0,\"tsNtz\":1}}"}} +{"add":{"path":"tsNtzPartition=2013-07-05%2017%253A01%253A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet","partitionValues":{"tsNtzPartition":"2013-07-05 17:01:00.123456"},"size":730,"modificationTime":1712333992659,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":4,\"tsNtz\":\"2013-07-05T17:01:00.123\"},\"maxValues\":{\"id\":5,\"tsNtz\":\"2013-07-05T17:01:00.123\"},\"nullCount\":{\"id\":0,\"tsNtz\":1}}"}} diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/.part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet.crc b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/.part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet.crc new file mode 100644 index 000000000..e7bef1966 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/.part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/.part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet.crc b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 
17%3A01%3A00.123456/.part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet.crc new file mode 100644 index 000000000..194342167 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/.part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet new file mode 100644 index 000000000..c8ad1d4ac Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00000-6240e68e-2304-449a-a1e6-0e24866d3508.c000.snappy.parquet differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet new file mode 100644 index 000000000..8fa4bfba7 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/.part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet.crc b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/.part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet.crc new file mode 100644 index 000000000..dada6b5ed Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/.part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet new file mode 100644 index 000000000..0fe2570cb Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/.part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet.crc b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/.part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet.crc new file mode 100644 index 000000000..964841a09 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/.part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet.crc differ diff --git a/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet new file 
mode 100644 index 000000000..1d1e6fb69 Binary files /dev/null and b/kernel/tests/data/data-reader-timestamp_ntz/tsNtzPartition=__HIVE_DEFAULT_PARTITION__/part-00001-53fd3b3b-7773-459a-921c-bb64bf0bbd03.c000.snappy.parquet differ diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index ed2f15240..1b760fa84 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -7,6 +7,7 @@ use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::compute::filter_record_batch; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; +use arrow_schema::SchemaRef as ArrowSchemaRef; use arrow_select::concat::concat_batches; use delta_kernel::actions::deletion_vector::split_vector; use delta_kernel::engine::arrow_data::ArrowEngineData; @@ -27,7 +28,7 @@ const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c00 const METADATA: &str = r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}} {"protocol":{"minReaderVersion":1,"minWriterVersion":2}} -{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#; +{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#; enum TestAction { Add(String), @@ -301,7 +302,7 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> { use BinaryOperator::{ Equal, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, NotEqual, }; - let test_cases: Vec<(_, i64, _)> = vec![ + let test_cases: Vec<(_, i32, _)> = vec![ (Equal, 0, vec![]), (Equal, 1, vec![&batch1]), (Equal, 3, vec![&batch1]), @@ -399,6 +400,7 @@ fn read_with_execute( scan: &Scan, expected: &[String], ) -> Result<(), Box<dyn std::error::Error>> { + let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?); let scan_results = scan.execute(engine)?; let batches: Vec<RecordBatch> = scan_results .into_iter() @@ -421,8 +423,7 @@ fn read_with_execute( if expected.is_empty() { assert_eq!(batches.len(), 0); } else { - let schema = batches[0].schema(); - let batch = concat_batches(&schema, &batches)?; + let batch = concat_batches(&result_schema, &batches)?; assert_batches_sorted_eq!(expected, &[batch]); } Ok(()) @@ -458,6 +459,7 @@ fn read_with_scan_data( expected: &[String], ) -> Result<(), Box<dyn std::error::Error>> { let global_state = scan.global_scan_state(); + let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?); let scan_data = scan.scan_data(engine)?; let mut scan_files = vec![]; for data in scan_data { @@ -511,8 +513,7 @@ fn read_with_scan_data( if expected.is_empty() { assert_eq!(batches.len(), 0); } else { - let schema = batches[0].schema(); - let batch = concat_batches(&schema, &batches)?; + let batch = concat_batches(&result_schema, &batches)?; assert_batches_sorted_eq!(expected, &[batch]); } Ok(()) @@ -987,18 +988,60 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> { #[test] fn short_dv() -> Result<(), Box<dyn std::error::Error>> { let expected = vec![ -
"+----+-------+-------------------------+---------------------+", - "| id | value | timestamp | rand |", - "+----+-------+-------------------------+---------------------+", - "| 3 | 3 | 2023-05-31T18:58:33.633 | 0.7918174793484931 |", - "| 4 | 4 | 2023-05-31T18:58:33.633 | 0.9281049271981882 |", - "| 5 | 5 | 2023-05-31T18:58:33.633 | 0.27796520310701633 |", - "| 6 | 6 | 2023-05-31T18:58:33.633 | 0.15263801464228832 |", - "| 7 | 7 | 2023-05-31T18:58:33.633 | 0.1981143710215575 |", - "| 8 | 8 | 2023-05-31T18:58:33.633 | 0.3069439236599195 |", - "| 9 | 9 | 2023-05-31T18:58:33.633 | 0.5175919190815845 |", - "+----+-------+-------------------------+---------------------+", + "+----+-------+--------------------------+---------------------+", + "| id | value | timestamp | rand |", + "+----+-------+--------------------------+---------------------+", + "| 3 | 3 | 2023-05-31T18:58:33.633Z | 0.7918174793484931 |", + "| 4 | 4 | 2023-05-31T18:58:33.633Z | 0.9281049271981882 |", + "| 5 | 5 | 2023-05-31T18:58:33.633Z | 0.27796520310701633 |", + "| 6 | 6 | 2023-05-31T18:58:33.633Z | 0.15263801464228832 |", + "| 7 | 7 | 2023-05-31T18:58:33.633Z | 0.1981143710215575 |", + "| 8 | 8 | 2023-05-31T18:58:33.633Z | 0.3069439236599195 |", + "| 9 | 9 | 2023-05-31T18:58:33.633Z | 0.5175919190815845 |", + "+----+-------+--------------------------+---------------------+", ]; read_table_data_str("./tests/data/with-short-dv/", None, None, expected)?; Ok(()) } + +#[test] +fn basic_decimal() -> Result<(), Box> { + let expected = vec![ + "+----------------+---------+--------------+------------------------+", + "| part | col1 | col2 | col3 |", + "+----------------+---------+--------------+------------------------+", + "| -2342342.23423 | -999.99 | -99999.99999 | -9999999999.9999999999 |", + "| 0.00004 | 0.00 | 0.00000 | 0.0000000000 |", + "| 234.00000 | 1.00 | 2.00000 | 3.0000000000 |", + "| 2342222.23454 | 111.11 | 22222.22222 | 3333333333.3333333333 |", + "+----------------+---------+--------------+------------------------+", + ]; + read_table_data_str("./tests/data/basic-decimal-table/", None, None, expected)?; + Ok(()) +} + +#[test] +fn timestamp_ntz() -> Result<(), Box> { + let expected = vec![ + "+----+----------------------------+----------------------------+", + "| id | tsNtz | tsNtzPartition |", + "+----+----------------------------+----------------------------+", + "| 0 | 2021-11-18T02:30:00.123456 | 2021-11-18T02:30:00.123456 |", + "| 1 | 2013-07-05T17:01:00.123456 | 2021-11-18T02:30:00.123456 |", + "| 2 | | 2021-11-18T02:30:00.123456 |", + "| 3 | 2021-11-18T02:30:00.123456 | 2013-07-05T17:01:00.123456 |", + "| 4 | 2013-07-05T17:01:00.123456 | 2013-07-05T17:01:00.123456 |", + "| 5 | | 2013-07-05T17:01:00.123456 |", + "| 6 | 2021-11-18T02:30:00.123456 | |", + "| 7 | 2013-07-05T17:01:00.123456 | |", + "| 8 | | |", + "+----+----------------------------+----------------------------+", + ]; + read_table_data_str( + "./tests/data/data-reader-timestamp_ntz/", + None, + None, + expected, + )?; + Ok(()) +}