Skip to content

Commit

Permalink
Format code
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Schmitz committed Mar 16, 2024
1 parent 0934c38 commit 133bb69
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 15 deletions.
1 change: 0 additions & 1 deletion crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2720,7 +2720,6 @@ mod tests {
assert_eq!(generalized, expected_filter);
}


#[tokio::test]
async fn test_generalize_filter_keeps_only_static_target_references() {
let source = TableReference::parse_str("source");
Expand Down
47 changes: 33 additions & 14 deletions crates/core/tests/command_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use datafusion_expr::{col, Expr, lit};
use datafusion_common::Column;
use datafusion_expr::{col, lit, Expr};
use deltalake_core::kernel::{DataType as DeltaDataType, PrimitiveType, StructField, StructType};
use deltalake_core::operations::merge::MergeMetrics;
use deltalake_core::operations::transaction::TransactionError;
use deltalake_core::protocol::SaveMode;
use deltalake_core::{open_table, DeltaOps, DeltaResult, DeltaTable, DeltaTableError};
use std::sync::Arc;
use datafusion_common::Column;

async fn create_table(table_uri: &String, partition: Option<Vec<&str>>) -> DeltaTable {
let table_schema = get_delta_schema();
Expand Down Expand Up @@ -190,7 +190,6 @@ async fn test_merge_concurrent_different_partition() {
}
}


#[tokio::test]
async fn test_merge_concurrent_no_overlapping_files() {
// predicate contains filter and files are not overlapping -> No conflict
Expand All @@ -201,10 +200,21 @@ async fn test_merge_concurrent_no_overlapping_files() {
let table_ref2 = open_table(table_uri).await.unwrap();
let (df1, df2) = create_test_data();

let expr = col("target.id")
.eq(col("source.id"));
let (_table_ref1, _metrics) = merge(table_ref1, df2, expr.clone().and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-03")))).await.unwrap();
let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await;
let expr = col("target.id").eq(col("source.id"));
let (_table_ref1, _metrics) = merge(
table_ref1,
df2,
expr.clone()
.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-03"))),
)
.await
.unwrap();
let result = merge(
table_ref2,
df1,
expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02"))),
)
.await;

// TODO: Currently it throws a Version mismatch error, but the merge commit was successfully
// This bug needs to be fixed, see pull request #2280
Expand All @@ -221,7 +231,6 @@ async fn test_merge_concurrent_no_overlapping_files() {
}
}


#[tokio::test]
async fn test_merge_concurrent_with_overlapping_files() {
// predicate contains filter and files are overlapping -> Commit conflict
Expand All @@ -232,11 +241,21 @@ async fn test_merge_concurrent_with_overlapping_files() {
let table_ref2 = open_table(table_uri).await.unwrap();
let (df1, _df2) = create_test_data();

let expr = col("target.id")
.eq(col("source.id"));
let (_table_ref1, _metrics) = merge(table_ref1, df1.clone(), expr.clone().and(col(Column::from_qualified_name("target.event_date")).lt_eq(lit("2021-02-02")))).await.unwrap();
let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await;

let expr = col("target.id").eq(col("source.id"));
let (_table_ref1, _metrics) = merge(
table_ref1,
df1.clone(),
expr.clone()
.and(col(Column::from_qualified_name("target.event_date")).lt_eq(lit("2021-02-02"))),
)
.await
.unwrap();
let result = merge(
table_ref2,
df1,
expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02"))),
)
.await;

assert!(matches!(
result.as_ref().unwrap_err(),
Expand All @@ -245,4 +264,4 @@ async fn test_merge_concurrent_with_overlapping_files() {
if let DeltaTableError::Transaction { source } = result.unwrap_err() {
assert!(matches!(source, TransactionError::CommitConflict(_)));
}
}
}

0 comments on commit 133bb69

Please sign in to comment.