Skip to content

Commit

Permalink
feat(python): expose MERGE operation (#1685)
Browse files Browse the repository at this point in the history
# Description
This exposes MERGE commands to the Python API. The updates and
predicates are first kept in the Class TableMerger and only dispatched
to Rust after `TableMerge.execute()`.

This was my first thought on how to implement it since I have limited
experience with Rust and PyO3 (still learning 😄). Maybe a more elegant
solution is that every class method on TableMerger is dispatched to Rust
and then the Rust MergeBuilder gets serialized and sent back to Python
(back and forth). Let me know your thoughts on this. If this is better,
I could also do this in the next PR, so we at least can push this one
out sooner.

Couple of issues at the moment, I need feedback on, where the first one
is blocking since I can't test it now:

~- Source_alias is not applying, somehow during a schema check the
prefix is missing, however when I printed the lines inside merge, it
showed the prefix correctly. So not sure where the issue is~
~- I had to make datafusion_utils public since I needed to get the
Expression Struct from it, is this the right way to do that? @Blajda~

Edit:
I will pull @Blajda's changes
#1705 once merged with develop:


# Related Issue(s)
<!---
For example:

- closes #106
--->
closes  #1357
  • Loading branch information
ion-elgreco authored Oct 17, 2023
1 parent 187a58c commit 21e369f
Show file tree
Hide file tree
Showing 7 changed files with 1,072 additions and 3 deletions.
19 changes: 19 additions & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,25 @@ class RawDeltaTable:
writer_properties: Optional[Dict[str, int]],
safe_cast: bool = False,
) -> str: ...
def merge_execute(
self,
source: pa.RecordBatchReader,
predicate: str,
source_alias: Optional[str],
target_alias: Optional[str],
writer_properties: Optional[Dict[str, int | None]],
safe_cast: bool,
matched_update_updates: Optional[Dict[str, str]],
matched_update_predicate: Optional[str],
matched_delete_predicate: Optional[str],
matched_delete_all: Optional[bool],
not_matched_insert_updates: Optional[Dict[str, str]],
not_matched_insert_predicate: Optional[str],
not_matched_by_source_update_updates: Optional[Dict[str, str]],
not_matched_by_source_update_predicate: Optional[str],
not_matched_by_source_delete_predicate: Optional[str],
not_matched_by_source_delete_all: Optional[bool],
) -> str: ...
def get_active_partitions(
self, partitions_filters: Optional[FilterType] = None
) -> Any: ...
Expand Down
Loading

0 comments on commit 21e369f

Please sign in to comment.