test zorder
wjones127 committed Jun 3, 2023
1 parent e882d50 commit 859356b
Showing 2 changed files with 234 additions and 1 deletion.
33 changes: 33 additions & 0 deletions rust/src/operations/optimize.rs
@@ -35,6 +35,7 @@ use arrow_schema::ArrowError;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{Future, StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use num_cpus;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
@@ -794,6 +795,38 @@ fn build_zorder_plan(
    filters: &[PartitionFilter<'_, &str>],
    target_size: i64,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
    if zorder_columns.is_empty() {
        return Err(DeltaTableError::Generic(
            "Z-order requires at least one column".to_string(),
        ));
    }
    let zorder_partition_cols = zorder_columns
        .iter()
        .filter(|col| partition_keys.contains(col))
        .collect_vec();
    if !zorder_partition_cols.is_empty() {
        return Err(DeltaTableError::Generic(format!(
            "Z-order columns cannot be partition columns. Found: {zorder_partition_cols:?}"
        )));
    }
    let field_names = snapshot
        .current_metadata()
        .unwrap()
        .schema
        .get_fields()
        .iter()
        .map(|field| field.get_name().to_string())
        .collect_vec();
    let unknown_columns = zorder_columns
        .iter()
        .filter(|col| !field_names.contains(col))
        .collect_vec();
    if !unknown_columns.is_empty() {
        return Err(DeltaTableError::Generic(
            format!("Z-order columns must be present in the table schema. Unknown columns: {unknown_columns:?}"),
        ));
    }

    // For now, just be naive and optimize all files in each selected partition.
    let mut metrics = Metrics::default();

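For context, a minimal usage sketch (not part of this commit) of how the new checks surface through the public API. The table path and the column names x and y are placeholders; any violation of the rules above comes back as a DeltaTableError::Generic when the optimize builder is awaited.

use deltalake::operations::optimize::OptimizeType;
use deltalake::operations::DeltaOps;

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    // Placeholder path to an existing Delta table.
    let table = deltalake::open_table("./data/my_table").await?;

    // Z-order by two non-partition columns; build_zorder_plan validates the
    // column list (non-empty, no partition columns, all present in the schema)
    // before any files are rewritten.
    let (table, metrics) = DeltaOps(table)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
        .await?;

    println!(
        "version: {}, files removed: {}",
        table.version(),
        metrics.num_files_removed
    );
    Ok(())
}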
202 changes: 201 additions & 1 deletion rust/tests/command_optimize.rs
@@ -6,11 +6,17 @@ use arrow::{
    datatypes::{DataType, Field},
    record_batch::RecordBatch,
};
use arrow_select::concat::concat_batches;
use deltalake::action::{Action, Remove};
use deltalake::operations::optimize::{create_merge_plan, MetricDetails, Metrics, OptimizeType};
use deltalake::operations::DeltaOps;
use deltalake::storage::ObjectStoreRef;
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
-use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, SchemaDataType, SchemaField};
+use deltalake::{DeltaTable, DeltaTableError, PartitionFilter, Path, SchemaDataType, SchemaField};
use futures::TryStreamExt;
use object_store::ObjectStore;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::properties::WriterProperties;
use rand::prelude::*;
use serde_json::json;
@@ -497,3 +503,197 @@ async fn test_commit_info() -> Result<(), Box<dyn Error>> {

    Ok(())
}

#[tokio::test]
async fn test_zorder_rejects_zero_columns() -> Result<(), Box<dyn Error>> {
    let context = setup_test(true).await?;
    let dt = context.table;

    // Rejects zero columns
    let result = DeltaOps(dt)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec![]))
        .await;
    assert!(result.is_err());
    assert!(result
        .unwrap_err()
        .to_string()
        .contains("Z-order requires at least one column"));
    Ok(())
}

#[tokio::test]
async fn test_zorder_rejects_nonexistent_columns() -> Result<(), Box<dyn Error>> {
    let context = setup_test(true).await?;
    let dt = context.table;

    // Rejects non-existent columns
    let result = DeltaOps(dt)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["non-existent".to_string()]))
        .await;
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains(
        "Z-order columns must be present in the table schema. Unknown columns: [\"non-existent\"]"
    ));
    Ok(())
}

#[tokio::test]
async fn test_zorder_rejects_partition_column() -> Result<(), Box<dyn Error>> {
    let context = setup_test(true).await?;
    let mut dt = context.table;
    let mut writer = RecordBatchWriter::for_table(&dt)?;

    write(
        &mut writer,
        &mut dt,
        tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?,
    )
    .await?;
    // Rejects partition columns
    let result = DeltaOps(dt)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["date".to_string()]))
        .await;
    assert!(result.is_err());
    assert!(result
        .unwrap_err()
        .to_string()
        .contains("Z-order columns cannot be partition columns. Found: [\"date\"]"));

    Ok(())
}

#[tokio::test]
async fn test_zorder_unpartitioned() -> Result<(), Box<dyn Error>> {
    let context = setup_test(false).await?;
    let mut dt = context.table;
    let mut writer = RecordBatchWriter::for_table(&dt)?;

    write(
        &mut writer,
        &mut dt,
        tuples_to_batch(vec![(1, 1), (1, 2), (1, 2)], "1970-01-01")?,
    )
    .await?;

    write(
        &mut writer,
        &mut dt,
        tuples_to_batch(vec![(2, 1), (2, 2), (1, 2)], "1970-01-04")?,
    )
    .await?;

    let optimize = DeltaOps(dt).optimize().with_type(OptimizeType::ZOrder(vec![
        "date".to_string(),
        "x".to_string(),
        "y".to_string(),
    ]));
    let (dt, metrics) = optimize.await?;

    assert_eq!(metrics.num_files_added, 1);
    assert_eq!(metrics.num_files_removed, 2);
    assert_eq!(metrics.total_files_skipped, 0);
    assert_eq!(metrics.total_considered_files, 2);

    // Check data
    let files = dt.get_files();
    assert_eq!(files.len(), 1);

    let actual = read_parquet_file(&files[0], dt.object_store()).await?;
    let expected = RecordBatch::try_new(
        actual.schema(),
        // Note that the order is not hierarchically sorted.
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 1, 1, 1, 2])),
            Arc::new(Int32Array::from(vec![1, 1, 2, 2, 2, 2])),
            Arc::new(StringArray::from(vec![
                "1970-01-01",
                "1970-01-04",
                "1970-01-01",
                "1970-01-01",
                "1970-01-04",
                "1970-01-04",
            ])),
        ],
    )?;

    assert_eq!(actual, expected);

    Ok(())
}

#[tokio::test]
async fn test_zorder_partitioned() -> Result<(), Box<dyn Error>> {
    let context = setup_test(true).await?;
    let mut dt = context.table;
    let mut writer = RecordBatchWriter::for_table(&dt)?;

    // Write data in sorted order. Each value is a power of 2, so will affect
    // a new bit in the z-ordering.
    write(
        &mut writer,
        &mut dt,
        tuples_to_batch(vec![(1, 1), (1, 2), (1, 4)], "2022-05-22")?,
    )
    .await?;

    write(
        &mut writer,
        &mut dt,
        tuples_to_batch(vec![(2, 1), (2, 2), (2, 4)], "2022-05-22")?,
    )
    .await?;

    // This batch doesn't matter; we just use it to test partition filtering
    write(
        &mut writer,
        &mut dt,
        tuples_to_batch(vec![(1, 1), (1, 1), (1, 1)], "2022-05-23")?,
    )
    .await?;

    let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];

    let optimize = DeltaOps(dt)
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
        .with_filters(&filter);
    let (dt, metrics) = optimize.await?;

    assert_eq!(metrics.num_files_added, 1);
    assert_eq!(metrics.num_files_removed, 2);

    // Check data
    let files = dt.get_files_by_partitions(&filter)?;
    assert_eq!(files.len(), 1);

    let actual = read_parquet_file(&files[0], dt.object_store()).await?;
    let expected = RecordBatch::try_new(
        actual.schema(),
        // Note that the order is not hierarchically sorted.
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 1, 2, 1, 2])),
            Arc::new(Int32Array::from(vec![1, 1, 2, 2, 4, 4])),
        ],
    )?;

    assert_eq!(actual, expected);

    Ok(())
}

async fn read_parquet_file(
    path: &Path,
    object_store: ObjectStoreRef,
) -> Result<RecordBatch, Box<dyn Error>> {
    let file = object_store.head(path).await?;
    let file_reader = ParquetObjectReader::new(object_store, file);
    let batches = ParquetRecordBatchStreamBuilder::new(file_reader)
        .await?
        .build()?
        .try_collect::<Vec<_>>()
        .await?;
    Ok(concat_batches(&batches[0].schema(), &batches)?)
}
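The expected row orders asserted above fall out of bit interleaving of the z-order columns. Below is a minimal, self-contained sketch of Morton-style interleaving for two u32 columns; it assumes x takes the less significant bit of each interleaved pair, which is what reproduces the (x, y) order asserted in test_zorder_partitioned. It is only an illustration of why the rows sort this way, not the crate's implementation.

// Illustrative only: Morton (Z-order) interleaving of two u32 values.
// Bit i of `x` goes to position 2 * i and bit i of `y` to 2 * i + 1, i.e.
// `x` takes the less significant bit of each pair (an assumption chosen so
// the toy ordering matches the rows asserted in test_zorder_partitioned).
fn morton(x: u32, y: u32) -> u64 {
    let mut code = 0u64;
    for i in 0..32 {
        code |= (((x >> i) & 1) as u64) << (2 * i);
        code |= (((y >> i) & 1) as u64) << (2 * i + 1);
    }
    code
}

fn main() {
    let mut rows = vec![(1u32, 1u32), (1, 2), (1, 4), (2, 1), (2, 2), (2, 4)];
    rows.sort_by_key(|&(x, y)| morton(x, y));
    // Prints [(1, 1), (2, 1), (1, 2), (2, 2), (1, 4), (2, 4)] -- the same
    // (x, y) order the partitioned test expects in the rewritten file.
    println!("{rows:?}");
}

Swapping which column takes the more significant bit would group rows by x first, which does not match the asserted order; that is why the bit assignment is stated explicitly above.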
