Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deltalake dedup_sort support #2202

Open
wants to merge 2 commits into
base: devel
Choose a base branch
from
Open

Conversation

guitcastro
Copy link

Description

Deltalake is ignoring dedup_sort column and always overriding rows with matching ids. This PR added support to dedup_sort

Additional Context

This is my first PR, and I am very unfamiliar with the code base. So, I expected that this PR might need a few interactions of reviews to get it right. Per example, I didn't not find an easy way to test it. I am very open to make changes if someone point's me the right direction.

Copy link

netlify bot commented Jan 9, 2025

Deploy Preview for dlt-hub-docs ready!

Name Link
🔨 Latest commit f613ac6
🔍 Latest deploy log https://app.netlify.com/sites/dlt-hub-docs/deploys/678685a325de08000858889e
😎 Deploy Preview https://deploy-preview-2202--dlt-hub-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site configuration.

@rudolfix rudolfix added the ci from fork run ci workflows on a pr even if they are from a fork label Jan 12, 2025
@rudolfix rudolfix self-requested a review January 20, 2025 11:16
@rudolfix rudolfix self-assigned this Jan 20, 2025
Copy link
Collaborator

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guitcastro right now dedup_sort has no function for upsert merge strategy. indeed we do not issue a warning when it is defined. it is just ignored.

we assume that in case of upsert the input dataset is deduplicated on primary_key. in that case, the target dataset is deduplicated as well.

we could indeed use dedup_sort to deduplicate the source. but your code is doing something else. #2201 does not help to understand your use case.

are you trying to use dedup_sort as some kind of record version? and skip the update on rows for which version is not changed?

that is typically done using Incremental during extract phase so records that are not changed are simply not present.

@guitcastro
Copy link
Author

@rudolfix

The issue was indeed confused. English is not my native language and I opened the issues before digging in the code and didn't not updated it. Sorry about that.

we could indeed use dedup_sort to deduplicate the source. but your code is doing something else. #2201 does not help to understand your use case.

I might have written the code wrong, but my intention is to deduplicate using dedup_sort. It's working locally and is based on this example from data brics docs:

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Could you point out what might be wrong with my code?

cheers!

@rudolfix
Copy link
Collaborator

Your problems is: your source data contains duplicates and you want to get rid of them. correct? This should happen BEFORE you execute merge statement so:

def merge_delta_table(
    table: DeltaTable,
    data: Union[pa.Table, pa.RecordBatchReader],
    schema: TTableSchema,
) -> None:

you need to deduplicate data using pyarrow or duckdb.

current implementation in dlt assumes that your source data is deduplicated

@guitcastro
Copy link
Author

@rudolfix No, the source data does not contain duplicated.

Source and destination might have the same key, if so I want to update the destination (merge), however I don't want to override if the date didn't changed (modified_date are equals).
Replacing the data does give me the same output is destination, but with a cost (delta e duplicating a lot of data to keep table history).

That exactly the use case of dedup_sort, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci from fork run ci workflows on a pr even if they are from a fork
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants