Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spike] Clarify status of various Delta Table datasets #542

Open
astrojuanlu opened this issue Feb 7, 2024 · 2 comments
Open

[spike] Clarify status of various Delta Table datasets #542

astrojuanlu opened this issue Feb 7, 2024 · 2 comments

Comments

@astrojuanlu
Copy link
Member

astrojuanlu commented Feb 7, 2024

Description

We have several Delta Table datasets and it's a bit hard to understand the differences and also choose from them. A spike should be done to get clarity on what each of these delta table datasets does and how we could potentially merge the datasets or make them more consistent.

Context

image

(related: kedro-org/kedro#3578)

Halfway through the implementation it was determined to not add a _save method and instead users are supposed to perform write operations inside the node themselves, introducing an undocumented exception to the Kedro philosophy.

This has been an ongoing point of confusion, and the final straw was this Slack thread https://linen-slack.kedro.org/t/16366189/tldr-is-there-a-suggested-pattern-for-converting-vanilla-par#4c007d9d-68c7-46e3-bf86-b4b86755f7ca

The expected outcomes here are unclear, but for starters I hope at least folks agree that the current situation is a bit of a mess. Possible actions are:

@julio-cmdr
Copy link

julio-cmdr commented Mar 6, 2024

Hi!
I've created a custom pandas.DeltaTableDataset, with upsert write mode. I'm sending it in here, just in case it's useful to anyone.

I took the opportunity to also add the pyarrow.Table.from_pandas() function within the _save() method, to fix the problems described in #3666

from deltalake.writer import write_deltalake
from deltalake import DeltaTable
from kedro_datasets.pandas import DeltaTableDataset
from kedro.io.core import DatasetError
from typing import List, Union, Any
import pandas as pd
import pyarrow as pa

# from https://github.com/inigohidalgo/prefect-polygon-etl/blob/main/delta-rs-etl/src/delta_rs_etl/upsert.py
def upsert(new_data: pa.Table, target_table: DeltaTable, primary_key: Union[str, List[str]]) -> dict:    
    predicate = (
        f"target.{primary_key} = source.{primary_key}"
        if type(primary_key) == str
        else " AND ".join([f"target.{col} = source.{col}" for col in primary_key])
    )
    
    return (
        target_table
        .merge(
            source=new_data,
            predicate=predicate,
            source_alias="source",
            target_alias="target"
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )


class CustomDeltaTableDataset(DeltaTableDataset):
    """
        This is a variation of pandas.DeltaTableDataset with support to upsert write mode
    """

    def __init__(self, primary_key: Union[str, List[str], None] = None, **kargs) -> None:
        self.primary_key = primary_key

        if kargs.get('save_args', {}).get('mode', '') == 'upsert':
            self.upsert_mode = True
            kargs['save_args']['mode'] = 'overwrite'
            if not self.primary_key:
                raise DatasetError(
                    "To use upsert write mode, you need to set the primare_key argument!"
                )
        else:
            self.upsert_mode = False
        
        super().__init__(**kargs)


    def _save(self, data: pd.DataFrame) -> None:
        data = pa.Table.from_pandas(data, preserve_index=False)

        if self.is_empty_dir:
            # first time creation of delta table
            write_deltalake(
                self._filepath,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )
            self.is_empty_dir = False
            self._delta_table = DeltaTable(
                table_uri=self._filepath,
                storage_options=self.fs_args,
                version=self._version,
            )
        elif self.upsert_mode:
            upsert(
                new_data=data,
                target_table=self._delta_table,
                primary_key=self.primary_key
            )
        else:            
            write_deltalake(
                self._delta_table,
                data,
                storage_options=self.fs_args,
                **self._save_args,
            )


    def _describe(self) -> dict[str, Any]:
        desc = super()._describe()
        desc['primary_key'] = self.primary_key

        if self.upsert_mode:
            desc['save_args']['mode'] = 'upsert'

        return desc

@astrojuanlu
Copy link
Member Author

To add on top of that, just realized that months ago I had created my own Polars DeltaDataset: https://github.com/astrojuanlu/talk-kedro-huggingface/blob/b998371/src/social_summarizer/datasets/polars_delta.py

Mostly uninteresting, except for

        # HACK: If the table is empty, return an empty DataFrame
        try:
            return pl.read_delta(
                load_path, storage_options=self._storage_options, **self._load_args
            )
        except TableNotFoundError:
            return pl.DataFrame()

(related: kedro-org/kedro#3578)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: No status
Development

No branches or pull requests

3 participants