diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index a78a618691..f7b807098a 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -13,12 +13,18 @@ readme = "README.md"
 edition = "2021"
 
 [dependencies]
-arrow = { version = "37.0.0", optional = true }
+arrow = { version = "37", optional = true }
+arrow-array = { version = "37", optional = true }
+arrow-cast = { version = "37", optional = true }
+arrow-schema = { version = "37", optional = true }
 async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
 cfg-if = "1"
-datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = ["hdfs3", "try_spawn_blocking"], optional = true }
+datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
+    "hdfs3",
+    "try_spawn_blocking",
+], optional = true }
 errno = "0.3"
 futures = "0.3"
 itertools = "0.10"
@@ -91,6 +97,7 @@ glibc_version = { path = "../glibc_version", version = "0.1" }
 
 [features]
 azure = ["object_store/azure"]
+arrow = ["dep:arrow", "arrow-array", "arrow-cast", "arrow-schema"]
 default = ["arrow", "parquet"]
 datafusion = [
     "dep:datafusion",
diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs
index 26be9e9113..265b6aee50 100644
--- a/rust/src/operations/filesystem_check.rs
+++ b/rust/src/operations/filesystem_check.rs
@@ -32,7 +32,7 @@ use url::{ParseError, Url};
 #[derive(Debug)]
 pub struct FileSystemCheckBuilder {
     /// A snapshot of the to-be-checked table's state
-    state: DeltaTableState,
+    snapshot: DeltaTableState,
     /// Delta object store for handling data files
     store: Arc<DeltaObjectStore>,
     /// Don't remove actions to the table log. Just determine which files can be removed
@@ -70,7 +70,7 @@ impl FileSystemCheckBuilder {
     /// Create a new [`FileSystemCheckBuilder`]
    pub fn new(store: Arc<DeltaObjectStore>, state: DeltaTableState) -> Self {
         FileSystemCheckBuilder {
-            state,
+            snapshot: state,
             store,
             dry_run: false,
         }
@@ -84,10 +84,10 @@
 
     async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
         let mut files_relative: HashMap<&str, &Add> =
-            HashMap::with_capacity(self.state.files().len());
+            HashMap::with_capacity(self.snapshot.files().len());
         let store = self.store.clone();
 
-        for active in self.state.files() {
+        for active in self.snapshot.files() {
            if is_absolute_path(&active.path)? {
                return Err(DeltaTableError::Generic(
                    "Filesystem check does not support absolute paths".to_string(),
@@ -174,7 +174,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder {
             let plan = this.create_fsck_plan().await?;
             if this.dry_run {
                 return Ok((
-                    DeltaTable::new_with_state(this.store, this.state),
+                    DeltaTable::new_with_state(this.store, this.snapshot),
                     FileSystemCheckMetrics {
                         files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(),
                         dry_run: true,
@@ -182,8 +182,8 @@ impl std::future::IntoFuture for FileSystemCheckBuilder {
                 ));
             }
 
-            let metrics = plan.execute(&this.state).await?;
-            let mut table = DeltaTable::new_with_state(this.store, this.state);
+            let metrics = plan.execute(&this.snapshot).await?;
+            let mut table = DeltaTable::new_with_state(this.store, this.snapshot);
             table.update().await?;
             Ok((table, metrics))
         })
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index 1f17d4b967..47febc388e 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -109,9 +109,7 @@ impl DeltaOps {
     #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn write(self, batches: impl IntoIterator<Item = RecordBatch>) -> WriteBuilder {
-        WriteBuilder::default()
-            .with_input_batches(batches)
-            .with_object_store(self.0.object_store())
+        WriteBuilder::new(self.0.object_store(), self.0.state).with_input_batches(batches)
     }
 
     /// Vacuum stale files from delta table
diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index 203e0f53f9..80a90ea27e 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -15,8 +15,6 @@
 //! In combination with `Overwrite`, a `replaceWhere` option can be used to transactionally
 //! replace data that matches a predicate.
 
-// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
-
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
@@ -25,16 +23,17 @@
 use super::writer::{DeltaWriter, WriterConfig};
 use super::MAX_SUPPORTED_WRITER_VERSION;
 use super::{transaction::commit, CreateBuilder};
 use crate::action::{Action, Add, DeltaOperation, Remove, SaveMode};
-use crate::builder::DeltaTableBuilder;
 use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
 use crate::delta_datafusion::DeltaDataChecker;
 use crate::schema::Schema;
 use crate::storage::DeltaObjectStore;
+use crate::table_state::DeltaTableState;
 use crate::writer::record_batch::divide_by_partition_values;
 use crate::writer::utils::PartitionPath;
-use arrow::datatypes::{DataType, SchemaRef as ArrowSchemaRef};
-use arrow::record_batch::RecordBatch;
+use arrow_array::RecordBatch;
+use arrow_cast::{can_cast_types, cast};
+use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
 use futures::future::BoxFuture;
@@ -76,12 +75,14 @@ impl From<WriteError> for DeltaTableError {
 /// Write data into a DeltaTable
 #[derive(Debug, Clone)]
 pub struct WriteBuilder {
+    /// A snapshot of the to-be-loaded table's state
+    snapshot: DeltaTableState,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
     /// The input plan
     input: Option<Arc<dyn ExecutionPlan>>,
     /// Datafusion session state relevant for executing the input plan
     state: Option<SessionState>,
-    /// Location where the table is stored
-    location: Option<String>,
     /// SaveMode defines how to treat data already written to table location
     mode: SaveMode,
     /// Column names for table partitioning
@@ -92,45 +93,27 @@ pub struct WriteBuilder {
     target_file_size: Option<usize>,
     /// Number of records to be written in single batch to underlying writer
     write_batch_size: Option<usize>,
-    /// An object store to be used as backend for delta table
-    object_store: Option<Arc<DeltaObjectStore>>,
-    /// Storage options used to create a new storage backend
-    storage_options: Option<HashMap<String, String>>,
     /// RecordBatches to be written into the table
     batches: Option<Vec<RecordBatch>>,
 }
 
-impl Default for WriteBuilder {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl WriteBuilder {
     /// Create a new [`WriteBuilder`]
-    pub fn new() -> Self {
+    pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
         Self {
+            snapshot,
+            store,
             input: None,
             state: None,
-            location: None,
             mode: SaveMode::Append,
             partition_columns: None,
             predicate: None,
-            storage_options: None,
             target_file_size: None,
             write_batch_size: None,
-            object_store: None,
             batches: None,
         }
     }
 
-    /// Specify the path to the location where table data is stored,
-    /// which could be a path on distributed storage.
-    pub fn with_location(mut self, location: impl Into<String>) -> Self {
-        self.location = Some(location.into());
-        self
-    }
-
     /// Specify the behavior when a table exists at location
     pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
         self.mode = save_mode;
@@ -171,20 +154,6 @@ impl WriteBuilder {
         self
     }
 
-    /// Set options used to initialize storage backend
-    ///
-    /// Options may be passed in the HashMap or set as environment variables.
-    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
-        self.storage_options = Some(storage_options);
-        self
-    }
-
-    /// Provide a [`DeltaObjectStore`] instance, that points at table location
-    pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
-        self.object_store = Some(object_store);
-        self
-    }
-
     /// Specify the target file size for data files written to the delta table.
     pub fn with_target_file_size(mut self, target_file_size: usize) -> Self {
         self.target_file_size = Some(target_file_size);
@@ -196,6 +165,44 @@ impl WriteBuilder {
         self.write_batch_size = Some(write_batch_size);
         self
     }
+
+    async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
+        match self.store.is_delta_table_location().await? {
+            true => {
+                let min_writer = self.snapshot.min_writer_version();
+                if min_writer > MAX_SUPPORTED_WRITER_VERSION {
+                    Err(WriteError::UnsupportedWriterVersion(min_writer).into())
+                } else {
+                    match self.mode {
+                        SaveMode::ErrorIfExists => {
+                            Err(WriteError::AlreadyExists(self.store.root_uri()).into())
+                        }
+                        _ => Ok(vec![]),
+                    }
+                }
+            }
+            false => {
+                let schema: Schema = if let Some(plan) = &self.input {
+                    Ok(plan.schema().try_into()?)
+                } else if let Some(batches) = &self.batches {
+                    if batches.is_empty() {
+                        return Err(WriteError::MissingData.into());
+                    }
+                    Ok(batches[0].schema().try_into()?)
+                } else {
+                    Err(WriteError::MissingData)
+                }?;
+                let mut builder = CreateBuilder::new()
+                    .with_object_store(self.store.clone())
+                    .with_columns(schema.get_fields().clone());
+                if let Some(partition_columns) = self.partition_columns.as_ref() {
+                    builder = builder.with_partition_columns(partition_columns.clone())
+                }
+                let (_, actions, _) = builder.into_table_and_actions()?;
+                Ok(actions)
+            }
+        }
+    }
 }
 
 impl std::future::IntoFuture for WriteBuilder {
@@ -203,83 +210,30 @@ impl std::future::IntoFuture for WriteBuilder {
     type IntoFuture = BoxFuture<'static, Self::Output>;
 
     fn into_future(self) -> Self::IntoFuture {
-        let this = self;
-
-        fn schema_to_vec_name_type(schema: ArrowSchemaRef) -> Vec<(String, DataType)> {
-            schema
-                .fields()
-                .iter()
-                .map(|f| (f.name().to_owned(), f.data_type().clone()))
-                .collect::<Vec<_>>()
-        }
-
-        fn schema_eq(l: ArrowSchemaRef, r: ArrowSchemaRef) -> bool {
-            schema_to_vec_name_type(l) == schema_to_vec_name_type(r)
-        }
+        let mut this = self;
 
         Box::pin(async move {
-            let object_store = if let Some(store) = this.object_store {
-                Ok(store)
-            } else {
-                DeltaTableBuilder::from_uri(this.location.unwrap())
-                    .with_storage_options(this.storage_options.unwrap_or_default())
-                    .build_storage()
-            }?;
+            // Create table actions to initialize table in case it does not yet exist and should be created
+            let mut actions = this.check_preconditions().await?;
 
-            // TODO we can find a more optimized config. Of course we want to pass in the state anyhow..
-            let mut table = DeltaTable::new(object_store.clone(), Default::default());
-            let mut actions = match table.load().await {
-                Err(DeltaTableError::NotATable(_)) => {
-                    let schema: Schema = if let Some(plan) = &this.input {
-                        Ok(plan.schema().try_into()?)
-                    } else if let Some(batches) = &this.batches {
-                        if batches.is_empty() {
-                            return Err(WriteError::MissingData.into());
-                        }
-                        Ok(batches[0].schema().try_into()?)
-                    } else {
-                        Err(WriteError::MissingData)
-                    }?;
-                    let mut builder = CreateBuilder::new()
-                        .with_object_store(table.object_store())
-                        .with_columns(schema.get_fields().clone());
-                    if let Some(partition_columns) = this.partition_columns.as_ref() {
-                        builder = builder.with_partition_columns(partition_columns.clone())
-                    }
-                    let (_, actions, _) = builder.into_table_and_actions()?;
-                    Ok(actions)
-                }
-                Ok(_) => {
-                    if table.get_min_writer_version() > MAX_SUPPORTED_WRITER_VERSION {
-                        Err(
-                            WriteError::UnsupportedWriterVersion(table.get_min_writer_version())
-                                .into(),
-                        )
-                    } else {
-                        match this.mode {
-                            SaveMode::ErrorIfExists => {
-                                Err(WriteError::AlreadyExists(table.table_uri()).into())
-                            }
-                            _ => Ok(vec![]),
-                        }
-                    }
-                }
-                Err(err) => Err(err),
-            }?;
+            let active_partitions = this
+                .snapshot
+                .current_metadata()
+                .map(|meta| meta.partition_columns.clone());
 
             // validate partition columns
-            let partition_columns = if let Ok(meta) = table.get_metadata() {
+            let partition_columns = if let Some(active_part) = active_partitions {
                 if let Some(ref partition_columns) = this.partition_columns {
-                    if &meta.partition_columns != partition_columns {
+                    if &active_part != partition_columns {
                         Err(WriteError::PartitionColumnMismatch {
-                            expected: table.get_metadata()?.partition_columns.clone(),
+                            expected: active_part,
                             got: partition_columns.to_vec(),
                         })
                     } else {
                         Ok(partition_columns.clone())
                     }
                 } else {
-                    Ok(meta.partition_columns.clone())
+                    Ok(active_part)
                 }
             } else {
                 Ok(this.partition_columns.unwrap_or_default())
@@ -292,18 +246,17 @@ impl std::future::IntoFuture for WriteBuilder {
                 Err(WriteError::MissingData)
             } else {
                 let schema = batches[0].schema();
-
-                if let Ok(meta) = table.get_metadata() {
-                    // NOTE the schema generated from the delta schema will have the delta field metadata included,
-                    // so we need to compare the field names and datatypes instead.
-                    // TODO update comparison logic, once we have column mappings supported.
-                    let curr_schema: ArrowSchemaRef = Arc::new((&meta.schema).try_into()?);
-
-                    if !schema_eq(curr_schema, schema.clone()) {
-                        return Err(DeltaTableError::Generic(
-                            "Updating table schema not yet implemented".to_string(),
-                        ));
-                    }
+                let table_schema = this
+                    .snapshot
+                    .physical_arrow_schema(this.store.clone())
+                    .await
+                    .or_else(|_| this.snapshot.arrow_schema())
+                    .unwrap_or(schema.clone());
+
+                if !can_cast_batch(schema.as_ref(), table_schema.as_ref()) {
+                    return Err(DeltaTableError::Generic(
+                        "Updating table schema not yet implemented".to_string(),
+                    ));
                 };
 
                 let data = if !partition_columns.is_empty() {
@@ -314,8 +267,7 @@ impl std::future::IntoFuture for WriteBuilder {
                         schema.clone(),
                         partition_columns.clone(),
                         &batch,
-                    )
-                    .unwrap();
+                    )?;
                     for part in divided {
                         let key = PartitionPath::from_hashmap(
                             &partition_columns,
@@ -345,9 +297,10 @@ impl std::future::IntoFuture for WriteBuilder {
                 Err(WriteError::MissingData)
             }?;
 
-            let invariants = table
-                .get_metadata()
-                .and_then(|meta| meta.schema.get_invariants())
+            let invariants = this
+                .snapshot
+                .current_metadata()
+                .and_then(|meta| meta.schema.get_invariants().ok())
                 .unwrap_or_default();
             let checker = DeltaDataChecker::new(invariants);
@@ -368,15 +321,17 @@ impl std::future::IntoFuture for WriteBuilder {
                     this.target_file_size,
                     this.write_batch_size,
                 );
-                let mut writer = DeltaWriter::new(object_store.clone(), config);
+                let mut writer = DeltaWriter::new(this.store.clone(), config);
                 let checker_stream = checker.clone();
+                let schema = inner_plan.schema().clone();
                 let mut stream = inner_plan.execute(i, task_ctx)?;
                 let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
                     tokio::task::spawn(async move {
                         while let Some(maybe_batch) = stream.next().await {
                             let batch = maybe_batch?;
                             checker_stream.check_batch(&batch).await?;
-                            writer.write(&batch).await?;
+                            let arr = cast_record_batch(&batch, schema.clone())?;
+                            writer.write(&arr).await?;
                         }
                         writer.close().await
                     });
@@ -396,6 +351,7 @@ impl std::future::IntoFuture for WriteBuilder {
                 .into_iter()
                 .map(Action::add)
                 .collect::<Vec<_>>();
+
             actions.extend(add_actions);
 
             // Collect remove actions if we are overwriting the table
@@ -424,8 +380,8 @@ impl std::future::IntoFuture for WriteBuilder {
                     todo!("Overwriting data based on predicate is not yet implemented")
                 }
                 _ => {
-                    let remove_actions = table
-                        .get_state()
+                    let remove_actions = this
+                        .snapshot
                         .files()
                         .iter()
                         .map(to_remove_action)
@@ -435,34 +391,69 @@ impl std::future::IntoFuture for WriteBuilder {
                 }
             };
 
-            // Finally, commit ...
-            let operation = DeltaOperation::Write {
-                mode: this.mode,
-                partition_by: if !partition_columns.is_empty() {
-                    Some(partition_columns)
-                } else {
-                    None
-                },
-                predicate: this.predicate,
-            };
-            let _version = commit(
-                table.storage.as_ref(),
+            let version = commit(
+                this.store.as_ref(),
                 &actions,
-                operation,
-                &table.state,
+                DeltaOperation::Write {
+                    mode: this.mode,
+                    partition_by: if !partition_columns.is_empty() {
+                        Some(partition_columns)
+                    } else {
+                        None
+                    },
+                    predicate: this.predicate,
+                },
+                &this.snapshot,
                 // TODO pass through metadata
                 None,
             )
             .await?;
-            table.update().await?;
+
+            // TODO we do not have the table config available, but since we are merging only our newly
+            // created actions, it may be safe to assume, that we want to include all actions.
+            // then again, having only some tombstones may be misleading.
+            this.snapshot
+                .merge(DeltaTableState::from_actions(actions, version)?, true, true);
 
             // TODO should we build checkpoints based on config?
-            Ok(table)
+            Ok(DeltaTable::new_with_state(this.store, this.snapshot))
         })
     }
 }
 
+fn can_cast_batch(from_schema: &ArrowSchema, to_schema: &ArrowSchema) -> bool {
+    if from_schema.fields.len() != to_schema.fields.len() {
+        return false;
+    }
+    from_schema.all_fields().iter().all(|f| {
+        if let Ok(target_field) = to_schema.field_with_name(f.name()) {
+            can_cast_types(f.data_type(), target_field.data_type())
+        } else {
+            false
+        }
+    })
+}
+
+fn cast_record_batch(
+    batch: &RecordBatch,
+    target_schema: ArrowSchemaRef,
+) -> DeltaResult<RecordBatch> {
+    let columns = target_schema
+        .all_fields()
+        .iter()
+        .map(|f| {
+            let col = batch.column_by_name(f.name()).unwrap();
+            if !col.data_type().equals_datatype(f.data_type()) {
+                cast(col, f.data_type())
+            } else {
+                Ok(col.clone())
+            }
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+    Ok(RecordBatch::try_new(target_schema, columns)?)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 23ef3a770f..0c829dc117 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -188,13 +188,14 @@ async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
 
     // Trying to execute the write from the input plan without providing Datafusion with a session
     // state containing the referenced object store in the registry results in an error.
-    assert!(WriteBuilder::new()
-        .with_input_execution_plan(source_scan.clone())
-        .with_object_store(target_table.object_store())
-        .await
-        .unwrap_err()
-        .to_string()
-        .contains("No suitable object store found for delta-rs://"));
+    assert!(
+        WriteBuilder::new(target_table.object_store(), target_table.state.clone())
+            .with_input_execution_plan(source_scan.clone())
+            .await
+            .unwrap_err()
+            .to_string()
+            .contains("No suitable object store found for delta-rs://")
+    );
 
     // Register the missing source table object store
     let source_uri = source_scan
@@ -212,10 +213,9 @@
         .register_object_store(source_store_url, Arc::from(source_store));
 
     // Execute write to the target table with the proper state
-    let target_table = WriteBuilder::new()
+    let target_table = WriteBuilder::new(target_table.object_store(), target_table.state.clone())
        .with_input_execution_plan(source_scan)
        .with_input_session_state(state)
-        .with_object_store(target_table.object_store())
        .await?;
    ctx.register_table("target", Arc::new(target_table))?;
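
For reference, a minimal sketch of how writes are driven after this change, assuming the crate layout at this revision (`deltalake::DeltaOps`, `deltalake::action::SaveMode`, the `datafusion` feature enabled, and `arrow-array`/`arrow-schema` matching the version-37 pins above); the table path, schema, and batch contents are illustrative only:

    // Hypothetical usage sketch -- not part of this diff.
    // DeltaOps::write now seeds WriteBuilder with the table's object store and
    // state snapshot, so callers no longer need with_object_store/with_location.
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use deltalake::{action::SaveMode, DeltaOps, DeltaResult};

    async fn append_example() -> DeltaResult<()> {
        // A single-column batch to write (illustrative schema).
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

        // The builder returned by write() is awaited directly via its IntoFuture impl.
        let table = DeltaOps::try_from_uri("/tmp/example-table") // path illustrative
            .await?
            .write(vec![batch])
            .with_save_mode(SaveMode::Append)
            .await?;
        println!("committed version: {}", table.version());
        Ok(())
    }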