diff --git a/python/src/lib.rs b/python/src/lib.rs index 52621492b9..4f5b7ba293 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -467,6 +467,7 @@ impl RawDeltaTable { ))] pub fn merge_execute( &mut self, + py: Python, source: PyArrowType, predicate: String, source_alias: Option, @@ -485,152 +486,158 @@ impl RawDeltaTable { not_matched_by_source_delete_predicate: Option>, not_matched_by_source_delete_all: Option, ) -> PyResult { - let ctx = SessionContext::new(); - let schema = source.0.schema(); - let batches = vec![source.0.map(|batch| batch.unwrap()).collect::>()]; - let table_provider: Arc = - Arc::new(MemTable::try_new(schema, batches).unwrap()); - let source_df = ctx.read_table(table_provider).unwrap(); - - let mut cmd = MergeBuilder::new( - self._table.log_store(), - self._table.state.clone(), - predicate, - source_df, - ) - .with_safe_cast(safe_cast); - - if let Some(src_alias) = source_alias { - cmd = cmd.with_source_alias(src_alias); - } + py.allow_threads(|| { + let ctx = SessionContext::new(); + let schema = source.0.schema(); + let batches = vec![source.0.map(|batch| batch.unwrap()).collect::>()]; + let table_provider: Arc = + Arc::new(MemTable::try_new(schema, batches).unwrap()); + let source_df = ctx.read_table(table_provider).unwrap(); + + let mut cmd = MergeBuilder::new( + self._table.log_store(), + self._table.state.clone(), + predicate, + source_df, + ) + .with_safe_cast(safe_cast); - if let Some(trgt_alias) = target_alias { - cmd = cmd.with_target_alias(trgt_alias); - } + if let Some(src_alias) = source_alias { + cmd = cmd.with_source_alias(src_alias); + } - if let Some(writer_props) = writer_properties { - cmd = cmd.with_writer_properties( - set_writer_properties(writer_props).map_err(PythonError::from)?, - ); - } + if let Some(trgt_alias) = target_alias { + cmd = cmd.with_target_alias(trgt_alias); + } - if let Some(metadata) = custom_metadata { - let json_metadata: Map = - metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); - cmd = cmd.with_metadata(json_metadata); - }; + if let Some(writer_props) = writer_properties { + cmd = cmd.with_writer_properties( + set_writer_properties(writer_props).map_err(PythonError::from)?, + ); + } - if let Some(mu_updates) = matched_update_updates { - if let Some(mu_predicate) = matched_update_predicate { - for it in mu_updates.iter().zip(mu_predicate.iter()) { - let (update_values, predicate_value) = it; - - if let Some(pred) = predicate_value { - cmd = cmd - .when_matched_update(|mut update| { - for (col_name, expression) in update_values { - update = update.update(col_name.clone(), expression.clone()); - } - update.predicate(pred.clone()) - }) - .map_err(PythonError::from)?; - } else { - cmd = cmd - .when_matched_update(|mut update| { - for (col_name, expression) in update_values { - update = update.update(col_name.clone(), expression.clone()); - } - update - }) - .map_err(PythonError::from)?; + if let Some(metadata) = custom_metadata { + let json_metadata: Map = + metadata.into_iter().map(|(k, v)| (k, v.into())).collect(); + cmd = cmd.with_metadata(json_metadata); + }; + + if let Some(mu_updates) = matched_update_updates { + if let Some(mu_predicate) = matched_update_predicate { + for it in mu_updates.iter().zip(mu_predicate.iter()) { + let (update_values, predicate_value) = it; + + if let Some(pred) = predicate_value { + cmd = cmd + .when_matched_update(|mut update| { + for (col_name, expression) in update_values { + update = + update.update(col_name.clone(), expression.clone()); + } + update.predicate(pred.clone()) + }) + .map_err(PythonError::from)?; + } else { + cmd = cmd + .when_matched_update(|mut update| { + for (col_name, expression) in update_values { + update = + update.update(col_name.clone(), expression.clone()); + } + update + }) + .map_err(PythonError::from)?; + } } } } - } - if let Some(_md_delete_all) = matched_delete_all { - cmd = cmd - .when_matched_delete(|delete| delete) - .map_err(PythonError::from)?; - } else if let Some(md_predicate) = matched_delete_predicate { - for pred in md_predicate.iter() { + if let Some(_md_delete_all) = matched_delete_all { cmd = cmd - .when_matched_delete(|delete| delete.predicate(pred.clone())) + .when_matched_delete(|delete| delete) .map_err(PythonError::from)?; + } else if let Some(md_predicate) = matched_delete_predicate { + for pred in md_predicate.iter() { + cmd = cmd + .when_matched_delete(|delete| delete.predicate(pred.clone())) + .map_err(PythonError::from)?; + } } - } - if let Some(nmi_updates) = not_matched_insert_updates { - if let Some(nmi_predicate) = not_matched_insert_predicate { - for it in nmi_updates.iter().zip(nmi_predicate.iter()) { - let (update_values, predicate_value) = it; - if let Some(pred) = predicate_value { - cmd = cmd - .when_not_matched_insert(|mut insert| { - for (col_name, expression) in update_values { - insert = insert.set(col_name.clone(), expression.clone()); - } - insert.predicate(pred.clone()) - }) - .map_err(PythonError::from)?; - } else { - cmd = cmd - .when_not_matched_insert(|mut insert| { - for (col_name, expression) in update_values { - insert = insert.set(col_name.clone(), expression.clone()); - } - insert - }) - .map_err(PythonError::from)?; + if let Some(nmi_updates) = not_matched_insert_updates { + if let Some(nmi_predicate) = not_matched_insert_predicate { + for it in nmi_updates.iter().zip(nmi_predicate.iter()) { + let (update_values, predicate_value) = it; + if let Some(pred) = predicate_value { + cmd = cmd + .when_not_matched_insert(|mut insert| { + for (col_name, expression) in update_values { + insert = insert.set(col_name.clone(), expression.clone()); + } + insert.predicate(pred.clone()) + }) + .map_err(PythonError::from)?; + } else { + cmd = cmd + .when_not_matched_insert(|mut insert| { + for (col_name, expression) in update_values { + insert = insert.set(col_name.clone(), expression.clone()); + } + insert + }) + .map_err(PythonError::from)?; + } } } } - } - if let Some(nmbsu_updates) = not_matched_by_source_update_updates { - if let Some(nmbsu_predicate) = not_matched_by_source_update_predicate { - for it in nmbsu_updates.iter().zip(nmbsu_predicate.iter()) { - let (update_values, predicate_value) = it; - if let Some(pred) = predicate_value { - cmd = cmd - .when_not_matched_by_source_update(|mut update| { - for (col_name, expression) in update_values { - update = update.update(col_name.clone(), expression.clone()); - } - update.predicate(pred.clone()) - }) - .map_err(PythonError::from)?; - } else { - cmd = cmd - .when_not_matched_by_source_update(|mut update| { - for (col_name, expression) in update_values { - update = update.update(col_name.clone(), expression.clone()); - } - update - }) - .map_err(PythonError::from)?; + if let Some(nmbsu_updates) = not_matched_by_source_update_updates { + if let Some(nmbsu_predicate) = not_matched_by_source_update_predicate { + for it in nmbsu_updates.iter().zip(nmbsu_predicate.iter()) { + let (update_values, predicate_value) = it; + if let Some(pred) = predicate_value { + cmd = cmd + .when_not_matched_by_source_update(|mut update| { + for (col_name, expression) in update_values { + update = + update.update(col_name.clone(), expression.clone()); + } + update.predicate(pred.clone()) + }) + .map_err(PythonError::from)?; + } else { + cmd = cmd + .when_not_matched_by_source_update(|mut update| { + for (col_name, expression) in update_values { + update = + update.update(col_name.clone(), expression.clone()); + } + update + }) + .map_err(PythonError::from)?; + } } } } - } - if let Some(_nmbs_delete_all) = not_matched_by_source_delete_all { - cmd = cmd - .when_not_matched_by_source_delete(|delete| delete) - .map_err(PythonError::from)?; - } else if let Some(nmbs_predicate) = not_matched_by_source_delete_predicate { - for pred in nmbs_predicate.iter() { + if let Some(_nmbs_delete_all) = not_matched_by_source_delete_all { cmd = cmd - .when_not_matched_by_source_delete(|delete| delete.predicate(pred.clone())) + .when_not_matched_by_source_delete(|delete| delete) .map_err(PythonError::from)?; + } else if let Some(nmbs_predicate) = not_matched_by_source_delete_predicate { + for pred in nmbs_predicate.iter() { + cmd = cmd + .when_not_matched_by_source_delete(|delete| delete.predicate(pred.clone())) + .map_err(PythonError::from)?; + } } - } - let (table, metrics) = rt()? - .block_on(cmd.into_future()) - .map_err(PythonError::from)?; - self._table.state = table.state; - Ok(serde_json::to_string(&metrics).unwrap()) + let (table, metrics) = rt()? + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(serde_json::to_string(&metrics).unwrap()) + }) } // Run the restore command on the Delta Table: restore table to a given version or datetime