Skip to content

Commit

Permalink
allow merge_execute to release the GIL
Browse files Browse the repository at this point in the history
  • Loading branch information
emcake authored and rtyler committed Jan 19, 2024
1 parent a98cfa7 commit 5d020d4
Showing 1 changed file with 129 additions and 122 deletions.
251 changes: 129 additions & 122 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ impl RawDeltaTable {
))]
pub fn merge_execute(
&mut self,
py: Python,
source: PyArrowType<ArrowArrayStreamReader>,
predicate: String,
source_alias: Option<String>,
Expand All @@ -485,152 +486,158 @@ impl RawDeltaTable {
not_matched_by_source_delete_predicate: Option<Vec<String>>,
not_matched_by_source_delete_all: Option<bool>,
) -> PyResult<String> {
let ctx = SessionContext::new();
let schema = source.0.schema();
let batches = vec![source.0.map(|batch| batch.unwrap()).collect::<Vec<_>>()];
let table_provider: Arc<dyn TableProvider> =
Arc::new(MemTable::try_new(schema, batches).unwrap());
let source_df = ctx.read_table(table_provider).unwrap();

let mut cmd = MergeBuilder::new(
self._table.log_store(),
self._table.state.clone(),
predicate,
source_df,
)
.with_safe_cast(safe_cast);

if let Some(src_alias) = source_alias {
cmd = cmd.with_source_alias(src_alias);
}
py.allow_threads(|| {
let ctx = SessionContext::new();
let schema = source.0.schema();
let batches = vec![source.0.map(|batch| batch.unwrap()).collect::<Vec<_>>()];
let table_provider: Arc<dyn TableProvider> =
Arc::new(MemTable::try_new(schema, batches).unwrap());
let source_df = ctx.read_table(table_provider).unwrap();

let mut cmd = MergeBuilder::new(
self._table.log_store(),
self._table.state.clone(),
predicate,
source_df,
)
.with_safe_cast(safe_cast);

if let Some(trgt_alias) = target_alias {
cmd = cmd.with_target_alias(trgt_alias);
}
if let Some(src_alias) = source_alias {
cmd = cmd.with_source_alias(src_alias);
}

if let Some(writer_props) = writer_properties {
cmd = cmd.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}
if let Some(trgt_alias) = target_alias {
cmd = cmd.with_target_alias(trgt_alias);
}

if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
cmd = cmd.with_metadata(json_metadata);
};
if let Some(writer_props) = writer_properties {
cmd = cmd.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}

if let Some(mu_updates) = matched_update_updates {
if let Some(mu_predicate) = matched_update_predicate {
for it in mu_updates.iter().zip(mu_predicate.iter()) {
let (update_values, predicate_value) = it;

if let Some(pred) = predicate_value {
cmd = cmd
.when_matched_update(|mut update| {
for (col_name, expression) in update_values {
update = update.update(col_name.clone(), expression.clone());
}
update.predicate(pred.clone())
})
.map_err(PythonError::from)?;
} else {
cmd = cmd
.when_matched_update(|mut update| {
for (col_name, expression) in update_values {
update = update.update(col_name.clone(), expression.clone());
}
update
})
.map_err(PythonError::from)?;
if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
cmd = cmd.with_metadata(json_metadata);
};

if let Some(mu_updates) = matched_update_updates {
if let Some(mu_predicate) = matched_update_predicate {
for it in mu_updates.iter().zip(mu_predicate.iter()) {
let (update_values, predicate_value) = it;

if let Some(pred) = predicate_value {
cmd = cmd
.when_matched_update(|mut update| {
for (col_name, expression) in update_values {
update =
update.update(col_name.clone(), expression.clone());
}
update.predicate(pred.clone())
})
.map_err(PythonError::from)?;
} else {
cmd = cmd
.when_matched_update(|mut update| {
for (col_name, expression) in update_values {
update =
update.update(col_name.clone(), expression.clone());
}
update
})
.map_err(PythonError::from)?;
}
}
}
}
}

if let Some(_md_delete_all) = matched_delete_all {
cmd = cmd
.when_matched_delete(|delete| delete)
.map_err(PythonError::from)?;
} else if let Some(md_predicate) = matched_delete_predicate {
for pred in md_predicate.iter() {
if let Some(_md_delete_all) = matched_delete_all {
cmd = cmd
.when_matched_delete(|delete| delete.predicate(pred.clone()))
.when_matched_delete(|delete| delete)
.map_err(PythonError::from)?;
} else if let Some(md_predicate) = matched_delete_predicate {
for pred in md_predicate.iter() {
cmd = cmd
.when_matched_delete(|delete| delete.predicate(pred.clone()))
.map_err(PythonError::from)?;
}
}
}

if let Some(nmi_updates) = not_matched_insert_updates {
if let Some(nmi_predicate) = not_matched_insert_predicate {
for it in nmi_updates.iter().zip(nmi_predicate.iter()) {
let (update_values, predicate_value) = it;
if let Some(pred) = predicate_value {
cmd = cmd
.when_not_matched_insert(|mut insert| {
for (col_name, expression) in update_values {
insert = insert.set(col_name.clone(), expression.clone());
}
insert.predicate(pred.clone())
})
.map_err(PythonError::from)?;
} else {
cmd = cmd
.when_not_matched_insert(|mut insert| {
for (col_name, expression) in update_values {
insert = insert.set(col_name.clone(), expression.clone());
}
insert
})
.map_err(PythonError::from)?;
if let Some(nmi_updates) = not_matched_insert_updates {
if let Some(nmi_predicate) = not_matched_insert_predicate {
for it in nmi_updates.iter().zip(nmi_predicate.iter()) {
let (update_values, predicate_value) = it;
if let Some(pred) = predicate_value {
cmd = cmd
.when_not_matched_insert(|mut insert| {
for (col_name, expression) in update_values {
insert = insert.set(col_name.clone(), expression.clone());
}
insert.predicate(pred.clone())
})
.map_err(PythonError::from)?;
} else {
cmd = cmd
.when_not_matched_insert(|mut insert| {
for (col_name, expression) in update_values {
insert = insert.set(col_name.clone(), expression.clone());
}
insert
})
.map_err(PythonError::from)?;
}
}
}
}
}

if let Some(nmbsu_updates) = not_matched_by_source_update_updates {
if let Some(nmbsu_predicate) = not_matched_by_source_update_predicate {
for it in nmbsu_updates.iter().zip(nmbsu_predicate.iter()) {
let (update_values, predicate_value) = it;
if let Some(pred) = predicate_value {
cmd = cmd
.when_not_matched_by_source_update(|mut update| {
for (col_name, expression) in update_values {
update = update.update(col_name.clone(), expression.clone());
}
update.predicate(pred.clone())
})
.map_err(PythonError::from)?;
} else {
cmd = cmd
.when_not_matched_by_source_update(|mut update| {
for (col_name, expression) in update_values {
update = update.update(col_name.clone(), expression.clone());
}
update
})
.map_err(PythonError::from)?;
if let Some(nmbsu_updates) = not_matched_by_source_update_updates {
if let Some(nmbsu_predicate) = not_matched_by_source_update_predicate {
for it in nmbsu_updates.iter().zip(nmbsu_predicate.iter()) {
let (update_values, predicate_value) = it;
if let Some(pred) = predicate_value {
cmd = cmd
.when_not_matched_by_source_update(|mut update| {
for (col_name, expression) in update_values {
update =
update.update(col_name.clone(), expression.clone());
}
update.predicate(pred.clone())
})
.map_err(PythonError::from)?;
} else {
cmd = cmd
.when_not_matched_by_source_update(|mut update| {
for (col_name, expression) in update_values {
update =
update.update(col_name.clone(), expression.clone());
}
update
})
.map_err(PythonError::from)?;
}
}
}
}
}

if let Some(_nmbs_delete_all) = not_matched_by_source_delete_all {
cmd = cmd
.when_not_matched_by_source_delete(|delete| delete)
.map_err(PythonError::from)?;
} else if let Some(nmbs_predicate) = not_matched_by_source_delete_predicate {
for pred in nmbs_predicate.iter() {
if let Some(_nmbs_delete_all) = not_matched_by_source_delete_all {
cmd = cmd
.when_not_matched_by_source_delete(|delete| delete.predicate(pred.clone()))
.when_not_matched_by_source_delete(|delete| delete)
.map_err(PythonError::from)?;
} else if let Some(nmbs_predicate) = not_matched_by_source_delete_predicate {
for pred in nmbs_predicate.iter() {
cmd = cmd
.when_not_matched_by_source_delete(|delete| delete.predicate(pred.clone()))
.map_err(PythonError::from)?;
}
}
}

let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
})
}

// Run the restore command on the Delta Table: restore table to a given version or datetime
Expand Down

0 comments on commit 5d020d4

Please sign in to comment.