From f3e8db9e9ec6494c7cedec9933ef90739727971f Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 1 Jan 2024 11:37:50 -0800 Subject: [PATCH] Introduce schema evolution on RecordBatchWriter This commit introduces the `WriteMode` enum and the ability to specify writes which should enable [schema evolution](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/). The result of this is a new `metaData` action added to the transaction log with the write which reflects the updated schema There are some caveats however such as all writes must include non-nullable columns. Fixes #1386 Sponsored-by: Raft, LLC. --- crates/deltalake-core/src/writer/json.rs | 43 +- crates/deltalake-core/src/writer/mod.rs | 69 ++- .../deltalake-core/src/writer/record_batch.rs | 396 +++++++++++++++--- 3 files changed, 401 insertions(+), 107 deletions(-) diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 0b970ae6d7..0cbfea316f 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::record_batch::*; use bytes::Bytes; -use log::{info, warn}; +use log::*; use object_store::path::Path; use object_store::ObjectStore; use parquet::{ @@ -21,11 +21,10 @@ use super::utils::{ arrow_schema_without_partitions, next_data_path, record_batch_from_message, record_batch_without_partitions, stringified_partition_value, }; -use super::{utils::PartitionPath, DeltaWriter, DeltaWriterError}; +use super::{utils::PartitionPath, DeltaWriter, DeltaWriterError, WriteMode}; use crate::errors::DeltaTableError; use crate::kernel::{Add, StructType}; use crate::table::builder::DeltaTableBuilder; -use crate::table::DeltaTableMetaData; use crate::writer::utils::ShareableBuffer; use crate::DeltaTable; @@ -226,30 +225,6 @@ impl JsonWriter { }) } - /// Retrieves the latest schema from table, compares to the current and updates if changed. - /// When schema is updated then `true` is returned which signals the caller that parquet - /// created file or arrow batch should be revisited. - pub fn update_schema( - &mut self, - metadata: &DeltaTableMetaData, - ) -> Result { - let schema: ArrowSchema = - >::try_from(&metadata.schema)?; - - let schema_updated = self.arrow_schema_ref.as_ref() != &schema - || self.partition_columns != metadata.partition_columns; - - if schema_updated { - let _ = std::mem::replace(&mut self.arrow_schema_ref, Arc::new(schema)); - let _ = std::mem::replace( - &mut self.partition_columns, - metadata.partition_columns.clone(), - ); - } - - Ok(schema_updated) - } - /// Returns the current byte length of the in memory buffer. /// This may be used by the caller to decide when to finalize the file write. pub fn buffer_len(&self) -> usize { @@ -310,8 +285,20 @@ impl JsonWriter { #[async_trait::async_trait] impl DeltaWriter> for JsonWriter { - /// Writes the given values to internal parquet buffers for each represented partition. + /// Write a chunk of values into the internal write buffers with the default write mode async fn write(&mut self, values: Vec) -> Result<(), DeltaTableError> { + self.write_with_mode(values, WriteMode::Default).await + } + + /// Writes the given values to internal parquet buffers for each represented partition. 
+ async fn write_with_mode( + &mut self, + values: Vec, + mode: WriteMode, + ) -> Result<(), DeltaTableError> { + if mode != WriteMode::Default { + warn!("The JsonWriter does not currently support non-default write modes, falling back to default mode"); + } let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new(); let arrow_schema = self.arrow_schema(); let divided = self.divide_by_partition_values(values)?; diff --git a/crates/deltalake-core/src/writer/mod.rs b/crates/deltalake-core/src/writer/mod.rs index fd3d2ed4e7..9f402b3214 100644 --- a/crates/deltalake-core/src/writer/mod.rs +++ b/crates/deltalake-core/src/writer/mod.rs @@ -116,17 +116,34 @@ impl From for DeltaTableError { DeltaWriterError::Io { source } => DeltaTableError::Io { source }, DeltaWriterError::ObjectStore { source } => DeltaTableError::ObjectStore { source }, DeltaWriterError::Parquet { source } => DeltaTableError::Parquet { source }, + DeltaWriterError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch { + msg: err.to_string(), + }, _ => DeltaTableError::Generic(err.to_string()), } } } +/// Write mode for the [DeltaWriter] +#[derive(Clone, Debug, PartialEq)] +pub enum WriteMode { + /// Default write mode which will return an error if schemas do not match correctly + Default, + /// Merge the schema of the table with the newly written data + /// + /// [Read more here](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/) + MergeSchema, +} + #[async_trait] /// Trait for writing data to Delta tables pub trait DeltaWriter { - /// write a chunk of values into the internal write buffers. + /// Write a chunk of values into the internal write buffers with the default write mode async fn write(&mut self, values: T) -> Result<(), DeltaTableError>; + /// Wreite a chunk of values into the internal write buffers with the specified [WriteMode] + async fn write_with_mode(&mut self, values: T, mode: WriteMode) -> Result<(), DeltaTableError>; + /// Flush the internal write buffers to files in the delta table folder structure. /// The corresponding delta [`Add`] actions are returned and should be committed via a transaction. async fn flush(&mut self) -> Result, DeltaTableError>; @@ -135,26 +152,34 @@ pub trait DeltaWriter { /// and commit the changes to the Delta log, creating a new table version. 
async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect(); - let partition_cols = table.metadata()?.partition_columns.clone(); - let partition_by = if !partition_cols.is_empty() { - Some(partition_cols) - } else { - None - }; - let operation = DeltaOperation::Write { - mode: SaveMode::Append, - partition_by, - predicate: None, - }; - let version = commit( - table.log_store.as_ref(), - &adds, - operation, - &table.state, - None, - ) - .await?; - table.update().await?; - Ok(version) + flush_and_commit(adds, table).await } } + +/// Method for flushing to be used by writers +pub(crate) async fn flush_and_commit( + adds: Vec, + table: &mut DeltaTable, +) -> Result { + let partition_cols = table.metadata()?.partition_columns.clone(); + let partition_by = if !partition_cols.is_empty() { + Some(partition_cols) + } else { + None + }; + let operation = DeltaOperation::Write { + mode: SaveMode::Append, + partition_by, + predicate: None, + }; + let version = commit( + table.log_store.as_ref(), + &adds, + operation, + &table.state, + None, + ) + .await?; + table.update().await?; + Ok(version) +} diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index 07240d0335..cc4daace4f 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -7,13 +7,14 @@ use std::{collections::HashMap, sync::Arc}; -use arrow::array::{Array, UInt32Array}; +use arrow::array::{new_null_array, Array, UInt32Array}; use arrow::compute::{partition, take}; use arrow::record_batch::RecordBatch; use arrow_array::ArrayRef; use arrow_row::{RowConverter, SortField}; use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use bytes::Bytes; +use log::*; use object_store::{path::Path, ObjectStore}; use parquet::{arrow::ArrowWriter, errors::ParquetError}; use parquet::{basic::Compression, file::properties::WriterProperties}; @@ -24,18 +25,19 @@ use super::utils::{ arrow_schema_without_partitions, next_data_path, record_batch_without_partitions, stringified_partition_value, PartitionPath, ShareableBuffer, }; -use super::{DeltaWriter, DeltaWriterError}; +use super::{DeltaWriter, DeltaWriterError, WriteMode}; use crate::errors::DeltaTableError; -use crate::kernel::{Add, StructType}; +use crate::kernel::{Action, Add, StructType}; use crate::table::builder::DeltaTableBuilder; -use crate::table::DeltaTableMetaData; use crate::DeltaTable; /// Writes messages to a delta lake table. pub struct RecordBatchWriter { storage: Arc, - arrow_schema_ref: Arc, + arrow_schema_ref: ArrowSchemaRef, + original_schema_ref: ArrowSchemaRef, writer_properties: WriterProperties, + should_evolve: bool, partition_columns: Vec, arrow_writers: HashMap, } @@ -67,9 +69,11 @@ impl RecordBatchWriter { Ok(Self { storage, - arrow_schema_ref: schema, + arrow_schema_ref: schema.clone(), + original_schema_ref: schema, writer_properties, partition_columns: partition_columns.unwrap_or_default(), + should_evolve: false, arrow_writers: HashMap::new(), }) } @@ -91,38 +95,15 @@ impl RecordBatchWriter { Ok(Self { storage: table.object_store(), - arrow_schema_ref, + arrow_schema_ref: arrow_schema_ref.clone(), + original_schema_ref: arrow_schema_ref.clone(), writer_properties, partition_columns, + should_evolve: false, arrow_writers: HashMap::new(), }) } - /// Retrieves the latest schema from table, compares to the current and updates if changed. 
- /// When schema is updated then `true` is returned which signals the caller that parquet - /// created file or arrow batch should be revisited. - // TODO Test schema update scenarios - pub fn update_schema( - &mut self, - metadata: &DeltaTableMetaData, - ) -> Result { - let schema: ArrowSchema = - >::try_from(&metadata.schema)?; - - let schema_updated = self.arrow_schema_ref.as_ref() != &schema - || self.partition_columns != metadata.partition_columns; - - if schema_updated { - let _ = std::mem::replace(&mut self.arrow_schema_ref, Arc::new(schema)); - let _ = std::mem::replace( - &mut self.partition_columns, - metadata.partition_columns.clone(), - ); - } - - Ok(schema_updated) - } - /// Returns the current byte length of the in memory buffer. /// This may be used by the caller to decide when to finalize the file write. pub fn buffer_len(&self) -> usize { @@ -153,7 +134,8 @@ impl RecordBatchWriter { &mut self, record_batch: RecordBatch, partition_values: &HashMap>, - ) -> Result<(), DeltaTableError> { + mode: WriteMode, + ) -> Result { let arrow_schema = arrow_schema_without_partitions(&self.arrow_schema_ref, &self.partition_columns); let partition_key = @@ -161,22 +143,20 @@ impl RecordBatchWriter { let record_batch = record_batch_without_partitions(&record_batch, &self.partition_columns)?; - match self.arrow_writers.get_mut(&partition_key) { - Some(writer) => { - writer.write(&record_batch)?; - } + let written_schema = match self.arrow_writers.get_mut(&partition_key) { + Some(writer) => writer.write(&record_batch, mode)?, None => { let mut writer = PartitionWriter::new( arrow_schema, partition_values.clone(), self.writer_properties.clone(), )?; - writer.write(&record_batch)?; + let schema = writer.write(&record_batch, mode)?; let _ = self.arrow_writers.insert(partition_key, writer); + schema } - } - - Ok(()) + }; + Ok(written_schema) } /// Sets the writer properties for the underlying arrow writer. @@ -199,12 +179,26 @@ impl RecordBatchWriter { #[async_trait::async_trait] impl DeltaWriter for RecordBatchWriter { + /// Write a chunk of values into the internal write buffers with the default write mode + async fn write(&mut self, values: RecordBatch) -> Result<(), DeltaTableError> { + self.write_with_mode(values, WriteMode::Default).await + } /// Divides a single record batch into into multiple according to table partitioning. /// Values are written to arrow buffers, to collect data until it should be written to disk. - async fn write(&mut self, values: RecordBatch) -> Result<(), DeltaTableError> { + async fn write_with_mode( + &mut self, + values: RecordBatch, + mode: WriteMode, + ) -> Result<(), DeltaTableError> { + // Set the should_evolve flag for later in case the writer should perform schema evolution + // on its flush_and_commit + self.should_evolve = mode == WriteMode::MergeSchema; + for result in self.divide_by_partition_values(&values)? { - self.write_partition(result.record_batch, &result.partition_values) + let schema = self + .write_partition(result.record_batch, &result.partition_values, mode.clone()) .await?; + self.arrow_schema_ref = schema; } Ok(()) } @@ -234,6 +228,30 @@ impl DeltaWriter for RecordBatchWriter { } Ok(actions) } + + /// Flush the internal write buffers to files in the delta table folder structure. + /// and commit the changes to the Delta log, creating a new table version. 
+ async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { + use crate::kernel::{Format, Metadata, StructType}; + let mut adds: Vec = self.flush().await?.drain(..).map(Action::Add).collect(); + + if self.arrow_schema_ref != self.original_schema_ref && self.should_evolve { + let schema: StructType = self.arrow_schema_ref.clone().try_into()?; + let schema_string: String = serde_json::to_string(&schema)?; + // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe + // this should just propagate the existing columns in the new action + let part_cols: Vec = vec![]; + let metadata = Metadata::new( + Uuid::new_v4(), + Format::default(), + schema_string, + part_cols, + None, + ); + adds.push(Action::Metadata(metadata)); + } + super::flush_and_commit(adds, table).await + } } /// Helper container for partitioned record batches @@ -246,7 +264,7 @@ pub struct PartitionResult { } struct PartitionWriter { - arrow_schema: Arc, + arrow_schema: ArrowSchemaRef, writer_properties: WriterProperties, pub(super) buffer: ShareableBuffer, pub(super) arrow_writer: ArrowWriter, @@ -256,7 +274,7 @@ struct PartitionWriter { impl PartitionWriter { pub fn new( - arrow_schema: Arc, + arrow_schema: ArrowSchemaRef, partition_values: HashMap>, writer_properties: WriterProperties, ) -> Result { @@ -282,21 +300,57 @@ impl PartitionWriter { /// Writes the record batch in-memory and updates internal state accordingly. /// This method buffers the write stream internally so it can be invoked for many /// record batches and flushed after the appropriate number of bytes has been written. - pub fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DeltaWriterError> { - if record_batch.schema() != self.arrow_schema { - return Err(DeltaWriterError::SchemaMismatch { - record_batch_schema: record_batch.schema(), - expected_schema: self.arrow_schema.clone(), - }); - } + /// + /// Returns the schema which was written by the write which can be used to understand if a + /// schema evolution has happened + pub fn write( + &mut self, + record_batch: &RecordBatch, + mode: WriteMode, + ) -> Result { + let merged_batch = if record_batch.schema() != self.arrow_schema { + match mode { + WriteMode::MergeSchema => { + debug!("The writer and record batch schemas do not match, merging"); + + let merged = ArrowSchema::try_merge(vec![ + self.arrow_schema.as_ref().clone(), + record_batch.schema().as_ref().clone(), + ])?; + self.arrow_schema = Arc::new(merged); + + let mut cols = vec![]; + for field in self.arrow_schema.fields() { + if let Some(column) = record_batch.column_by_name(field.name()) { + cols.push(column.clone()); + } else { + let null_column = + new_null_array(field.data_type(), record_batch.num_rows()); + cols.push(null_column); + } + } + Some(RecordBatch::try_new(self.arrow_schema.clone(), cols)?) + } + WriteMode::Default => { + // If the schemas didn't match then an error should be pushed up + Err(DeltaWriterError::SchemaMismatch { + record_batch_schema: record_batch.schema(), + expected_schema: self.arrow_schema.clone(), + })? 
+ } + } + } else { + None + }; // Copy current cursor bytes so we can recover from failures let buffer_bytes = self.buffer.to_vec(); + let record_batch = merged_batch.as_ref().unwrap_or(record_batch); match self.arrow_writer.write(record_batch) { Ok(_) => { self.buffered_record_batch_count += 1; - Ok(()) + Ok(self.arrow_schema.clone()) } // If a write fails we need to reset the state of the PartitionWriter Err(e) => { @@ -398,11 +452,11 @@ fn lexsort_to_indices(arrays: &[ArrayRef]) -> UInt32Array { #[cfg(test)] mod tests { use super::*; - use crate::writer::{ - test_utils::{create_initialized_table, get_record_batch}, - utils::PartitionPath, - }; + use crate::operations::create::CreateBuilder; + use crate::writer::{test_utils::*, utils::PartitionPath}; use arrow::json::ReaderBuilder; + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use std::path::Path; #[tokio::test] @@ -590,4 +644,232 @@ mod tests { assert_eq!(ref_batch, result.record_batch); } } + + // The following sets of tests are related to #1386 and mergeSchema support + // + #[tokio::test] + async fn test_write_mismatched_schema() { + let batch = get_record_batch(None, false); + let partition_cols = vec![]; + let table = create_initialized_table(&partition_cols).await; + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + + // Write the first batch with the first schema to the table + writer.write(batch).await.unwrap(); + let adds = writer.flush().await.unwrap(); + assert_eq!(adds.len(), 1); + + // Create a second batch with a different schema + let second_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ])); + let second_batch = RecordBatch::try_new( + second_schema, + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2)])), + Arc::new(StringArray::from(vec![Some("will"), Some("robert")])), + ], + ) + .unwrap(); + + let result = writer.write(second_batch).await; + assert!(result.is_err()); + + match result { + Ok(_) => { + assert!(false, "Should not have successfully written"); + } + Err(e) => { + match e { + DeltaTableError::SchemaMismatch { .. 
} => { + // this is expected + } + others => { + assert!(false, "Got the wrong error: {others:?}"); + } + } + } + }; + } + + #[tokio::test] + async fn test_write_schema_evolution() { + let batch = get_record_batch(None, false); + let table_schema = get_delta_schema(); + let table_dir = tempfile::tempdir().unwrap(); + let table_path = table_dir.path(); + + let mut table = CreateBuilder::new() + .with_location(table_path.to_str().unwrap()) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(table_schema.fields().clone()) + .await + .unwrap(); + table.load().await.expect("Failed to load table"); + assert_eq!(table.version(), 0); + + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + + // Write the first batch with the first schema to the table + writer.write(batch).await.unwrap(); + let version = writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(version, 1); + table.load().await.expect("Failed to load table"); + assert_eq!(table.version(), 1); + + // Create a second batch with a different schema + let second_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("vid", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ])); + let second_batch = RecordBatch::try_new( + second_schema, + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2)])), // vid + Arc::new(StringArray::from(vec![Some("will"), Some("robert")])), // name + ], + ) + .unwrap(); + + let result = writer + .write_with_mode(second_batch, WriteMode::MergeSchema) + .await; + assert!( + result.is_ok(), + "Failed to write with WriteMode::MergeSchema, {:?}", + result + ); + let version = writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(version, 2); + table.load().await.expect("Failed to load table"); + assert_eq!(table.version(), 2); + + let new_schema = table.metadata().unwrap().schema().unwrap(); + let expected_columns = vec!["id", "value", "modified", "vid", "name"]; + let found_columns: Vec<&String> = new_schema.fields().iter().map(|f| f.name()).collect(); + assert_eq!( + expected_columns, found_columns, + "The new table schema does not contain all evolved columns as expected" + ); + } + + #[tokio::test] + async fn test_schema_evolution_column_type_mismatch() { + let batch = get_record_batch(None, false); + let partition_cols = vec![]; + let mut table = create_initialized_table(&partition_cols).await; + + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + + // Write the first batch with the first schema to the table + writer.write(batch).await.unwrap(); + let version = writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(version, 1); + + // Create a second batch with a different schema + let second_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ])); + let second_batch = RecordBatch::try_new( + second_schema, + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2)])), // vid + Arc::new(StringArray::from(vec![Some("will"), Some("robert")])), // name + ], + ) + .unwrap(); + + let result = writer + .write_with_mode(second_batch, WriteMode::MergeSchema) + .await; + assert!( + result.is_err(), + "Did not expect to successfully add new writes with different column types: {:?}", + result + ); + } + + #[tokio::test] + async fn test_schema_evolution_with_nonnullable_col() { + use crate::kernel::{DataType as DeltaDataType, PrimitiveType, StructField, StructType}; + + let table_schema = StructType::new(vec![ + StructField::new( 
+ "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + false, + ), + StructField::new( + "value".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "modified".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + ]); + let table_dir = tempfile::tempdir().unwrap(); + let table_path = table_dir.path(); + + let mut table = CreateBuilder::new() + .with_location(table_path.to_str().unwrap()) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(table_schema.fields().clone()) + .await + .unwrap(); + table.load().await.expect("Failed to load table"); + assert_eq!(table.version(), 0); + + // Hand-crafting the first RecordBatch to ensure that a write with non-nullable columns + // works properly before attepting the second write + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("value", DataType::Int32, true), + Field::new("modified", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + arrow_schema, + vec![ + Arc::new(StringArray::from(vec![Some("1"), Some("2")])), // id + Arc::new(new_null_array(&DataType::Int32, 2)), // value + Arc::new(new_null_array(&DataType::Utf8, 2)), // modified + ], + ) + .unwrap(); + + // Write the first batch with the first schema to the table + let mut writer = RecordBatchWriter::for_table(&table).unwrap(); + writer.write(batch).await.unwrap(); + let version = writer.flush_and_commit(&mut table).await.unwrap(); + assert_eq!(version, 1); + + // Create a second batch with a different schema + let second_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "name", + DataType::Utf8, + true, + )])); + let second_batch = RecordBatch::try_new( + second_schema, + vec![ + Arc::new(StringArray::from(vec![Some("will"), Some("robert")])), // name + ], + ) + .unwrap(); + + let result = writer + .write_with_mode(second_batch, WriteMode::MergeSchema) + .await; + assert!( + result.is_err(), + "Should not have been able to write with a missing non-nullable column: {:?}", + result + ); + } }