diff --git a/Cargo.lock b/Cargo.lock index 074292193f..63421786f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -722,6 +722,8 @@ dependencies = [ "chrono", "criterion", "datafusion", + "datafusion-common", + "datafusion-expr", "dotenv", "dynamodb_lock", "errno", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 804d02f907..3b280e1a12 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -25,7 +25,7 @@ object_store = "0.5.0" once_cell = "1.12.0" parquet = { version = "22", features = ["async"], optional = true } parquet2 = { version = "0.16", optional = true } -parquet-format = "~4.0.0" +parquet-format = { version = "~4.0.0" } percent-encoding = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -44,17 +44,44 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true # Glue rusoto_glue = { version = "0.48", default-features = false, optional = true } +# Datafusion +datafusion = { version = "12", optional = true } +datafusion-expr = { version = "12", optional = true } +datafusion-common = { version = "12", optional = true } + # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } tempdir = { version = "0", optional = true } -[dependencies.datafusion] -version = "12" +[dependencies.dynamodb_lock] +path = "../dynamodb_lock" +version = "0" +default-features = false optional = true +[dev-dependencies] +criterion = "0" +dotenv = "*" +maplit = "1" +pretty_assertions = "1.2.1" +rand = "0.8" +serial_test = "0" +tempdir = "0" +tempfile = "3" +utime = "0.3" + +[build-dependencies] +glibc_version = "0" + [features] default = ["arrow", "parquet"] -datafusion-ext = ["datafusion"] +datafusion-ext = [ + "datafusion", + "datafusion-expr", + "datafusion-common", + "arrow", + "parquet", +] azure = ["object_store/azure"] gcs = ["object_store/gcp"] s3 = [ @@ -78,26 +105,6 @@ python = ["arrow/pyarrow"] # used only for integration testing integration_test = ["fs_extra", "tempdir"] -[build-dependencies] -glibc_version = "0" - -[dependencies.dynamodb_lock] -path = "../dynamodb_lock" -version = "0" -default-features = false -optional = true - -[dev-dependencies] -criterion = "0" -dotenv = "*" -maplit = "1" -pretty_assertions = "1.3.0" -rand = "0.8" -serial_test = "0" -tempdir = "0" -tempfile = "3" -utime = "0.3" - [[bench]] name = "read_checkpoint" harness = false diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index 7eae981550..8fd4fd6d5b 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -25,29 +25,27 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc; +use crate::action; +use crate::schema; +use crate::{DeltaTable, DeltaTableError}; + use arrow::array::ArrayRef; use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit}; use async_trait::async_trait; use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat}; -use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::{DataFusionError, Result as DataFusionResult}; +use datafusion::datasource::{listing::PartitionedFile, TableProvider, TableType}; use datafusion::execution::context::SessionState; -use datafusion::logical_plan::{combine_filters, Column, Expr}; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use 
datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics}; -use datafusion::scalar::ScalarValue; +use datafusion_common::scalar::ScalarValue; +use datafusion_common::{Column, DataFusionError, Result as DataFusionResult}; +use datafusion_expr::{combine_filters, Expr}; use object_store::{path::Path, ObjectMeta}; use url::Url; -use crate::action; -use crate::delta; -use crate::schema; -use crate::DeltaTableError; - impl From<DeltaTableError> for DataFusionError { fn from(err: DeltaTableError) -> Self { match err { @@ -60,7 +58,7 @@ impl From<DeltaTableError> for DataFusionError { } } -impl From<DataFusionError> for crate::DeltaTableError { +impl From<DataFusionError> for DeltaTableError { fn from(err: DataFusionError) -> Self { match err { DataFusionError::ArrowError(source) => DeltaTableError::Arrow { source }, @@ -72,7 +70,7 @@ impl From<DataFusionError> for crate::DeltaTableError { } } -impl delta::DeltaTable { +impl DeltaTable { /// Return statistics for Datafusion Table pub fn datafusion_table_statistics(&self) -> Statistics { let stats = self @@ -222,7 +220,7 @@ impl delta::DeltaTable { } } -impl PruningStatistics for delta::DeltaTable { +impl PruningStatistics for DeltaTable { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option<ArrayRef> { @@ -294,13 +292,11 @@ impl PruningStatistics for delta::DeltaTable { } #[async_trait] -impl TableProvider for delta::DeltaTable { +impl TableProvider for DeltaTable { fn schema(&self) -> Arc<ArrowSchema> { Arc::new( - <ArrowSchema as TryFrom<&schema::Schema>>::try_from( - delta::DeltaTable::schema(self).unwrap(), - ) - .unwrap(), + <ArrowSchema as TryFrom<&schema::Schema>>::try_from(DeltaTable::schema(self).unwrap()) + .unwrap(), ) } @@ -316,7 +312,7 @@ impl TableProvider for delta::DeltaTable { limit: Option<usize>, ) -> DataFusionResult<Arc<dyn ExecutionPlan>> { let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from( - delta::DeltaTable::schema(self).unwrap(), + DeltaTable::schema(self).unwrap(), )?); // each delta table must register a specific object store, since paths are internally @@ -421,7 +417,7 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> P } } -fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar::ScalarValue> { +fn to_scalar_value(stat_val: &serde_json::Value) -> Option<ScalarValue> { match stat_val { serde_json::Value::Bool(val) => Some(ScalarValue::from(*val)), serde_json::Value::Number(num) => { @@ -443,7 +439,7 @@ fn to_scalar_value(stat_val: &serde_json::Value) -> Option<datafusion::scalar:: fn to_correct_scalar_value( stat_val: &serde_json::Value, field_dt: &ArrowDataType, -) -> Option<datafusion::scalar::ScalarValue> { +) -> Option<ScalarValue> { match stat_val { serde_json::Value::Array(_) => None, serde_json::Value::Object(_) => None, @@ -485,10 +481,7 @@ fn to_correct_scalar_value( } } -fn correct_scalar_value_type( - value: datafusion::scalar::ScalarValue, - field_dt: &ArrowDataType, -) -> Option<datafusion::scalar::ScalarValue> { +fn correct_scalar_value_type(value: ScalarValue, field_dt: &ArrowDataType) -> Option<ScalarValue> { match field_dt { ArrowDataType::Int64 => { let raw_value = i64::try_from(value).ok()?; @@ -565,10 +558,7 @@ fn correct_scalar_value_type( } } -fn left_larger_than_right( - left: datafusion::scalar::ScalarValue, - right: datafusion::scalar::ScalarValue, -) -> Option<bool> { +fn left_larger_than_right(left: ScalarValue, right: ScalarValue) -> Option<bool> { match left { ScalarValue::Float64(Some(v)) => { let f_right = f64::try_from(right).ok()?; diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index ca5e3e6a54..473e209908 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -1,5 +1,7 @@ //!
Command for creating a new delta table // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +use std::sync::Arc; + use super::{ to_datafusion_err, transaction::{serialize_actions, OPERATION_SCHEMA}, @@ -9,11 +11,11 @@ use crate::{ action::{Action, DeltaOperation, MetaData, Protocol, SaveMode}, DeltaTableBuilder, DeltaTableMetaData, }; + +use arrow::datatypes::SchemaRef; use async_trait::async_trait; use core::any::Any; use datafusion::{ - arrow::datatypes::SchemaRef, - error::{DataFusionError, Result as DataFusionResult}, execution::context::TaskContext, physical_plan::{ common::{compute_record_batch_statistics, SizedRecordBatchStream}, @@ -23,8 +25,8 @@ use datafusion::{ Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, }; +use datafusion_common::{DataFusionError, Result as DataFusionResult}; use futures::{TryFutureExt, TryStreamExt}; -use std::sync::Arc; /// Command for creating new delta table pub struct CreateCommand { diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 2c9e25d9d1..0a943be7e3 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -1,6 +1,11 @@ //! High level delta commands that can be executed against a delta table // TODO // - rename to delta operations +use std::collections::HashMap; +use std::convert::TryFrom; +use std::fmt::Debug; +use std::sync::Arc; + use crate::{ action::{DeltaOperation, Protocol, SaveMode}, builder::DeltaTableBuilder, @@ -9,16 +14,13 @@ use crate::{ writer::{record_batch::divide_by_partition_values, utils::PartitionPath}, DeltaTable, DeltaTableError, DeltaTableMetaData, }; + use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError, record_batch::RecordBatch}; use datafusion::{ - error::DataFusionError, physical_plan::{collect, memory::MemoryExec, ExecutionPlan}, prelude::SessionContext, }; -use std::collections::HashMap; -use std::convert::TryFrom; -use std::fmt::Debug; -use std::sync::Arc; +use datafusion_common::DataFusionError; pub mod create; pub mod transaction; diff --git a/rust/src/operations/transaction.rs b/rust/src/operations/transaction.rs index b1b183eac9..8071cbd65c 100644 --- a/rust/src/operations/transaction.rs +++ b/rust/src/operations/transaction.rs @@ -1,17 +1,17 @@ //! Wrapper Execution plan to handle distributed operations +use core::any::Any; +use std::sync::Arc; + use super::*; use crate::action::Action; use crate::schema::DeltaDataTypeVersion; -use core::any::Any; + +use arrow::array::StringArray; +use arrow::datatypes::{ + DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use arrow::record_batch::RecordBatch; use datafusion::{ - arrow::{ - array::StringArray, - datatypes::{ - DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, - }, - record_batch::RecordBatch, - }, - error::Result as DataFusionResult, execution::context::TaskContext, physical_plan::{ coalesce_partitions::CoalescePartitionsExec, common::compute_record_batch_statistics, @@ -19,9 +19,9 @@ use datafusion::{ Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, }; +use datafusion_common::Result as DataFusionResult; use futures::{TryFutureExt, TryStreamExt}; use lazy_static::lazy_static; -use std::sync::Arc; lazy_static! 
{ /// Schema expected for plans wrapped by transaction diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index c1355f1528..a4673f2a40 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -16,20 +16,22 @@ //! replace data that matches a predicate. // https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala +use core::any::Any; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + use super::{ create::CreateCommand, transaction::{serialize_actions, OPERATION_SCHEMA}, *, }; -use crate::{ - action::{Action, Add, Remove, SaveMode}, - writer::{DeltaWriter, RecordBatchWriter}, - Schema, -}; -use core::any::Any; +use crate::action::{Action, Add, Remove, SaveMode}; +use crate::writer::{DeltaWriter, RecordBatchWriter}; +use crate::Schema; + +use arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::{ - arrow::datatypes::SchemaRef as ArrowSchemaRef, - error::Result as DataFusionResult, execution::context::TaskContext, physical_plan::{ common::{ @@ -42,10 +44,8 @@ use datafusion::{ SendableRecordBatchStream, Statistics, }, }; +use datafusion_common::Result as DataFusionResult; use futures::{TryFutureExt, TryStreamExt}; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; const MAX_SUPPORTED_WRITER_VERSION: i32 = 1; diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 81145bf76b..d1522ebe47 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -154,6 +154,15 @@ impl DeltaObjectStore { } Ok(()) } + + /// Check if the location is a delta table location + pub async fn is_delta_table_location(&self) -> ObjectStoreResult<bool> { + match self.head(self.log_path()).await { + Ok(_) => Ok(true), + Err(ObjectStoreError::NotFound { .. }) => Ok(false), + Err(err) => Err(err), + } + } } #[async_trait::async_trait] diff --git a/rust/src/storage/utils.rs b/rust/src/storage/utils.rs index 87e39ff178..b352149608 100644 --- a/rust/src/storage/utils.rs +++ b/rust/src/storage/utils.rs @@ -4,8 +4,10 @@ use std::collections::HashMap; use crate::builder::DeltaTableBuilder; use crate::DeltaTableError; -use futures::StreamExt; -use object_store::ObjectStore; +use futures::{StreamExt, TryStreamExt}; +use object_store::path::Path; +use object_store::{DynObjectStore, Result as ObjectStoreResult}; +use std::sync::Arc; /// Copies the contents from the `from` location into the `to` location pub async fn copy_table( @@ -20,7 +22,14 @@ let to_store = DeltaTableBuilder::from_uri(to) .with_storage_options(to_options.unwrap_or_default()) .build_storage()?; + sync_stores(from_store, to_store).await +} +/// Synchronize the contents of two object stores +pub async fn sync_stores( + from_store: Arc<DynObjectStore>, + to_store: Arc<DynObjectStore>, +) -> Result<(), DeltaTableError> { // TODO if a table is copied within the same root store (i.e bucket), using copy would be MUCH more efficient let mut meta_stream = from_store.list(None).await?; while let Some(file) = meta_stream.next().await { if let Ok(meta) = file { let bytes = from_store.get(&meta.location).await?.bytes().await?; to_store.put(&meta.location, bytes).await?; } } - Ok(()) } + +/// Collect list stream +pub async fn flatten_list_stream( + storage: &DynObjectStore, + prefix: Option<&Path>, +) -> ObjectStoreResult<Vec<Path>> { + storage + .list(prefix) + .await?
+ .map_ok(|meta| meta.location) + .try_collect::>() + .await +} diff --git a/rust/tests/command_optimize.rs b/rust/tests/command_optimize.rs index 5ec3e97432..b48df4d25a 100644 --- a/rust/tests/command_optimize.rs +++ b/rust/tests/command_optimize.rs @@ -1,504 +1,503 @@ -#[cfg(all(feature = "arrow", feature = "parquet"))] -mod optimize { - use arrow::datatypes::Schema as ArrowSchema; - use arrow::{ - array::{Int32Array, StringArray}, - datatypes::{DataType, Field}, - record_batch::RecordBatch, - }; - use deltalake::optimize::{MetricDetails, Metrics}; - use deltalake::DeltaTableError; - use deltalake::{ - action, - action::Remove, - builder::DeltaTableBuilder, - optimize::{create_merge_plan, Optimize}, - writer::{DeltaWriter, RecordBatchWriter}, - DeltaTableMetaData, PartitionFilter, - }; - use deltalake::{DeltaTable, Schema, SchemaDataType, SchemaField}; - use rand::prelude::*; - use serde_json::{json, Map, Value}; - use std::time::SystemTime; - use std::time::UNIX_EPOCH; - use std::{collections::HashMap, error::Error, sync::Arc}; - use tempdir::TempDir; - - struct Context { - pub tmp_dir: TempDir, - pub table: DeltaTable, - } +#![cfg(all(feature = "arrow", feature = "parquet"))] + +use arrow::datatypes::Schema as ArrowSchema; +use arrow::{ + array::{Int32Array, StringArray}, + datatypes::{DataType, Field}, + record_batch::RecordBatch, +}; +use deltalake::optimize::{MetricDetails, Metrics}; +use deltalake::DeltaTableError; +use deltalake::{ + action, + action::Remove, + builder::DeltaTableBuilder, + optimize::{create_merge_plan, Optimize}, + writer::{DeltaWriter, RecordBatchWriter}, + DeltaTableMetaData, PartitionFilter, +}; +use deltalake::{DeltaTable, Schema, SchemaDataType, SchemaField}; +use rand::prelude::*; +use serde_json::{json, Map, Value}; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; +use std::{collections::HashMap, error::Error, sync::Arc}; +use tempdir::TempDir; + +struct Context { + pub tmp_dir: TempDir, + pub table: DeltaTable, +} - async fn setup_test(partitioned: bool) -> Result> { - let schema = Schema::new(vec![ - SchemaField::new( - "x".to_owned(), - SchemaDataType::primitive("integer".to_owned()), - false, - HashMap::new(), - ), - SchemaField::new( - "y".to_owned(), - SchemaDataType::primitive("integer".to_owned()), - false, - HashMap::new(), - ), - SchemaField::new( - "date".to_owned(), - SchemaDataType::primitive("string".to_owned()), - false, - HashMap::new(), - ), - ]); - - let p = if partitioned { - vec!["date".to_owned()] - } else { - vec![] - }; - - let table_meta = DeltaTableMetaData::new( - Some("opt_table".to_owned()), - Some("Table for optimize tests".to_owned()), - None, - schema.clone(), - p, +async fn setup_test(partitioned: bool) -> Result> { + let schema = Schema::new(vec![ + SchemaField::new( + "x".to_owned(), + SchemaDataType::primitive("integer".to_owned()), + false, HashMap::new(), - ); - - let tmp_dir = tempdir::TempDir::new("opt_table").unwrap(); - let p = tmp_dir.path().to_str().to_owned().unwrap(); - let mut dt = DeltaTableBuilder::from_uri(p).build()?; - - let mut commit_info = Map::::new(); - - let protocol = action::Protocol { - min_reader_version: 1, - min_writer_version: 2, - }; - - commit_info.insert( - "operation".to_string(), - serde_json::Value::String("CREATE TABLE".to_string()), - ); - let _res = dt - .create( - table_meta.clone(), - protocol.clone(), - Some(commit_info), - None, - ) - .await?; - - Ok(Context { tmp_dir, table: dt }) - } - - fn generate_random_batch>( - rows: usize, - partition: T, - ) -> Result> { - let mut 
x_vec: Vec = Vec::with_capacity(rows); - let mut y_vec: Vec = Vec::with_capacity(rows); - let mut date_vec = Vec::with_capacity(rows); - let mut rng = rand::thread_rng(); - let s = partition.into(); - - for _ in 0..rows { - x_vec.push(rng.gen()); - y_vec.push(rng.gen()); - date_vec.push(s.clone()); - } - - let x_array = Int32Array::from(x_vec); - let y_array = Int32Array::from(y_vec); - let date_array = StringArray::from(date_vec); - - Ok(RecordBatch::try_new( - Arc::new(ArrowSchema::new(vec![ - Field::new("x", DataType::Int32, false), - Field::new("y", DataType::Int32, false), - Field::new("date", DataType::Utf8, false), - ])), - vec![Arc::new(x_array), Arc::new(y_array), Arc::new(date_array)], - )?) - } - - fn tuples_to_batch>( - tuples: Vec<(i32, i32)>, - partition: T, - ) -> Result> { - let mut x_vec: Vec = Vec::new(); - let mut y_vec: Vec = Vec::new(); - let mut date_vec = Vec::new(); - let s = partition.into(); - - for t in tuples { - x_vec.push(t.0); - y_vec.push(t.1); - date_vec.push(s.clone()); - } - - let x_array = Int32Array::from(x_vec); - let y_array = Int32Array::from(y_vec); - let date_array = StringArray::from(date_vec); - - Ok(RecordBatch::try_new( - Arc::new(ArrowSchema::new(vec![ - Field::new("x", DataType::Int32, false), - Field::new("y", DataType::Int32, false), - Field::new("date", DataType::Utf8, false), - ])), - vec![Arc::new(x_array), Arc::new(y_array), Arc::new(date_array)], - )?) - } - - fn records_for_size(size: usize) -> usize { - //12 bytes to account of overhead - size / 12 - } - - #[tokio::test] - async fn test_optimize_non_partitioned_table() -> Result<(), Box> { - let context = setup_test(false).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; - - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, - ) - .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(2, 1), (2, 3), (2, 3)], "2022-05-23")?, - ) - .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(3, 1), (3, 3), (3, 3)], "2022-05-22")?, - ) - .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(4, 1), (4, 3), (4, 3)], "2022-05-23")?, - ) - .await?; - write( - &mut writer, - &mut dt, - generate_random_batch(records_for_size(4_000_000), "2022-05-22")?, - ) - .await?; - - let version = dt.version(); - assert_eq!(dt.get_state().files().len(), 5); - - let optimize = Optimize::default().target_size(2_000_000); - let metrics = optimize.execute(&mut dt).await?; - - assert_eq!(version + 1, dt.version()); - assert_eq!(metrics.num_files_added, 1); - assert_eq!(metrics.num_files_removed, 4); - assert_eq!(metrics.total_considered_files, 5); - assert_eq!(metrics.partitions_optimized, 1); - assert_eq!(dt.get_state().files().len(), 2); - - Ok(()) - } - - async fn write( - writer: &mut RecordBatchWriter, - mut table: &mut DeltaTable, - batch: RecordBatch, - ) -> Result<(), DeltaTableError> { - writer.write(batch).await?; - writer.flush_and_commit(&mut table).await?; - Ok(()) - } - - #[tokio::test] - async fn test_optimize_with_partitions() -> Result<(), Box> { - let context = setup_test(true).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; + ), + SchemaField::new( + "y".to_owned(), + SchemaDataType::primitive("integer".to_owned()), + false, + HashMap::new(), + ), + SchemaField::new( + "date".to_owned(), + SchemaDataType::primitive("string".to_owned()), + false, + HashMap::new(), + ), + ]); - write( - &mut writer, - &mut dt, - 
tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, - ) - .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(2, 1), (2, 3), (2, 3)], "2022-05-23")?, - ) - .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(3, 1), (3, 3), (3, 3)], "2022-05-22")?, - ) - .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(4, 1), (4, 3), (4, 3)], "2022-05-23")?, - ) - .await?; + let p = if partitioned { + vec!["date".to_owned()] + } else { + vec![] + }; - let version = dt.version(); - let mut filter = vec![]; - filter.push(PartitionFilter::try_from(("date", "=", "2022-05-22"))?); + let table_meta = DeltaTableMetaData::new( + Some("opt_table".to_owned()), + Some("Table for optimize tests".to_owned()), + None, + schema.clone(), + p, + HashMap::new(), + ); - let optimize = Optimize::default().filter(&filter); - let metrics = optimize.execute(&mut dt).await?; + let tmp_dir = tempdir::TempDir::new("opt_table").unwrap(); + let p = tmp_dir.path().to_str().to_owned().unwrap(); + let mut dt = DeltaTableBuilder::from_uri(p).build()?; - assert_eq!(version + 1, dt.version()); - assert_eq!(metrics.num_files_added, 1); - assert_eq!(metrics.num_files_removed, 2); - assert_eq!(dt.get_state().files().len(), 3); + let mut commit_info = Map::::new(); - Ok(()) - } + let protocol = action::Protocol { + min_reader_version: 1, + min_writer_version: 2, + }; - #[tokio::test] - #[ignore] - /// Validate that optimize fails when a remove action occurs - async fn test_conflict_for_remove_actions() -> Result<(), Box> { - let context = setup_test(true).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; - - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, + commit_info.insert( + "operation".to_string(), + serde_json::Value::String("CREATE TABLE".to_string()), + ); + let _res = dt + .create( + table_meta.clone(), + protocol.clone(), + Some(commit_info), + None, ) .await?; - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(2, 2), (2, 3), (2, 4)], "2022-05-22")?, - ) - .await?; + Ok(Context { tmp_dir, table: dt }) +} - let version = dt.version(); - - //create the merge plan, remove a file, and execute the plan. 
- let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; - let plan = create_merge_plan(&mut dt, &filter, None)?; - - let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); - let mut other_dt = deltalake::open_table(uri).await?; - let add = &other_dt.get_state().files()[0]; - let remove = Remove { - path: add.path.clone(), - deletion_timestamp: Some( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as i64, - ), - data_change: true, - extended_file_metadata: None, - size: Some(add.size), - partition_values: Some(add.partition_values.clone()), - tags: Some(HashMap::new()), - }; - - let mut transaction = other_dt.create_transaction(None); - transaction.add_action(action::Action::remove(remove)); - transaction.commit(None, None).await?; - - let maybe_metrics = plan.execute(&mut dt).await; - assert!(maybe_metrics.is_err()); - assert_eq!(dt.version(), version + 1); - Ok(()) +fn generate_random_batch>( + rows: usize, + partition: T, +) -> Result> { + let mut x_vec: Vec = Vec::with_capacity(rows); + let mut y_vec: Vec = Vec::with_capacity(rows); + let mut date_vec = Vec::with_capacity(rows); + let mut rng = rand::thread_rng(); + let s = partition.into(); + + for _ in 0..rows { + x_vec.push(rng.gen()); + y_vec.push(rng.gen()); + date_vec.push(s.clone()); } - #[tokio::test] - /// Validate that optimize succeeds when only add actions occur for a optimized partition - async fn test_no_conflict_for_append_actions() -> Result<(), Box> { - let context = setup_test(true).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; - - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, - ) - .await?; - - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(2, 2), (2, 3), (2, 4)], "2022-05-22")?, - ) - .await?; - - let version = dt.version(); - - //create the merge plan, remove a file, and execute the plan. - let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; - let plan = create_merge_plan(&mut dt, &filter, None)?; - - let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); - let mut other_dt = deltalake::open_table(uri).await?; - let mut writer = RecordBatchWriter::for_table(&other_dt)?; - write( - &mut writer, - &mut other_dt, - tuples_to_batch(vec![(3, 2), (3, 3), (3, 4)], "2022-05-22")?, - ) - .await?; + let x_array = Int32Array::from(x_vec); + let y_array = Int32Array::from(y_vec); + let date_array = StringArray::from(date_vec); + + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + Field::new("date", DataType::Utf8, false), + ])), + vec![Arc::new(x_array), Arc::new(y_array), Arc::new(date_array)], + )?) +} - let metrics = plan.execute(&mut dt).await?; - assert_eq!(metrics.num_files_added, 1); - assert_eq!(metrics.num_files_removed, 2); - assert_eq!(dt.version(), version + 2); - Ok(()) +fn tuples_to_batch>( + tuples: Vec<(i32, i32)>, + partition: T, +) -> Result> { + let mut x_vec: Vec = Vec::new(); + let mut y_vec: Vec = Vec::new(); + let mut date_vec = Vec::new(); + let s = partition.into(); + + for t in tuples { + x_vec.push(t.0); + y_vec.push(t.1); + date_vec.push(s.clone()); } - #[tokio::test] - #[ignore] - /// Validate that bin packing is idempotent. - async fn test_idempotent() -> Result<(), Box> { - //TODO: Compression makes it hard to get the target file size... 
- //Maybe just commit files with a known size - let context = setup_test(true).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; - - write( - &mut writer, - &mut dt, - generate_random_batch(records_for_size(6_000_000), "2022-05-22")?, - ) - .await?; - write( - &mut writer, - &mut dt, - generate_random_batch(records_for_size(9_000_000), "2022-05-22")?, - ) - .await?; - write( - &mut writer, - &mut dt, - generate_random_batch(records_for_size(2_000_000), "2022-05-22")?, - ) - .await?; - - let version = dt.version(); - - let mut filter = vec![]; - filter.push(PartitionFilter::try_from(("date", "=", "2022-05-22"))?); - - let optimize = Optimize::default().filter(&filter).target_size(10_000_000); - let metrics = optimize.execute(&mut dt).await?; - assert_eq!(metrics.num_files_added, 1); - assert_eq!(metrics.num_files_removed, 2); - assert_eq!(dt.version(), version + 1); - - let metrics = optimize.execute(&mut dt).await?; - - assert_eq!(metrics.num_files_added, 0); - assert_eq!(metrics.num_files_removed, 0); - assert_eq!(dt.version(), version + 1); - - Ok(()) - } + let x_array = Int32Array::from(x_vec); + let y_array = Int32Array::from(y_vec); + let date_array = StringArray::from(date_vec); + + Ok(RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + Field::new("date", DataType::Utf8, false), + ])), + vec![Arc::new(x_array), Arc::new(y_array), Arc::new(date_array)], + )?) +} - #[tokio::test] - /// Validate Metrics when no files are optimized - async fn test_idempotent_metrics() -> Result<(), Box> { - let context = setup_test(true).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; - - write( - &mut writer, - &mut dt, - generate_random_batch(records_for_size(1_000_000), "2022-05-22")?, - ) - .await?; +fn records_for_size(size: usize) -> usize { + //12 bytes to account of overhead + size / 12 +} - let version = dt.version(); - let optimize = Optimize::default().target_size(10_000_000); - let metrics = optimize.execute(&mut dt).await?; - - let mut expected_metric_details = MetricDetails::default(); - expected_metric_details.min = 0; - expected_metric_details.max = 0; - expected_metric_details.avg = 0.0; - expected_metric_details.total_files = 0; - expected_metric_details.total_size = 0; - - let mut expected = Metrics::default(); - expected.num_files_added = 0; - expected.num_files_removed = 0; - expected.partitions_optimized = 0; - expected.num_batches = 0; - expected.total_considered_files = 1; - expected.total_files_skipped = 1; - expected.preserve_insertion_order = true; - expected.files_added = expected_metric_details.clone(); - expected.files_removed = expected_metric_details.clone(); - - assert_eq!(expected, metrics); - assert_eq!(version, dt.version()); - Ok(()) - } +#[tokio::test] +async fn test_optimize_non_partitioned_table() -> Result<(), Box> { + let context = setup_test(false).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(2, 1), (2, 3), (2, 3)], "2022-05-23")?, + ) + .await?; + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(3, 1), (3, 3), (3, 3)], "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(4, 1), (4, 3), (4, 3)], "2022-05-23")?, + ) + 
.await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(4_000_000), "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + assert_eq!(dt.get_state().files().len(), 5); + + let optimize = Optimize::default().target_size(2_000_000); + let metrics = optimize.execute(&mut dt).await?; + + assert_eq!(version + 1, dt.version()); + assert_eq!(metrics.num_files_added, 1); + assert_eq!(metrics.num_files_removed, 4); + assert_eq!(metrics.total_considered_files, 5); + assert_eq!(metrics.partitions_optimized, 1); + assert_eq!(dt.get_state().files().len(), 2); + + Ok(()) +} - #[tokio::test] - /// Validate operation data and metadata was written - async fn test_commit_info() -> Result<(), Box> { - let context = setup_test(true).await?; - let mut dt = context.table; - let mut writer = RecordBatchWriter::for_table(&dt)?; - - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, - ) - .await?; +async fn write( + writer: &mut RecordBatchWriter, + mut table: &mut DeltaTable, + batch: RecordBatch, +) -> Result<(), DeltaTableError> { + writer.write(batch).await?; + writer.flush_and_commit(&mut table).await?; + Ok(()) +} - write( - &mut writer, - &mut dt, - tuples_to_batch(vec![(2, 2), (2, 3), (2, 4)], "2022-05-22")?, - ) - .await?; +#[tokio::test] +async fn test_optimize_with_partitions() -> Result<(), Box> { + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(2, 1), (2, 3), (2, 3)], "2022-05-23")?, + ) + .await?; + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(3, 1), (3, 3), (3, 3)], "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(4, 1), (4, 3), (4, 3)], "2022-05-23")?, + ) + .await?; + + let version = dt.version(); + let mut filter = vec![]; + filter.push(PartitionFilter::try_from(("date", "=", "2022-05-22"))?); + + let optimize = Optimize::default().filter(&filter); + let metrics = optimize.execute(&mut dt).await?; + + assert_eq!(version + 1, dt.version()); + assert_eq!(metrics.num_files_added, 1); + assert_eq!(metrics.num_files_removed, 2); + assert_eq!(dt.get_state().files().len(), 3); + + Ok(()) +} - let version = dt.version(); +#[tokio::test] +#[ignore] +/// Validate that optimize fails when a remove action occurs +async fn test_conflict_for_remove_actions() -> Result<(), Box> { + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, + ) + .await?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(2, 2), (2, 3), (2, 4)], "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + + //create the merge plan, remove a file, and execute the plan. 
+ let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; + let plan = create_merge_plan(&mut dt, &filter, None)?; + + let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); + let mut other_dt = deltalake::open_table(uri).await?; + let add = &other_dt.get_state().files()[0]; + let remove = Remove { + path: add.path.clone(), + deletion_timestamp: Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64, + ), + data_change: true, + extended_file_metadata: None, + size: Some(add.size), + partition_values: Some(add.partition_values.clone()), + tags: Some(HashMap::new()), + }; - let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; + let mut transaction = other_dt.create_transaction(None); + transaction.add_action(action::Action::remove(remove)); + transaction.commit(None, None).await?; - let optimize = Optimize::default().target_size(2_000_000).filter(&filter); - let metrics = optimize.execute(&mut dt).await?; + let maybe_metrics = plan.execute(&mut dt).await; + assert!(maybe_metrics.is_err()); + assert_eq!(dt.version(), version + 1); + Ok(()) +} - let commit_info = dt.history(None).await?; - let last_commit = &commit_info[commit_info.len() - 1]; +#[tokio::test] +/// Validate that optimize succeeds when only add actions occur for a optimized partition +async fn test_no_conflict_for_append_actions() -> Result<(), Box> { + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, + ) + .await?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(2, 2), (2, 3), (2, 4)], "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + + //create the merge plan, remove a file, and execute the plan. + let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; + let plan = create_merge_plan(&mut dt, &filter, None)?; + + let uri = context.tmp_dir.path().to_str().to_owned().unwrap(); + let mut other_dt = deltalake::open_table(uri).await?; + let mut writer = RecordBatchWriter::for_table(&other_dt)?; + write( + &mut writer, + &mut other_dt, + tuples_to_batch(vec![(3, 2), (3, 3), (3, 4)], "2022-05-22")?, + ) + .await?; + + let metrics = plan.execute(&mut dt).await?; + assert_eq!(metrics.num_files_added, 1); + assert_eq!(metrics.num_files_removed, 2); + assert_eq!(dt.version(), version + 2); + Ok(()) +} - let commit_metrics = - serde_json::from_value::(last_commit["operationMetrics"].clone())?; +#[tokio::test] +#[ignore] +/// Validate that bin packing is idempotent. +async fn test_idempotent() -> Result<(), Box> { + //TODO: Compression makes it hard to get the target file size... 
+ //Maybe just commit files with a known size + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(6_000_000), "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(9_000_000), "2022-05-22")?, + ) + .await?; + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(2_000_000), "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + + let mut filter = vec![]; + filter.push(PartitionFilter::try_from(("date", "=", "2022-05-22"))?); + + let optimize = Optimize::default().filter(&filter).target_size(10_000_000); + let metrics = optimize.execute(&mut dt).await?; + assert_eq!(metrics.num_files_added, 1); + assert_eq!(metrics.num_files_removed, 2); + assert_eq!(dt.version(), version + 1); + + let metrics = optimize.execute(&mut dt).await?; + + assert_eq!(metrics.num_files_added, 0); + assert_eq!(metrics.num_files_removed, 0); + assert_eq!(dt.version(), version + 1); + + Ok(()) +} - assert_eq!(commit_metrics, metrics); - assert_eq!(last_commit["readVersion"], json!(version)); - assert_eq!( - last_commit["operationParameters"]["targetSize"], - json!(2_000_000) - ); - // TODO: Requires a string representation for PartitionFilter - assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null); +#[tokio::test] +/// Validate Metrics when no files are optimized +async fn test_idempotent_metrics() -> Result<(), Box> { + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + generate_random_batch(records_for_size(1_000_000), "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + let optimize = Optimize::default().target_size(10_000_000); + let metrics = optimize.execute(&mut dt).await?; + + let mut expected_metric_details = MetricDetails::default(); + expected_metric_details.min = 0; + expected_metric_details.max = 0; + expected_metric_details.avg = 0.0; + expected_metric_details.total_files = 0; + expected_metric_details.total_size = 0; + + let mut expected = Metrics::default(); + expected.num_files_added = 0; + expected.num_files_removed = 0; + expected.partitions_optimized = 0; + expected.num_batches = 0; + expected.total_considered_files = 1; + expected.total_files_skipped = 1; + expected.preserve_insertion_order = true; + expected.files_added = expected_metric_details.clone(); + expected.files_removed = expected_metric_details.clone(); + + assert_eq!(expected, metrics); + assert_eq!(version, dt.version()); + Ok(()) +} - Ok(()) - } +#[tokio::test] +/// Validate operation data and metadata was written +async fn test_commit_info() -> Result<(), Box> { + let context = setup_test(true).await?; + let mut dt = context.table; + let mut writer = RecordBatchWriter::for_table(&dt)?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?, + ) + .await?; + + write( + &mut writer, + &mut dt, + tuples_to_batch(vec![(2, 2), (2, 3), (2, 4)], "2022-05-22")?, + ) + .await?; + + let version = dt.version(); + + let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; + + let optimize = Optimize::default().target_size(2_000_000).filter(&filter); + let metrics = optimize.execute(&mut dt).await?; + + let commit_info = dt.history(None).await?; + let last_commit = &commit_info[commit_info.len() - 1]; + + let 
commit_metrics = serde_json::from_value::<Metrics>(last_commit["operationMetrics"].clone())?; + + assert_eq!(commit_metrics, metrics); + assert_eq!(last_commit["readVersion"], json!(version)); + assert_eq!( + last_commit["operationParameters"]["targetSize"], + json!(2_000_000) + ); + // TODO: Requires a string representation for PartitionFilter + assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null); + + Ok(()) } diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 554eb41793..270262a36e 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -1,26 +1,23 @@ #![cfg(feature = "datafusion-ext")] -use arrow::{ - array::*, - datatypes::{ - DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, - SchemaRef as ArrowSchemaRef, - }, - record_batch::RecordBatch, -}; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use deltalake::action::SaveMode; +use deltalake::operations::DeltaCommands; +use deltalake::{DeltaTable, DeltaTableMetaData, Schema}; + +use arrow::array::*; +use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; +use arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; -use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::{SessionContext, TaskContext}; -use datafusion::logical_expr::Expr; -use datafusion::logical_plan::Column; -use datafusion::physical_plan::{ - coalesce_partitions::CoalescePartitionsExec, common, file_format::ParquetExec, metrics::Label, - visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor, -}; -use datafusion::scalar::ScalarValue; -use deltalake::{action::SaveMode, operations::DeltaCommands, DeltaTable, DeltaTableMetaData}; -use std::collections::HashMap; -use std::{collections::HashSet, sync::Arc}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::{common, file_format::ParquetExec, metrics::Label}; +use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; +use datafusion_common::scalar::ScalarValue; +use datafusion_common::{Column, DataFusionError, Result}; +use datafusion_expr::Expr; fn get_scanned_files(node: &dyn ExecutionPlan) -> HashSet<Label>