Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: schema evolution not coercing with large arrow types #2305

Merged
merged 6 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/src/kernel/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ impl TryFrom<&ArrowDataType> for DataType {
panic!("DataType::Map should contain a struct field child");
}
}
ArrowDataType::Dictionary(_, value_type) => Ok(value_type.as_ref().try_into()?),
s => Err(ArrowError::SchemaError(format!(
"Invalid data type for Delta Lake: {s}"
))),
Expand Down
192 changes: 164 additions & 28 deletions crates/core/src/operations/cast.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,78 @@
//! Provide common cast functionality for callers
//!
use arrow::datatypes::DataType::Dictionary;
use crate::kernel::{
ArrayType, DataType as DeltaDataType, MapType, MetadataValue, StructField, StructType,
};
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, StructArray};
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{
ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef,
};
use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef};
use std::collections::HashMap;
use std::sync::Arc;

use crate::DeltaResult;

pub(crate) fn merge_field(left: &ArrowField, right: &ArrowField) -> Result<ArrowField, ArrowError> {
if let Dictionary(_, value_type) = right.data_type() {
if value_type.equals_datatype(left.data_type()) {
return Ok(left.clone());
}
}
if let Dictionary(_, value_type) = left.data_type() {
if value_type.equals_datatype(right.data_type()) {
return Ok(right.clone());
fn try_merge_metadata(
left: &mut HashMap<String, MetadataValue>,
right: &HashMap<String, MetadataValue>,
) -> Result<(), ArrowError> {
for (k, v) in right {
if let Some(vl) = left.get(k) {
if vl != v {
return Err(ArrowError::SchemaError(format!(
"Cannot merge metadata with different values for key {}",
k
)));
}
} else {
left.insert(k.clone(), v.clone());
}
}
let mut new_field = left.clone();
new_field.try_merge(right)?;
Ok(new_field)
Ok(())
}

pub(crate) fn merge_schema(
left: ArrowSchema,
right: ArrowSchema,
) -> Result<ArrowSchema, ArrowError> {
pub(crate) fn merge_struct(
left: &StructType,
right: &StructType,
) -> Result<StructType, ArrowError> {
let mut errors = Vec::with_capacity(left.fields().len());
let merged_fields: Result<Vec<ArrowField>, ArrowError> = left
let merged_fields: Result<Vec<StructField>, ArrowError> = left
.fields()
.iter()
.map(|field| {
let right_field = right.field_with_name(field.name());
if let Ok(right_field) = right_field {
let field_or_not = merge_field(field.as_ref(), right_field);
match field_or_not {
let type_or_not = merge_type(field.data_type(), right_field.data_type());
match type_or_not {
Err(e) => {
errors.push(e.to_string());
Err(e)
}
Ok(f) => Ok(f),
Ok(f) => {
let mut new_field = StructField::new(
field.name(),
f,
field.is_nullable() || right_field.is_nullable(),
);

new_field.metadata = field.metadata.clone();
try_merge_metadata(&mut new_field.metadata, &right_field.metadata)?;
Ok(new_field)
}
}
} else {
Ok(field.as_ref().clone())
Ok(field.clone())
}
})
.collect();
match merged_fields {
Ok(mut fields) => {
for field in right.fields() {
if !left.field_with_name(field.name()).is_ok() {
fields.push(field.as_ref().clone());
fields.push(field.clone());
}
}

Ok(ArrowSchema::new(fields))
Ok(StructType::new(fields))
}
Err(e) => {
errors.push(e.to_string());
Expand All @@ -68,6 +81,51 @@ pub(crate) fn merge_schema(
}
}

/// Merge two Delta data types into a single compatible type.
///
/// Identical types merge trivially; arrays, maps and structs are merged
/// recursively, widening nullability whenever either side permits nulls.
/// Any other combination is rejected with a schema error.
pub(crate) fn merge_type(
    left: &DeltaDataType,
    right: &DeltaDataType,
) -> Result<DeltaDataType, ArrowError> {
    // Fast path: nothing to reconcile when both sides already agree.
    if left == right {
        return Ok(left.clone());
    }
    match (left, right) {
        (DeltaDataType::Array(lhs), DeltaDataType::Array(rhs)) => {
            let element = merge_type(&lhs.element_type, &rhs.element_type)?;
            let nullable = lhs.contains_null() || rhs.contains_null();
            Ok(DeltaDataType::Array(Box::new(ArrayType::new(
                element, nullable,
            ))))
        }
        (DeltaDataType::Map(lhs), DeltaDataType::Map(rhs)) => {
            let key = merge_type(&lhs.key_type, &rhs.key_type)?;
            let value = merge_type(&lhs.value_type, &rhs.value_type)?;
            let nullable = lhs.value_contains_null() || rhs.value_contains_null();
            Ok(DeltaDataType::Map(Box::new(MapType::new(
                key, value, nullable,
            ))))
        }
        (DeltaDataType::Struct(lhs), DeltaDataType::Struct(rhs)) => {
            Ok(DeltaDataType::Struct(Box::new(merge_struct(lhs, rhs)?)))
        }
        (lhs, rhs) => Err(ArrowError::SchemaError(format!(
            "Cannot merge types {} and {}",
            lhs, rhs
        ))),
    }
}

pub(crate) fn merge_schema(
left: ArrowSchemaRef,
right: ArrowSchemaRef,
) -> Result<ArrowSchemaRef, ArrowError> {
let left_delta: StructType = left.try_into()?;
let right_delta: StructType = right.try_into()?;
let merged: StructType = merge_struct(&left_delta, &right_delta)?;
Ok(Arc::new((&merged).try_into()?))
}

fn cast_struct(
struct_array: &StructArray,
fields: &Fields,
Expand Down Expand Up @@ -142,13 +200,91 @@ pub fn cast_record_batch(

#[cfg(test)]
mod tests {
use crate::kernel::{
ArrayType as DeltaArrayType, DataType as DeltaDataType, StructField as DeltaStructField,
StructType as DeltaStructType,
};
use crate::operations::cast::MetadataValue;
use crate::operations::cast::{cast_record_batch, is_cast_required};
use arrow::array::ArrayData;
use arrow_array::{Array, ArrayRef, ListArray, RecordBatch};
use arrow_buffer::Buffer;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
use std::collections::HashMap;
use std::sync::Arc;

#[test]
fn test_merge_schema_with_dict() {
    // Merging a dictionary-encoded utf8 column with a LargeUtf8 column must
    // coerce both sides to the Delta `string` type and widen nullability.
    let left_schema = Arc::new(Schema::new(vec![Field::new(
        "f",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )]));
    let right_schema = Arc::new(Schema::new(vec![Field::new(
        "f",
        DataType::LargeUtf8,
        true,
    )]));

    let result = super::merge_schema(left_schema, right_schema).unwrap();
    assert_eq!(result.fields().len(), 1);
    let delta_type: DeltaDataType = result.fields()[0].data_type().try_into().unwrap();
    assert_eq!(delta_type, DeltaDataType::STRING);
    // `assert!` instead of `assert_eq!(…, true)` (clippy::bool_assert_comparison).
    assert!(result.fields()[0].is_nullable());
}

#[test]
fn test_merge_schema_with_meta() {
    // Field-level metadata from both sides must be unioned when the two
    // maps carry disjoint keys, while nullability widens to `true`.
    let left_schema = DeltaStructType::new(vec![DeltaStructField::new(
        "f",
        DeltaDataType::STRING,
        false,
    )
    .with_metadata(HashMap::from([("a".to_string(), "a1".to_string())]))]);
    let right_schema = DeltaStructType::new(vec![DeltaStructField::new(
        "f",
        DeltaDataType::STRING,
        true,
    )
    .with_metadata(HashMap::from([("b".to_string(), "b2".to_string())]))]);

    let merged = super::merge_struct(&left_schema, &right_schema).unwrap();
    assert_eq!(merged.fields().len(), 1);
    assert_eq!(merged.fields()[0].data_type(), &DeltaDataType::STRING);
    let expected_meta = HashMap::from([
        ("a".to_string(), MetadataValue::String("a1".to_string())),
        ("b".to_string(), MetadataValue::String("b2".to_string())),
    ]);
    assert_eq!(merged.fields()[0].metadata(), &expected_meta);
}

#[test]
fn test_merge_schema_with_nested() {
    // A LargeList<Utf8> and a List<LargeUtf8> must both coerce to the same
    // Delta `array<string>` type, with outer-field nullability widened.
    let left_schema = Arc::new(Schema::new(vec![Field::new(
        "f",
        DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, false))),
        false,
    )]));
    let right_schema = Arc::new(Schema::new(vec![Field::new(
        "f",
        DataType::List(Arc::new(Field::new("item", DataType::LargeUtf8, false))),
        true,
    )]));

    let result = super::merge_schema(left_schema, right_schema).unwrap();
    assert_eq!(result.fields().len(), 1);
    let delta_type: DeltaDataType = result.fields()[0].data_type().try_into().unwrap();
    assert_eq!(
        delta_type,
        DeltaDataType::Array(Box::new(DeltaArrayType::new(DeltaDataType::STRING, false)))
    );
    // `assert!` instead of `assert_eq!(…, true)` (clippy::bool_assert_comparison).
    assert!(result.fields()[0].is_nullable());
}

#[test]
fn test_cast_record_batch_with_list_non_default_item() {
let array = Arc::new(make_list_array()) as ArrayRef;
Expand Down
6 changes: 2 additions & 4 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,10 +623,8 @@ impl std::future::IntoFuture for WriteBuilder {
if this.mode == SaveMode::Overwrite && this.schema_mode.is_some() {
new_schema = None // we overwrite anyway, so no need to cast
} else if this.schema_mode == Some(SchemaMode::Merge) {
new_schema = Some(Arc::new(merge_schema(
table_schema.as_ref().clone(),
schema.as_ref().clone(),
)?));
new_schema =
Some(merge_schema(table_schema.clone(), schema.clone())?);
} else {
return Err(schema_err.into());
}
Expand Down
8 changes: 3 additions & 5 deletions crates/core/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,9 @@ impl PartitionWriter {
WriteMode::MergeSchema => {
debug!("The writer and record batch schemas do not match, merging");

let merged = merge_schema(
self.arrow_schema.as_ref().clone(),
record_batch.schema().as_ref().clone(),
)?;
self.arrow_schema = Arc::new(merged);
let merged =
merge_schema(self.arrow_schema.clone(), record_batch.schema().clone())?;
self.arrow_schema = merged;

let mut cols = vec![];
for field in self.arrow_schema.fields() {
Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def test_update_schema_rust_writer_append(existing_table: DeltaTable):
)
with pytest.raises(
SchemaMismatchError,
match="Schema error: Fail to merge schema field 'utf8' because the from data_type = Int64 does not equal Utf8",
match="Schema error: Cannot merge types string and long",
):
write_deltalake(
existing_table,
Expand Down
Loading