From 0827feaa9bc680dce3cb64d7df31b4057652080c Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Mon, 4 Dec 2023 17:35:14 +0100 Subject: [PATCH] refactor: prefer usage of metadata and protocol fields (#1935) --- .../src/delta_datafusion/mod.rs | 5 +- .../src/kernel/actions/schemas.rs | 1 + .../src/kernel/actions/types.rs | 3 ++ crates/deltalake-core/src/lib.rs | 52 ++++++++++--------- .../src/operations/convert_to_delta.rs | 3 +- .../deltalake-core/src/operations/create.rs | 22 ++++---- .../deltalake-core/src/operations/restore.rs | 12 ++--- .../src/operations/transaction/test_utils.rs | 1 + .../src/protocol/checkpoints.rs | 4 +- crates/deltalake-core/src/protocol/mod.rs | 2 + .../src/protocol/parquet_read/mod.rs | 1 + crates/deltalake-core/src/storage/utils.rs | 1 + crates/deltalake-core/src/table/builder.rs | 9 ++-- crates/deltalake-core/src/table/mod.rs | 26 +++++++++- crates/deltalake-core/src/table/state.rs | 28 +++++++--- crates/deltalake-core/src/writer/json.rs | 4 +- crates/deltalake-core/src/writer/mod.rs | 2 +- .../deltalake-core/src/writer/record_batch.rs | 4 +- crates/deltalake-core/src/writer/stats.rs | 1 + crates/deltalake-core/tests/common/mod.rs | 1 + crates/deltalake-core/tests/fs_common/mod.rs | 1 + .../tests/integration_concurrent_writes.rs | 5 +- .../deltalake-core/tests/integration_read.rs | 16 +++--- .../tests/read_delta_log_test.rs | 6 +-- .../deltalake/examples/recordbatch-writer.rs | 4 +- python/src/lib.rs | 29 +++++------ 26 files changed, 146 insertions(+), 97 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index fba8acd313..973d575904 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -246,7 +246,7 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option } let data_type = field.data_type().try_into().ok()?; - let partition_columns = &table.get_metadata().ok()?.partition_columns; + let partition_columns = &table.metadata().ok()?.partition_columns; let values = table.get_state().files().iter().map(|add| { if partition_columns.contains(&column.name) { @@ -310,7 +310,7 @@ impl PruningStatistics for DeltaTable { /// /// Note: the returned array must contain `num_containers()` rows. fn null_counts(&self, column: &Column) -> Option { - let partition_columns = &self.get_metadata().ok()?.partition_columns; + let partition_columns = &self.metadata().ok()?.partition_columns; let values = self.get_state().files().iter().map(|add| { if let Ok(Some(statistics)) = add.get_stats() { @@ -1602,6 +1602,7 @@ mod tests { tags: None, base_row_id: None, default_row_commit_version: None, + clustering_provider: None, }; let schema = ArrowSchema::new(vec![ Field::new("year", ArrowDataType::Int64, true), diff --git a/crates/deltalake-core/src/kernel/actions/schemas.rs b/crates/deltalake-core/src/kernel/actions/schemas.rs index ad3e3ccbad..d8f8438438 100644 --- a/crates/deltalake-core/src/kernel/actions/schemas.rs +++ b/crates/deltalake-core/src/kernel/actions/schemas.rs @@ -112,6 +112,7 @@ lazy_static! { deletion_vector_field(), StructField::new("baseRowId", DataType::long(), true), StructField::new("defaultRowCommitVersion", DataType::long(), true), + StructField::new("clusteringProvider", DataType::string(), true), ]))), true, ); diff --git a/crates/deltalake-core/src/kernel/actions/types.rs b/crates/deltalake-core/src/kernel/actions/types.rs index aa60823e4a..f38cbd51b2 100644 --- a/crates/deltalake-core/src/kernel/actions/types.rs +++ b/crates/deltalake-core/src/kernel/actions/types.rs @@ -583,6 +583,9 @@ pub struct Add { /// First commit version in which an add action with the same path was committed to the table. pub default_row_commit_version: Option, + /// The name of the clustering implementation + pub clustering_provider: Option, + // TODO remove migration filds added to not do too many business logic changes in one PR /// Partition values stored in raw parquet struct format. In this struct, the column names /// correspond to the partition columns and the values are stored in their corresponding data diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index fda54ccc01..312e4d6429 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -196,6 +196,8 @@ pub fn crate_version() -> &'static str { #[cfg(test)] mod tests { + use itertools::Itertools; + use super::*; use crate::table::PeekCommit; use std::collections::HashMap; @@ -204,10 +206,10 @@ mod tests { async fn read_delta_2_0_table_without_version() { let table = crate::open_table("./tests/data/delta-0.2.0").await.unwrap(); assert_eq!(table.version(), 3); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"), Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"), @@ -241,8 +243,8 @@ mod tests { table_to_update.update().await.unwrap(); assert_eq!( - table_newest_version.get_files(), - table_to_update.get_files() + table_newest_version.get_files_iter().collect_vec(), + table_to_update.get_files_iter().collect_vec() ); } #[tokio::test] @@ -251,10 +253,10 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"), Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"), @@ -265,10 +267,10 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 2); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"), Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"), @@ -279,10 +281,10 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 3); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"), Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"), @@ -295,10 +297,10 @@ mod tests { async fn read_delta_8_0_table_without_version() { let table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap(); assert_eq!(table.version(), 1); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet") @@ -341,10 +343,10 @@ mod tests { async fn read_delta_8_0_table_with_load_version() { let mut table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap(); assert_eq!(table.version(), 1); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"), @@ -352,10 +354,10 @@ mod tests { ); table.load_version(0).await.unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"), @@ -483,7 +485,7 @@ mod tests { .unwrap(); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::parse( "x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet" @@ -683,7 +685,7 @@ mod tests { .unwrap(); assert_eq!(table.version(), 2); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![Path::from( "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet" ),] diff --git a/crates/deltalake-core/src/operations/convert_to_delta.rs b/crates/deltalake-core/src/operations/convert_to_delta.rs index 644591727c..5aba3bc4f8 100644 --- a/crates/deltalake-core/src/operations/convert_to_delta.rs +++ b/crates/deltalake-core/src/operations/convert_to_delta.rs @@ -405,6 +405,7 @@ mod tests { storage::config::StorageOptions, Path, }; + use itertools::Itertools; use pretty_assertions::assert_eq; use std::fs; use tempfile::tempdir; @@ -501,7 +502,7 @@ mod tests { "Testing location: {test_data_from:?}" ); - let mut files = table.get_files(); + let mut files = table.get_files_iter().collect_vec(); files.sort(); assert_eq!( files, expected_paths, diff --git a/crates/deltalake-core/src/operations/create.rs b/crates/deltalake-core/src/operations/create.rs index 84c2e03627..69bae07e75 100644 --- a/crates/deltalake-core/src/operations/create.rs +++ b/crates/deltalake-core/src/operations/create.rs @@ -342,7 +342,7 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_metadata().unwrap().schema, table_schema) + assert_eq!(table.get_schema().unwrap(), &table_schema) } #[tokio::test] @@ -362,7 +362,7 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_metadata().unwrap().schema, table_schema) + assert_eq!(table.get_schema().unwrap(), &table_schema) } #[tokio::test] @@ -391,14 +391,14 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); assert_eq!( - table.get_min_reader_version(), + table.protocol().min_reader_version, PROTOCOL.default_reader_version() ); assert_eq!( - table.get_min_writer_version(), + table.protocol().min_writer_version, PROTOCOL.default_writer_version() ); - assert_eq!(table.schema().unwrap(), &schema); + assert_eq!(table.get_schema().unwrap(), &schema); // check we can overwrite default settings via adding actions let protocol = Protocol { @@ -413,8 +413,8 @@ mod tests { .with_actions(vec![Action::Protocol(protocol)]) .await .unwrap(); - assert_eq!(table.get_min_reader_version(), 0); - assert_eq!(table.get_min_writer_version(), 0); + assert_eq!(table.protocol().min_reader_version, 0); + assert_eq!(table.protocol().min_writer_version, 0); let table = CreateBuilder::new() .with_location("memory://") @@ -423,7 +423,7 @@ mod tests { .await .unwrap(); let append = table - .get_metadata() + .metadata() .unwrap() .configuration .get(DeltaConfigKey::AppendOnly.as_ref()) @@ -445,7 +445,7 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - let first_id = table.get_metadata().unwrap().id.clone(); + let first_id = table.metadata().unwrap().id.clone(); let log_store = table.log_store; @@ -464,7 +464,7 @@ mod tests { .with_save_mode(SaveMode::Ignore) .await .unwrap(); - assert_eq!(table.get_metadata().unwrap().id, first_id); + assert_eq!(table.metadata().unwrap().id, first_id); // Check table is overwritten let table = CreateBuilder::new() @@ -473,6 +473,6 @@ mod tests { .with_save_mode(SaveMode::Overwrite) .await .unwrap(); - assert_ne!(table.get_metadata().unwrap().id, first_id) + assert_ne!(table.metadata().unwrap().id, first_id) } } diff --git a/crates/deltalake-core/src/operations/restore.rs b/crates/deltalake-core/src/operations/restore.rs index db0959e9c7..a10248bcb0 100644 --- a/crates/deltalake-core/src/operations/restore.rs +++ b/crates/deltalake-core/src/operations/restore.rs @@ -207,27 +207,27 @@ async fn execute( let mut actions = vec![]; let protocol = if protocol_downgrade_allowed { Protocol { - min_reader_version: table.get_min_reader_version(), - min_writer_version: table.get_min_writer_version(), + min_reader_version: table.protocol().min_reader_version, + min_writer_version: table.protocol().min_writer_version, writer_features: if snapshot.protocol().min_writer_version < 7 { None } else { - table.get_writer_features().cloned() + table.protocol().writer_features.clone() }, reader_features: if snapshot.protocol().min_reader_version < 3 { None } else { - table.get_reader_features().cloned() + table.protocol().reader_features.clone() }, } } else { Protocol { min_reader_version: max( - table.get_min_reader_version(), + table.protocol().min_reader_version, snapshot.protocol().min_reader_version, ), min_writer_version: max( - table.get_min_writer_version(), + table.protocol().min_writer_version, snapshot.protocol().min_writer_version, ), writer_features: snapshot.protocol().writer_features.clone(), diff --git a/crates/deltalake-core/src/operations/transaction/test_utils.rs b/crates/deltalake-core/src/operations/transaction/test_utils.rs index 2efdcde2ea..ccb0e090f0 100644 --- a/crates/deltalake-core/src/operations/transaction/test_utils.rs +++ b/crates/deltalake-core/src/operations/transaction/test_utils.rs @@ -30,6 +30,7 @@ pub fn create_add_action( default_row_commit_version: None, tags: None, deletion_vector: None, + clustering_provider: None, }) } diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index 7ec06db9fc..55b36a64e1 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -519,7 +519,7 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_metadata().unwrap().schema, table_schema); + assert_eq!(table.get_schema().unwrap(), &table_schema); let res = create_checkpoint_for(0, table.get_state(), table.log_store.as_ref()).await; assert!(res.is_ok()); @@ -548,7 +548,7 @@ mod tests { .await .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_metadata().unwrap().schema, table_schema); + assert_eq!(table.get_schema().unwrap(), &table_schema); match create_checkpoint_for(1, table.get_state(), table.log_store.as_ref()).await { Ok(_) => { /* diff --git a/crates/deltalake-core/src/protocol/mod.rs b/crates/deltalake-core/src/protocol/mod.rs index 56edde7f78..49881187c9 100644 --- a/crates/deltalake-core/src/protocol/mod.rs +++ b/crates/deltalake-core/src/protocol/mod.rs @@ -721,6 +721,7 @@ mod tests { modification_time: 0, base_row_id: None, default_row_commit_version: None, + clustering_provider: None, }; let stats = action.get_stats().unwrap().unwrap(); @@ -796,6 +797,7 @@ mod tests { modification_time: 0, base_row_id: None, default_row_commit_version: None, + clustering_provider: None, }; let stats = action.get_stats().unwrap().unwrap(); diff --git a/crates/deltalake-core/src/protocol/parquet_read/mod.rs b/crates/deltalake-core/src/protocol/parquet_read/mod.rs index e89c73d4bd..21ad2bdff8 100644 --- a/crates/deltalake-core/src/protocol/parquet_read/mod.rs +++ b/crates/deltalake-core/src/protocol/parquet_read/mod.rs @@ -113,6 +113,7 @@ impl Add { base_row_id: None, default_row_commit_version: None, tags: None, + clustering_provider: None, }; for (i, (name, _)) in record.get_column_iter().enumerate() { diff --git a/crates/deltalake-core/src/storage/utils.rs b/crates/deltalake-core/src/storage/utils.rs index 768664b97b..7b8e76c47d 100644 --- a/crates/deltalake-core/src/storage/utils.rs +++ b/crates/deltalake-core/src/storage/utils.rs @@ -118,6 +118,7 @@ mod tests { deletion_vector: None, partition_values_parsed: None, stats_parsed: None, + clustering_provider: None, }; let meta: ObjectMeta = (&add).try_into().unwrap(); diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index f453c895f6..81be85c423 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -489,6 +489,7 @@ pub fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { #[cfg(test)] mod tests { use super::*; + use itertools::Itertools; use object_store::path::Path; #[test] @@ -607,7 +608,7 @@ mod tests { ); assert_eq!( - table.get_files(), + table.get_files_iter().collect_vec(), vec![ Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"), Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet") @@ -623,7 +624,7 @@ mod tests { .await .unwrap(); - assert!(table.get_files().is_empty(), "files should be empty"); + assert_eq!(table.get_files_iter().count(), 0, "files should be empty"); assert!( table.get_tombstones().next().is_none(), "tombstones should be empty" @@ -640,7 +641,7 @@ mod tests { .unwrap(); assert_eq!(table.version(), 0); - assert!(table.get_files().is_empty(), "files should be empty"); + assert_eq!(table.get_files_iter().count(), 0, "files should be empty"); assert!( table.get_tombstones().next().is_none(), "tombstones should be empty" @@ -648,7 +649,7 @@ mod tests { table.update().await.unwrap(); assert_eq!(table.version(), 1); - assert!(table.get_files().is_empty(), "files should be empty"); + assert_eq!(table.get_files_iter().count(), 0, "files should be empty"); assert!( table.get_tombstones().next().is_none(), "tombstones should be empty" diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 4d55d3e462..f7a9a1b8c6 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -22,8 +22,8 @@ use self::builder::DeltaTableConfig; use self::state::DeltaTableState; use crate::errors::DeltaTableError; use crate::kernel::{ - Action, Add, CommitInfo, DataType, Format, Metadata, ReaderFeatures, Remove, StructType, - WriterFeatures, + Action, Add, CommitInfo, DataType, Format, Metadata, Protocol, ReaderFeatures, Remove, + StructType, WriterFeatures, }; use crate::logstore::LogStoreConfig; use crate::logstore::LogStoreRef; @@ -435,6 +435,7 @@ impl DeltaTable { Ok(()) } + /// returns the latest available version of the table pub async fn get_latest_version(&mut self) -> Result { self.log_store.get_latest_version(self.version()).await @@ -727,12 +728,14 @@ impl DeltaTable { } /// Returns a collection of file names present in the loaded state + #[deprecated(since = "0.17.0", note = "use get_files_iter() instead")] #[inline] pub fn get_files(&self) -> Vec { self.state.file_paths_iter().collect() } /// Returns file names present in the loaded state in HashSet + #[deprecated(since = "0.17.0", note = "use get_files_iter() instead")] pub fn get_file_set(&self) -> HashSet { self.state.file_paths_iter().collect() } @@ -764,7 +767,18 @@ impl DeltaTable { &self.state } + /// Returns current table protocol + pub fn protocol(&self) -> &Protocol { + self.state.protocol() + } + + /// Returns the metadata associated with the loaded state. + pub fn metadata(&self) -> Result<&Metadata, DeltaTableError> { + Ok(self.state.metadata_action()?) + } + /// Returns the metadata associated with the loaded state. + #[deprecated(since = "0.17.0", note = "use metadata() instead")] pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> { self.state.metadata().ok_or(DeltaTableError::NoMetadata) } @@ -781,22 +795,26 @@ impl DeltaTable { /// Returns the minimum reader version supported by the DeltaTable based on the loaded /// metadata. + #[deprecated(since = "0.17.0", note = "use protocol().min_reader_version instead")] pub fn get_min_reader_version(&self) -> i32 { self.state.protocol().min_reader_version } /// Returns the minimum writer version supported by the DeltaTable based on the loaded /// metadata. + #[deprecated(since = "0.17.0", note = "use protocol().min_writer_version instead")] pub fn get_min_writer_version(&self) -> i32 { self.state.protocol().min_writer_version } /// Returns current supported reader features by this table + #[deprecated(since = "0.17.0", note = "use protocol().reader_features instead")] pub fn get_reader_features(&self) -> Option<&HashSet> { self.state.protocol().reader_features.as_ref() } /// Returns current supported writer features by this table + #[deprecated(since = "0.17.0", note = "use protocol().writer_features instead")] pub fn get_writer_features(&self) -> Option<&HashSet> { self.state.protocol().writer_features.as_ref() } @@ -814,6 +832,10 @@ impl DeltaTable { } /// Return the tables configurations that are encapsulated in the DeltaTableStates currentMetaData field + #[deprecated( + since = "0.17.0", + note = "use metadata().configuration or get_state().table_config() instead" + )] pub fn get_configurations(&self) -> Result<&HashMap>, DeltaTableError> { Ok(self .state diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index ccff7f6257..fa9078997c 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -12,8 +12,8 @@ use serde::{Deserialize, Serialize}; use super::config::TableConfig; use crate::errors::DeltaTableError; -use crate::kernel::Protocol; use crate::kernel::{Action, Add, CommitInfo, DataType, DomainMetadata, Remove, StructType}; +use crate::kernel::{Metadata, Protocol}; use crate::partitions::{DeltaTablePartition, PartitionFilter}; use crate::protocol::ProtocolError; use crate::storage::commit_uri_from_version; @@ -41,7 +41,8 @@ pub struct DeltaTableState { app_transaction_version: HashMap, // table metadata corresponding to current version current_metadata: Option, - current_protocol: Option, + protocol: Option, + metadata: Option, } impl DeltaTableState { @@ -207,7 +208,12 @@ impl DeltaTableState { lazy_static! { static ref DEFAULT_PROTOCOL: Protocol = Protocol::default(); } - self.current_protocol.as_ref().unwrap_or(&DEFAULT_PROTOCOL) + self.protocol.as_ref().unwrap_or(&DEFAULT_PROTOCOL) + } + + /// The most recent metadata of the table. + pub fn metadata_action(&self) -> Result<&Metadata, ProtocolError> { + self.metadata.as_ref().ok_or(ProtocolError::NoMetaData) } /// The most recent metadata of the table. @@ -269,9 +275,12 @@ impl DeltaTableState { if new_state.current_metadata.is_some() { self.current_metadata = new_state.current_metadata.take(); } + if new_state.metadata.is_some() { + self.metadata = new_state.metadata.take(); + } - if new_state.current_protocol.is_some() { - self.current_protocol = new_state.current_protocol.take(); + if new_state.protocol.is_some() { + self.protocol = new_state.protocol.take(); } new_state @@ -314,9 +323,10 @@ impl DeltaTableState { } } Action::Protocol(v) => { - self.current_protocol = Some(v); + self.protocol = Some(v); } Action::Metadata(v) => { + self.metadata = Some(v.clone()); let md = DeltaTableMetaData::try_from(v)?; self.current_metadata = Some(md); } @@ -391,7 +401,8 @@ mod tests { domain_metadatas: vec![], app_transaction_version: Default::default(), current_metadata: None, - current_protocol: None, + metadata: None, + protocol: None, }; let bytes = serde_json::to_vec(&expected).unwrap(); let actual: DeltaTableState = serde_json::from_slice(&bytes).unwrap(); @@ -411,7 +422,8 @@ mod tests { domain_metadatas: vec![], tombstones: HashSet::new(), current_metadata: None, - current_protocol: None, + protocol: None, + metadata: None, app_transaction_version, }; diff --git a/crates/deltalake-core/src/writer/json.rs b/crates/deltalake-core/src/writer/json.rs index 7fec11fad2..0b970ae6d7 100644 --- a/crates/deltalake-core/src/writer/json.rs +++ b/crates/deltalake-core/src/writer/json.rs @@ -206,8 +206,8 @@ impl JsonWriter { /// Creates a JsonWriter to write to the given table pub fn for_table(table: &DeltaTable) -> Result { // Initialize an arrow schema ref from the delta table schema - let metadata = table.get_metadata()?; - let arrow_schema = >::try_from(&metadata.schema)?; + let metadata = table.metadata()?; + let arrow_schema = >::try_from(&metadata.schema()?)?; let arrow_schema_ref = Arc::new(arrow_schema); let partition_columns = metadata.partition_columns.clone(); diff --git a/crates/deltalake-core/src/writer/mod.rs b/crates/deltalake-core/src/writer/mod.rs index 3b73fe2ef6..fd3d2ed4e7 100644 --- a/crates/deltalake-core/src/writer/mod.rs +++ b/crates/deltalake-core/src/writer/mod.rs @@ -135,7 +135,7 @@ pub trait DeltaWriter { /// and commit the changes to the Delta log, creating a new table version. async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result { let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect(); - let partition_cols = table.get_metadata()?.partition_columns.clone(); + let partition_cols = table.metadata()?.partition_columns.clone(); let partition_by = if !partition_cols.is_empty() { Some(partition_cols) } else { diff --git a/crates/deltalake-core/src/writer/record_batch.rs b/crates/deltalake-core/src/writer/record_batch.rs index 49b5dfebc9..07240d0335 100644 --- a/crates/deltalake-core/src/writer/record_batch.rs +++ b/crates/deltalake-core/src/writer/record_batch.rs @@ -77,9 +77,9 @@ impl RecordBatchWriter { /// Creates a [`RecordBatchWriter`] to write data to provided Delta Table pub fn for_table(table: &DeltaTable) -> Result { // Initialize an arrow schema ref from the delta table schema - let metadata = table.get_metadata()?; + let metadata = table.metadata()?; let arrow_schema = - >::try_from(&metadata.schema.clone())?; + >::try_from(&metadata.schema()?.clone())?; let arrow_schema_ref = Arc::new(arrow_schema); let partition_columns = metadata.partition_columns.clone(); diff --git a/crates/deltalake-core/src/writer/stats.rs b/crates/deltalake-core/src/writer/stats.rs index 2e4f6ac177..cc3badc1fa 100644 --- a/crates/deltalake-core/src/writer/stats.rs +++ b/crates/deltalake-core/src/writer/stats.rs @@ -42,6 +42,7 @@ pub fn create_add( default_row_commit_version: None, stats_parsed: None, partition_values_parsed: None, + clustering_provider: None, }) } diff --git a/crates/deltalake-core/tests/common/mod.rs b/crates/deltalake-core/tests/common/mod.rs index af2e6e1a7f..d2742b2718 100644 --- a/crates/deltalake-core/tests/common/mod.rs +++ b/crates/deltalake-core/tests/common/mod.rs @@ -141,6 +141,7 @@ pub async fn add_file( default_row_commit_version: None, base_row_id: None, deletion_vector: None, + clustering_provider: None, }; let operation = DeltaOperation::Write { mode: SaveMode::Append, diff --git a/crates/deltalake-core/tests/fs_common/mod.rs b/crates/deltalake-core/tests/fs_common/mod.rs index 73593f26b1..c79fc833da 100644 --- a/crates/deltalake-core/tests/fs_common/mod.rs +++ b/crates/deltalake-core/tests/fs_common/mod.rs @@ -94,6 +94,7 @@ pub fn add(offset_millis: i64) -> Add { deletion_vector: None, base_row_id: None, default_row_commit_version: None, + clustering_provider: None, } } diff --git a/crates/deltalake-core/tests/integration_concurrent_writes.rs b/crates/deltalake-core/tests/integration_concurrent_writes.rs index 79c16e85dc..67fbbe38c5 100644 --- a/crates/deltalake-core/tests/integration_concurrent_writes.rs +++ b/crates/deltalake-core/tests/integration_concurrent_writes.rs @@ -68,8 +68,8 @@ async fn prepare_table( .await?; assert_eq!(0, table.version()); - assert_eq!(1, table.get_min_reader_version()); - assert_eq!(2, table.get_min_writer_version()); + assert_eq!(1, table.protocol().min_reader_version); + assert_eq!(2, table.protocol().min_writer_version); assert_eq!(0, table.get_files().len()); Ok((table, table_uri)) @@ -166,6 +166,7 @@ impl Worker { deletion_vector: None, base_row_id: None, default_row_commit_version: None, + clustering_provider: None, })]; let version = commit( self.table.log_store().as_ref(), diff --git a/crates/deltalake-core/tests/integration_read.rs b/crates/deltalake-core/tests/integration_read.rs index a15679a09c..44d91f3ad3 100644 --- a/crates/deltalake-core/tests/integration_read.rs +++ b/crates/deltalake-core/tests/integration_read.rs @@ -199,8 +199,8 @@ async fn read_simple_table(integration: &IntegrationContext) -> TestResult { .await?; assert_eq!(table.version(), 4); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( table.get_files(), vec![ @@ -239,8 +239,8 @@ async fn read_simple_table_with_version(integration: &IntegrationContext) -> Tes .await?; assert_eq!(table.version(), 3); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( table.get_files(), vec![ @@ -280,8 +280,8 @@ async fn read_golden(integration: &IntegrationContext) -> TestResult { .unwrap(); assert_eq!(table.version(), 0); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); Ok(()) } @@ -309,8 +309,8 @@ mod gcs { .await .unwrap(); assert_eq!(table.version(), 4); - assert_eq!(table.get_min_writer_version(), 2); - assert_eq!(table.get_min_reader_version(), 1); + assert_eq!(table.protocol().min_writer_version, 2); + assert_eq!(table.protocol().min_reader_version, 1); assert_eq!( table.get_files(), vec![ diff --git a/crates/deltalake-core/tests/read_delta_log_test.rs b/crates/deltalake-core/tests/read_delta_log_test.rs index 248f25c876..dc7e86ebe5 100644 --- a/crates/deltalake-core/tests/read_delta_log_test.rs +++ b/crates/deltalake-core/tests/read_delta_log_test.rs @@ -153,8 +153,8 @@ async fn test_read_liquid_table() -> DeltaResult<()> { #[tokio::test] async fn test_read_table_features() -> DeltaResult<()> { let mut _table = deltalake_core::open_table("./tests/data/simple_table_features").await?; - let rf = _table.get_reader_features(); - let wf = _table.get_writer_features(); + let rf = _table.protocol().reader_features.clone(); + let wf = _table.protocol().writer_features.clone(); assert!(rf.is_some()); assert!(wf.is_some()); @@ -170,5 +170,5 @@ async fn read_delta_table_from_dlt() { .await .unwrap(); assert_eq!(table.version(), 1); - assert!(table.schema().is_some()); + assert!(table.get_schema().is_ok()); } diff --git a/crates/deltalake/examples/recordbatch-writer.rs b/crates/deltalake/examples/recordbatch-writer.rs index e7fd7125cd..67aac3b962 100644 --- a/crates/deltalake/examples/recordbatch-writer.rs +++ b/crates/deltalake/examples/recordbatch-writer.rs @@ -159,10 +159,10 @@ fn fetch_readings() -> Vec { */ fn convert_to_batch(table: &DeltaTable, records: &Vec) -> RecordBatch { let metadata = table - .get_metadata() + .metadata() .expect("Failed to get metadata for the table"); let arrow_schema = >::try_from( - &metadata.schema.clone(), + &metadata.schema().expect("failed to get schema"), ) .expect("Failed to convert to arrow schema"); let arrow_schema_ref = Arc::new(arrow_schema); diff --git a/python/src/lib.rs b/python/src/lib.rs index 9da42f1170..5741bd40d2 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -26,7 +26,7 @@ use deltalake::datafusion::datasource::provider::TableProvider; use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; -use deltalake::kernel::{Action, Add, Invariant, Metadata, Remove, StructType}; +use deltalake::kernel::{Action, Add, Invariant, Remove, StructType}; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; use deltalake::operations::delete::DeleteBuilder; use deltalake::operations::filesystem_check::FileSystemCheckBuilder; @@ -155,7 +155,7 @@ impl RawDeltaTable { } pub fn metadata(&self) -> PyResult { - let metadata = self._table.get_metadata().map_err(PythonError::from)?; + let metadata = self._table.metadata().map_err(PythonError::from)?; Ok(RawDeltaTableMetaData { id: metadata.id.clone(), name: metadata.name.clone(), @@ -168,8 +168,8 @@ impl RawDeltaTable { pub fn protocol_versions(&self) -> PyResult<(i32, i32)> { Ok(( - self._table.get_min_reader_version(), - self._table.get_min_writer_version(), + self._table.protocol().min_reader_version, + self._table.protocol().min_writer_version, )) } @@ -685,15 +685,15 @@ impl RawDeltaTable { ) -> PyResult<&'py PyFrozenSet> { let column_names: HashSet<&str> = self ._table - .schema() - .ok_or_else(|| DeltaProtocolError::new_err("table does not yet have a schema"))? + .get_schema() + .map_err(|_| DeltaProtocolError::new_err("table does not yet have a schema"))? .fields() .iter() .map(|field| field.name().as_str()) .collect(); let partition_columns: HashSet<&str> = self ._table - .get_metadata() + .metadata() .map_err(PythonError::from)? .partition_columns .iter() @@ -799,15 +799,11 @@ impl RawDeltaTable { // Update metadata with new schema if &schema != existing_schema { - let mut metadata = self - ._table - .get_metadata() - .map_err(PythonError::from)? - .clone(); - metadata.schema = schema; - let metadata_action = Metadata::try_from(metadata) - .map_err(|_| PyValueError::new_err("Failed to reparse metadata"))?; - actions.push(Action::Metadata(metadata_action)); + let mut metadata = self._table.metadata().map_err(PythonError::from)?.clone(); + metadata.schema_string = serde_json::to_string(&schema) + .map_err(DeltaTableError::from) + .map_err(PythonError::from)?; + actions.push(Action::Metadata(metadata)); } } _ => { @@ -1122,6 +1118,7 @@ impl From<&PyAddAction> for Add { deletion_vector: None, base_row_id: None, default_row_commit_version: None, + clustering_provider: None, } } }