diff --git a/rust/src/operations/optimize.rs b/rust/src/operations/optimize.rs
index 70c6a0429a..123d08913a 100644
--- a/rust/src/operations/optimize.rs
+++ b/rust/src/operations/optimize.rs
@@ -35,6 +35,7 @@ use arrow_schema::ArrowError;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{Future, StreamExt, TryStreamExt};
+use itertools::Itertools;
 use log::debug;
 use num_cpus;
 use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
@@ -794,6 +795,38 @@ fn build_zorder_plan(
     filters: &[PartitionFilter<'_, &str>],
     target_size: i64,
 ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
+    if zorder_columns.is_empty() {
+        return Err(DeltaTableError::Generic(
+            "Z-order requires at least one column".to_string(),
+        ));
+    }
+    let zorder_partition_cols = zorder_columns
+        .iter()
+        .filter(|col| partition_keys.contains(col))
+        .collect_vec();
+    if !zorder_partition_cols.is_empty() {
+        return Err(DeltaTableError::Generic(format!(
+            "Z-order columns cannot be partition columns. Found: {zorder_partition_cols:?}"
+        )));
+    }
+    let field_names = snapshot
+        .current_metadata()
+        .unwrap()
+        .schema
+        .get_fields()
+        .iter()
+        .map(|field| field.get_name().to_string())
+        .collect_vec();
+    let unknown_columns = zorder_columns
+        .iter()
+        .filter(|col| !field_names.contains(col))
+        .collect_vec();
+    if !unknown_columns.is_empty() {
+        return Err(DeltaTableError::Generic(
+            format!("Z-order columns must be present in the table schema. Unknown columns: {unknown_columns:?}"),
+        ));
+    }
+
     // For now, just be naive and optimize all files in each selected partition.
     let mut metrics = Metrics::default();
diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs
index 6137eb5fc7..2d6bf38246 100644
--- a/rust/tests/command_optimize.rs
+++ b/rust/tests/command_optimize.rs
@@ -6,11 +6,17 @@ use arrow::{
     datatypes::{DataType, Field},
     record_batch::RecordBatch,
 };
+use arrow_select::concat::concat_batches;
 use deltalake::action::{Action, Remove};
 use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics, OptimizeType};
 use deltalake::operations::DeltaOps;
+use deltalake::storage::ObjectStoreRef;
 use deltalake::writer::{DeltaWriter, RecordBatchWriter};
-use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, SchemaDataType, SchemaField};
+use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, Path, SchemaDataType, SchemaField};
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::ParquetObjectReader;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::file::properties::WriterProperties;
 use rand::prelude::*;
 use serde_json::json;
@@ -497,3 +503,197 @@ async fn test_commit_info() -> Result<(), Box<dyn Error>> {
     Ok(())
 }
+
+#[tokio::test]
+async fn test_zorder_rejects_zero_columns() -> Result<(), Box<dyn Error>> {
+    let context = setup_test(true).await?;
+    let dt = context.table;
+
+    // Rejects zero columns
+    let result = DeltaOps(dt)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec![]))
+        .await;
+    assert!(result.is_err());
+    assert!(result
+        .unwrap_err()
+        .to_string()
+        .contains("Z-order requires at least one column"));
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_zorder_rejects_nonexistent_columns() -> Result<(), Box<dyn Error>> {
+    let context = setup_test(true).await?;
+    let dt = context.table;
+
+    // Rejects non-existent columns
+    let result = DeltaOps(dt)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec!["non-existent".to_string()]))
+        .await;
+    assert!(result.is_err());
+    assert!(result.unwrap_err().to_string().contains(
+        "Z-order columns must be present in the table schema. Unknown columns: [\"non-existent\"]"
+    ));
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_zorder_rejects_partition_column() -> Result<(), Box<dyn Error>> {
+    let context = setup_test(true).await?;
+    let mut dt = context.table;
+    let mut writer = RecordBatchWriter::for_table(&dt)?;
+
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?,
+    )
+    .await?;
+    // Rejects partition columns
+    let result = DeltaOps(dt)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec!["date".to_string()]))
+        .await;
+    assert!(result.is_err());
+    assert!(result
+        .unwrap_err()
+        .to_string()
+        .contains("Z-order columns cannot be partition columns. Found: [\"date\"]"));
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_zorder_unpartitioned() -> Result<(), Box<dyn Error>> {
+    let context = setup_test(false).await?;
+    let mut dt = context.table;
+    let mut writer = RecordBatchWriter::for_table(&dt)?;
+
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(1, 1), (1, 2), (1, 2)], "1970-01-01")?,
+    )
+    .await?;
+
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(2, 1), (2, 2), (1, 2)], "1970-01-04")?,
+    )
+    .await?;
+
+    let optimize = DeltaOps(dt).optimize().with_type(OptimizeType::ZOrder(vec![
+        "date".to_string(),
+        "x".to_string(),
+        "y".to_string(),
+    ]));
+    let (dt, metrics) = optimize.await?;
+
+    assert_eq!(metrics.num_files_added, 1);
+    assert_eq!(metrics.num_files_removed, 2);
+    assert_eq!(metrics.total_files_skipped, 0);
+    assert_eq!(metrics.total_considered_files, 2);
+
+    // Check data
+    let files = dt.get_files();
+    assert_eq!(files.len(), 1);
+
+    let actual = read_parquet_file(&files[0], dt.object_store()).await?;
+    let expected = RecordBatch::try_new(
+        actual.schema(),
+        // Note that the order is not hierarchically sorted.
+        vec![
+            Arc::new(Int32Array::from(vec![1, 2, 1, 1, 1, 2])),
+            Arc::new(Int32Array::from(vec![1, 1, 2, 2, 2, 2])),
+            Arc::new(StringArray::from(vec![
+                "1970-01-01",
+                "1970-01-04",
+                "1970-01-01",
+                "1970-01-01",
+                "1970-01-04",
+                "1970-01-04",
+            ])),
+        ],
+    )?;
+
+    assert_eq!(actual, expected);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_zorder_partitioned() -> Result<(), Box<dyn Error>> {
+    let context = setup_test(true).await?;
+    let mut dt = context.table;
+    let mut writer = RecordBatchWriter::for_table(&dt)?;
+
+    // Write data in sorted order. Each value is a power of 2, so will affect
+    // a new bit in the z-ordering.
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(1, 1), (1, 2), (1, 4)], "2022-05-22")?,
+    )
+    .await?;
+
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(2, 1), (2, 2), (2, 4)], "2022-05-22")?,
+    )
+    .await?;
+
+    // This batch doesn't matter; we just use it to test partition filtering
+    write(
+        &mut writer,
+        &mut dt,
+        tuples_to_batch(vec![(1, 1), (1, 1), (1, 1)], "2022-05-23")?,
+    )
+    .await?;
+
+    let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
+
+    let optimize = DeltaOps(dt)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
+        .with_filters(&filter);
+    let (dt, metrics) = optimize.await?;
+
+    assert_eq!(metrics.num_files_added, 1);
+    assert_eq!(metrics.num_files_removed, 2);
+
+    // Check data
+    let files = dt.get_files_by_partitions(&filter)?;
+    assert_eq!(files.len(), 1);
+
+    let actual = read_parquet_file(&files[0], dt.object_store()).await?;
+    let expected = RecordBatch::try_new(
+        actual.schema(),
+        // Note that the order is not hierarchically sorted.
+        vec![
+            Arc::new(Int32Array::from(vec![1, 2, 1, 2, 1, 2])),
+            Arc::new(Int32Array::from(vec![1, 1, 2, 2, 4, 4])),
+        ],
+    )?;
+
+    assert_eq!(actual, expected);
+
+    Ok(())
+}
+
+async fn read_parquet_file(
+    path: &Path,
+    object_store: ObjectStoreRef,
+) -> Result<RecordBatch, Box<dyn Error>> {
+    let file = object_store.head(path).await?;
+    let file_reader = ParquetObjectReader::new(object_store, file);
+    let batches = ParquetRecordBatchStreamBuilder::new(file_reader)
+        .await?
+        .build()?
+        .try_collect::<Vec<_>>()
+        .await?;
+    Ok(concat_batches(&batches[0].schema(), &batches)?)
+}
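
Usage note for reviewers: here is a minimal sketch of how the Z-order path validated in build_zorder_plan is exercised from user code. It only chains builder calls that appear in this diff (DeltaOps, optimize, with_type(OptimizeType::ZOrder(..)), with_filters, PartitionFilter::try_from); the table path, the partition value, deltalake::open_table as the way to load a table, and the tokio runtime attribute are illustrative assumptions, not part of this change.

use deltalake::operations::optimize::OptimizeType;
use deltalake::operations::DeltaOps;
use deltalake::{DeltaTableError, PartitionFilter};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Hypothetical path to an existing Delta table partitioned by `date`.
    let table = deltalake::open_table("./data/my_table").await?;

    // Limit the rewrite to a single partition, then Z-order its data files by x and y.
    let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
    let (table, metrics) = DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
        .with_filters(&filter)
        .await?;

    println!(
        "files added: {}, files removed: {}, new version: {}",
        metrics.num_files_added,
        metrics.num_files_removed,
        table.version()
    );
    Ok(())
}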