Add concurrent merge tests
Jonas Schmitz authored and ion-elgreco committed Mar 18, 2024
1 parent 60ee37b commit c20e505
Showing 2 changed files with 274 additions and 4 deletions.
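
This commit removes the partition-only early return from try_construct_early_filter in crates/core/src/operations/merge/mod.rs, so early filters are also constructed for unpartitioned tables, and adds a unit test for generalize_filter plus an integration suite in crates/core/tests/command_merge.rs covering four concurrent-merge scenarios: an unconstrained predicate (conflict), a predicate scoped by the partition key (no conflict), and static filters over an unpartitioned table with disjoint (no conflict) and overlapping (conflict) file sets.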
30 changes: 26 additions & 4 deletions crates/core/src/operations/merge/mod.rs
@@ -866,10 +866,6 @@ async fn try_construct_early_filter(
let table_metadata = table_snapshot.metadata();
let partition_columns = &table_metadata.partition_columns;

-if partition_columns.is_empty() {
-    return Ok(None);
-}

let mut placeholders = HashMap::default();

match generalize_filter(
@@ -2724,6 +2720,32 @@ mod tests {
assert_eq!(generalized, expected_filter);
}

#[tokio::test]
async fn test_generalize_filter_keeps_only_static_target_references() {
let source = TableReference::parse_str("source");
let target = TableReference::parse_str("target");

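// "id" is not in the partition-column list passed below, so the source/target
// join cannot be generalized; only the static target predicate should remain.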
let parsed_filter = col(Column::new(source.clone().into(), "id"))
.eq(col(Column::new(target.clone().into(), "id")))
.and(col(Column::new(target.clone().into(), "id")).eq(lit("C")));

let mut placeholders = HashMap::default();

let generalized = generalize_filter(
parsed_filter,
&vec!["other".to_owned()],
&source,
&target,
&mut placeholders,
)
.unwrap();

let expected_filter = col(Column::new(target.clone().into(), "id")).eq(lit("C"));

assert_eq!(generalized, expected_filter);
}

#[tokio::test]
async fn test_generalize_filter_removes_source_references() {
let source = TableReference::parse_str("source");
248 changes: 248 additions & 0 deletions crates/core/tests/command_merge.rs
@@ -0,0 +1,248 @@
#![allow(dead_code)]
mod fs_common;

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use datafusion_common::Column;
use datafusion_expr::{col, lit, Expr};
use deltalake_core::kernel::{DataType as DeltaDataType, PrimitiveType, StructField, StructType};
use deltalake_core::operations::merge::MergeMetrics;
use deltalake_core::operations::transaction::TransactionError;
use deltalake_core::protocol::SaveMode;
use deltalake_core::{open_table, DeltaOps, DeltaResult, DeltaTable, DeltaTableError};

async fn create_table(table_uri: &str, partition: Option<Vec<&str>>) -> DeltaTable {
let table_schema = get_delta_schema();
let ops = DeltaOps::try_from_uri(table_uri).await.unwrap();
let table = ops
.create()
.with_columns(table_schema.fields().clone())
.with_partition_columns(partition.unwrap_or_default())
.await
.expect("Failed to create table");

let schema = get_arrow_schema();
write_data(table, &schema).await
}

fn get_delta_schema() -> StructType {
StructType::new(vec![
StructField::new(
"id".to_string(),
DeltaDataType::Primitive(PrimitiveType::String),
true,
),
StructField::new(
"value".to_string(),
DeltaDataType::Primitive(PrimitiveType::Integer),
true,
),
StructField::new(
"event_date".to_string(),
DeltaDataType::Primitive(PrimitiveType::String),
true,
),
])
}

fn get_arrow_schema() -> Arc<ArrowSchema> {
Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, true),
Field::new("value", DataType::Int32, true),
Field::new("event_date", DataType::Utf8, true),
]))
}

async fn write_data(table: DeltaTable, schema: &Arc<ArrowSchema>) -> DeltaTable {
let batch = RecordBatch::try_new(
Arc::clone(schema),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-01",
"2021-02-01",
"2021-02-02",
"2021-02-02",
])),
],
)
.unwrap();
// Append the seed batch to the freshly created table.
DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.await
.unwrap()
}

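/// Builds two single-batch source DataFrames: df1 shares event_date
/// 2021-02-02 with the seed data written by create_table, while df2 only uses
/// the new date 2021-02-03. The concurrent-merge tests rely on this split.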
fn create_test_data() -> (DataFrame, DataFrame) {
let schema = get_arrow_schema();
let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from(vec!["C", "D"])),
Arc::new(arrow::array::Int32Array::from(vec![10, 20])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
])),
],
)
.unwrap();
let df1 = ctx.read_batch(batch).unwrap();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from(vec!["E", "F"])),
Arc::new(arrow::array::Int32Array::from(vec![10, 20])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-03",
"2021-02-03",
])),
],
)
.unwrap();
let df2 = ctx.read_batch(batch).unwrap();
(df1, df2)
}

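/// Upserts `df` into `table` on `predicate`: matched rows get value and
/// event_date updated from the source; unmatched source rows are inserted.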
async fn merge(
table: DeltaTable,
df: DataFrame,
predicate: Expr,
) -> DeltaResult<(DeltaTable, MergeMetrics)> {
DeltaOps(table)
.merge(df, predicate)
.with_source_alias("source")
.with_target_alias("target")
.when_matched_update(|update| {
update
.update("value", col("source.value"))
.update("event_date", col("source.event_date"))
})
.unwrap()
.when_not_matched_insert(|insert| {
insert
.set("id", col("source.id"))
.set("value", col("source.value"))
.set("event_date", col("source.event_date"))
})
.unwrap()
.await
}

#[tokio::test]
async fn test_merge_concurrent_conflict() {
// No partition key or filter predicate -> Commit conflict
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().unwrap();

let table_ref1 = create_table(table_uri, Some(vec!["event_date"])).await;
let table_ref2 = open_table(table_uri).await.unwrap();
let (df1, df2) = create_test_data();

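// Neither merge constrains the target with a static or partition filter, so
// the conflict checker cannot prove the writers touched disjoint files.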
let expr = col("target.id").eq(col("source.id"));
let (_table_ref1, _metrics) = merge(table_ref1, df1, expr.clone()).await.unwrap();
let result = merge(table_ref2, df2, expr).await;

assert!(matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Transaction { .. }
));
if let DeltaTableError::Transaction { source } = result.unwrap_err() {
assert!(matches!(source, TransactionError::CommitConflict(_)));
}
}

#[tokio::test]
async fn test_merge_concurrent_different_partition() {
// Partition key in the predicate -> merges touch different partitions and should both succeed
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().unwrap();

let table_ref1 = create_table(table_uri, Some(vec!["event_date"])).await;
let table_ref2 = open_table(table_uri).await.unwrap();
let (df1, df2) = create_test_data();

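// The join on event_date scopes each merge to the partitions present in its
// source batch: df1 only touches 2021-02-02, df2 only 2021-02-03.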
let expr = col("target.id")
.eq(col("source.id"))
.and(col("target.event_date").eq(col("source.event_date")));
let (_table_ref1, _metrics) = merge(table_ref1, df1, expr.clone()).await.unwrap();
let result = merge(table_ref2, df2, expr).await;

// TODO: Currently this returns a generic "Version mismatch" error even though
// the merge commit itself succeeded. This needs to be fixed; see pull request #2280.
assert!(!matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Transaction { .. }
));
assert!(matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Generic(_)
));
if let DeltaTableError::Generic(msg) = result.unwrap_err() {
assert_eq!(msg, "Version mismatch");
}
}

#[tokio::test]
async fn test_merge_concurrent_no_overlapping_files() {
// Predicate contains a static filter and the files do not overlap -> no conflict
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().unwrap();

let table_ref1 = create_table(table_uri, None).await;
let table_ref2 = open_table(table_uri).await.unwrap();
let (df1, df2) = create_test_data();

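// The static event_date filters pin the merges to disjoint dates, so even on
// an unpartitioned table the early filter can prove the touched files differ.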
let expr = col("target.id")
.eq(col("source.id"));
let (_table_ref1, _metrics) = merge(table_ref1, df2, expr.clone().and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-03")))).await.unwrap();
let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await;

// TODO: Currently this returns a generic "Version mismatch" error even though
// the merge commit itself succeeded. This needs to be fixed; see pull request #2280.
assert!(!matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Transaction { .. }
));
assert!(matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Generic(_)
));
if let DeltaTableError::Generic(msg) = result.unwrap_err() {
assert_eq!(msg, "Version mismatch");
}
}

#[tokio::test]
async fn test_merge_concurrent_with_overlapping_files() {
// Predicate contains a static filter and the files overlap -> commit conflict
let tmp_dir = tempfile::tempdir().unwrap();
let table_uri = tmp_dir.path().to_str().unwrap();

let table_ref1 = create_table(table_uri, None).await;
let table_ref2 = open_table(table_uri).await.unwrap();
let (df1, _df2) = create_test_data();

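// Both predicates cover event_date 2021-02-02, so the two merges rewrite the
// same file and the second commit must conflict.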
let expr = col("target.id")
.eq(col("source.id"));
let (_table_ref1, _metrics) = merge(table_ref1, df1.clone(), expr.clone().and(col(Column::from_qualified_name("target.event_date")).lt_eq(lit("2021-02-02")))).await.unwrap();
let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await;


assert!(matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Transaction { .. }
));
if let DeltaTableError::Transaction { source } = result.unwrap_err() {
assert!(matches!(source, TransactionError::CommitConflict(_)));
}
}
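
For context beyond the diff: a minimal sketch of how a caller might recover from the CommitConflict these tests assert, reusing the merge helper above. merge_with_retry and max_attempts are illustrative names, not part of this commit.

async fn merge_with_retry(
table_uri: &str,
df: DataFrame,
predicate: Expr,
max_attempts: usize,
) -> DeltaResult<(DeltaTable, MergeMetrics)> {
let mut attempts = 0;
loop {
// Re-open the table on every attempt so the merge sees the latest version,
// including any commit that won the previous race.
let table = open_table(table_uri).await?;
match merge(table, df.clone(), predicate.clone()).await {
Err(DeltaTableError::Transaction {
source: TransactionError::CommitConflict(_),
}) if attempts + 1 < max_attempts => attempts += 1,
other => return other,
}
}
}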
