Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: merge predicate for concurrent writes #2291

Merged
merged 7 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 158 additions & 25 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,10 +866,6 @@ async fn try_construct_early_filter(
let table_metadata = table_snapshot.metadata();
let partition_columns = &table_metadata.partition_columns;

if partition_columns.is_empty() {
return Ok(None);
}

let mut placeholders = HashMap::default();

match generalize_filter(
Expand Down Expand Up @@ -1015,16 +1011,15 @@ async fn execute(

let state = state.with_query_planner(Arc::new(MergePlanner {}));

let target = {
// Attempt to construct an early filter that we can apply to the Add action list and the delta scan.
// In the case where there are partition columns in the join predicate, we can scan the source table
// to get the distinct list of partitions affected and constrain the search to those.

if !not_match_source_operations.is_empty() {
// It's only worth trying to create an early filter where there are no `when_not_matched_source` operators, since
// that implies a full scan
target
} else if let Some(filter) = try_construct_early_filter(
// Attempt to construct an early filter that we can apply to the Add action list and the delta scan.
// In the case where there are partition columns in the join predicate, we can scan the source table
// to get the distinct list of partitions affected and constrain the search to those.
let target_subset_filter = if !not_match_source_operations.is_empty() {
// It's only worth trying to create an early filter where there are no `when_not_matched_source` operators, since
// that implies a full scan
None
} else {
try_construct_early_filter(
predicate.clone(),
snapshot,
&state,
Expand All @@ -1033,10 +1028,11 @@ async fn execute(
&target_name,
)
.await?
{
LogicalPlan::Filter(Filter::try_new(filter, target.into())?)
} else {
target
};
let target = match target_subset_filter.as_ref() {
None => target,
Some(subset_filter) => {
LogicalPlan::Filter(Filter::try_new(subset_filter.clone(), target.into())?)
}
};

Expand Down Expand Up @@ -1424,9 +1420,21 @@ async fn execute(
app_metadata.insert("operationMetrics".to_owned(), map);
}

// Predicate will be used for conflict detection
let commit_predicate = match target_subset_filter {
None => None, // No predicate means it's a full table merge
Some(some_filter) => {
let predict_expr = match target_alias {
None => some_filter,
Some(alias) => remove_table_alias(some_filter, alias),
};
Some(fmt_expr_to_sql(&predict_expr)?)
}
};
// Do not make a commit when there are zero updates to the state
let operation = DeltaOperation::Merge {
predicate: Some(fmt_expr_to_sql(&predicate)?),
predicate: commit_predicate,
merge_predicate: Some(fmt_expr_to_sql(&predicate)?),
matched_predicates: match_operations,
not_matched_predicates: not_match_target_operations,
not_matched_by_source_predicates: not_match_source_operations,
Expand All @@ -1450,6 +1458,21 @@ async fn execute(
))
}

/// Strips a table-alias qualifier from every column reference in `expr`.
///
/// Columns qualified with `table_alias` are rewritten as unqualified
/// columns; unqualified columns and columns qualified with any other
/// relation pass through unchanged. Used to clean the commit predicate
/// so conflict detection sees bare column names.
fn remove_table_alias(expr: Expr, table_alias: String) -> Expr {
    expr.transform(&|node| {
        if let Expr::Column(column) = node {
            // Only drop the qualifier when it matches the given alias.
            let is_aliased = matches!(
                column.relation.as_ref(),
                Some(rel) if rel.table() == table_alias
            );
            if is_aliased {
                Ok(Transformed::Yes(Expr::Column(Column::new_unqualified(
                    column.name,
                ))))
            } else {
                Ok(Transformed::No(Expr::Column(column)))
            }
        } else {
            Ok(Transformed::No(node))
        }
    })
    .unwrap()
}

// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
struct MergePlanner {}

Expand Down Expand Up @@ -1549,6 +1572,7 @@ mod tests {
use datafusion_expr::LogicalPlanBuilder;
use datafusion_expr::Operator;
use itertools::Itertools;
use regex::Regex;
use serde_json::json;
use std::collections::HashMap;
use std::ops::Neg;
Expand Down Expand Up @@ -1699,7 +1723,8 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert!(!parameters.contains_key("predicate"));
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
json!(r#"[{"actionType":"update"}]"#)
Expand Down Expand Up @@ -1751,7 +1776,8 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert!(!parameters.contains_key("predicate"));
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
json!(r#"[{"actionType":"update"}]"#)
Expand Down Expand Up @@ -1955,6 +1981,15 @@ mod tests {
assert_eq!(metrics.num_output_rows, 6);
assert_eq!(metrics.num_source_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert!(!parameters.contains_key("predicate"));
assert_eq!(
parameters["mergePredicate"],
"target.id = source.id AND target.modified = '2021-02-02'"
);

let expected = vec![
"+----+-------+------------+",
"| id | value | modified |",
Expand All @@ -1971,6 +2006,65 @@ mod tests {
assert_batches_sorted_eq!(&expected, &actual);
}

#[tokio::test]
async fn test_merge_partition_filtered() {
    // A merge whose join condition pins the partition column to a single
    // value should record the derived partition filter as the commit
    // `predicate` (alias stripped) while keeping the full join condition
    // in `mergePredicate`.
    let schema = get_arrow_schema(&None);
    let table = setup_table(Some(vec!["modified"])).await;
    let table = write_data(table, &schema).await;
    assert_eq!(table.version(), 1);

    let ids = arrow::array::StringArray::from(vec!["B", "C"]);
    let values = arrow::array::Int32Array::from(vec![10, 20]);
    let modified = arrow::array::StringArray::from(vec!["2021-02-02", "2021-02-02"]);
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(ids), Arc::new(values), Arc::new(modified)],
    )
    .unwrap();

    let ctx = SessionContext::new();
    let source = ctx.read_batch(batch).unwrap();

    // Join on id, constrained to the "2021-02-02" partition.
    let join_predicate = col("target.id")
        .eq(col("source.id"))
        .and(col("target.modified").eq(lit("2021-02-02")));

    let (table, _metrics) = DeltaOps(table)
        .merge(source, join_predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .when_matched_update(|update| {
            update
                .update("value", col("source.value"))
                .update("modified", col("source.modified"))
        })
        .unwrap()
        .when_not_matched_insert(|insert| {
            insert
                .set("id", col("source.id"))
                .set("value", col("source.value"))
                .set("modified", col("source.modified"))
        })
        .unwrap()
        .await
        .unwrap();

    assert_eq!(table.version(), 2);

    let commit_info = table.history(None).await.unwrap();
    let parameters = commit_info[0].operation_parameters.clone().unwrap();
    // Commit predicate carries only the unaliased partition constraint.
    assert_eq!(parameters["predicate"], "modified = '2021-02-02'");
    assert_eq!(
        parameters["mergePredicate"],
        "target.id = source.id AND target.modified = '2021-02-02'"
    );
}

#[tokio::test]
async fn test_merge_partitions_skipping() {
/* Validate the join predicate can be used for skipping partitions */
Expand Down Expand Up @@ -2028,6 +2122,13 @@ mod tests {
assert_eq!(metrics.num_output_rows, 3);
assert_eq!(metrics.num_source_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
let predicate = parameters["predicate"].as_str().unwrap();
let re = Regex::new(r"^id = '(C|X|B)' OR id = '(C|X|B)' OR id = '(C|X|B)'$").unwrap();
assert!(re.is_match(predicate));

let expected = vec![
"+-------+------------+----+",
"| value | modified | id |",
Expand Down Expand Up @@ -2098,7 +2199,8 @@ mod tests {
extra_info["operationMetrics"],
serde_json::to_value(&metrics).unwrap()
);
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert!(!parameters.contains_key("predicate"));
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
json!(r#"[{"actionType":"delete"}]"#)
Expand Down Expand Up @@ -2162,7 +2264,7 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["matchedPredicates"],
json!(r#"[{"actionType":"delete","predicate":"source.value <= 10"}]"#)
Expand Down Expand Up @@ -2231,7 +2333,8 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert!(!parameters.contains_key("predicate"));
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["notMatchedBySourcePredicates"],
json!(r#"[{"actionType":"delete"}]"#)
Expand Down Expand Up @@ -2295,7 +2398,7 @@ mod tests {
let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], json!("target.id = source.id"));
assert_eq!(parameters["mergePredicate"], json!("target.id = source.id"));
assert_eq!(
parameters["notMatchedBySourcePredicates"],
json!(r#"[{"actionType":"delete","predicate":"target.modified > '2021-02-01'"}]"#)
Expand Down Expand Up @@ -2374,6 +2477,11 @@ mod tests {
assert_eq!(metrics.num_output_rows, 3);
assert_eq!(metrics.num_source_rows, 3);

let commit_info = table.history(None).await.unwrap();
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], json!("modified = '2021-02-02'"));

let expected = vec![
"+----+-------+------------+",
"| id | value | modified |",
Expand Down Expand Up @@ -2612,6 +2720,31 @@ mod tests {
assert_eq!(generalized, expected_filter);
}

#[tokio::test]
async fn test_generalize_filter_keeps_only_static_target_references() {
    // When the joined column is NOT a partition column, the
    // source-to-target equality term cannot be generalized and is dropped,
    // while a static comparison against the target survives intact.
    let source = TableReference::parse_str("source");
    let target = TableReference::parse_str("target");

    let join_term =
        col(Column::new(source.clone().into(), "id")).eq(col(Column::new(
            target.clone().into(),
            "id",
        )));
    let static_term = col(Column::new(target.clone().into(), "id")).eq(lit("C"));
    let parsed_filter = join_term.and(static_term);

    let mut placeholders = HashMap::default();

    let generalized = generalize_filter(
        parsed_filter,
        &vec!["other".to_owned()],
        &source,
        &target,
        &mut placeholders,
    )
    .unwrap();

    // Only the static target-side comparison remains.
    assert_eq!(
        generalized,
        col(Column::new(target.clone().into(), "id")).eq(lit("C"))
    );
}

#[tokio::test]
async fn test_generalize_filter_removes_source_references() {
let source = TableReference::parse_str("source");
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
// TODO allow tainting whole table
false,
this.data.operation.read_whole_table(),
)?;
let conflict_checker = ConflictChecker::new(
transaction_info,
Expand Down
10 changes: 6 additions & 4 deletions crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,12 @@ pub enum DeltaOperation {
/// Merge data with a source data with the following predicate
#[serde(rename_all = "camelCase")]
Merge {
/// The merge predicate
/// Cleaned merge predicate for conflict checks
predicate: Option<String>,

/// The original merge predicate
merge_predicate: Option<String>,

/// Match operations performed
matched_predicates: Vec<MergePredicate>,

Expand Down Expand Up @@ -541,9 +544,8 @@ impl DeltaOperation {
/// Denotes if the operation reads the entire table
pub fn read_whole_table(&self) -> bool {
match self {
// TODO just adding one operation example, as currently none of the
// implemented operations scan the entire table.
Self::Write { predicate, .. } if predicate.is_none() => false,
// Predicate is none -> Merge operation had to join full source and target
Self::Merge { predicate, .. } if predicate.is_none() => true,
_ => false,
}
}
Expand Down
Loading
Loading