feat(flink): add read_***() for Flink backend #7777

mfatihaktas
Contributor

@mfatihaktas mfatihaktas commented Dec 15, 2023

Adds the following functions in ibis/backends/flink/__init__.py:

  • register()
  • read_file()
  • read_parquet()
  • read_csv()
  • read_json()

Adding these functions unblocks several tests in test_param.py and test_register.py.

Edit: Removed register() per this comment.

@mfatihaktas mfatihaktas force-pushed the flink-deep-dive-on-test_json-2 branch 5 times, most recently from ca1ec35 to 76ee200 Compare December 16, 2023 11:07
@cpcloud
Member

cpcloud commented Dec 16, 2023

Let's avoid adding register for now. It's a bit crufty and from a time when we thought we might be able to unify all the read_* methods under a single read(...) function.

@mfatihaktas mfatihaktas force-pushed the flink-deep-dive-on-test_json-2 branch from 76ee200 to 4a125d9 Compare December 17, 2023 01:17
@mfatihaktas
Contributor Author

> Let's avoid adding register for now. It's a bit crufty and from a time when we thought we might be able to unify all the read_* methods under a single read(...) function.

Thanks for the note, removed register().

@mfatihaktas mfatihaktas changed the title feat(flink): add register() and read_***() for Flink backend feat(flink): add dread_***() for Flink backend Dec 18, 2023
@mfatihaktas mfatihaktas changed the title feat(flink): add dread_***() for Flink backend feat(flink): add read_***() for Flink backend Dec 18, 2023
@mfatihaktas mfatihaktas marked this pull request as ready for review December 18, 2023 22:11
@mfatihaktas mfatihaktas force-pushed the flink-deep-dive-on-test_json-2 branch from 4a125d9 to 8cf8ed5 Compare December 19, 2023 16:36
@mfatihaktas mfatihaktas force-pushed the flink-deep-dive-on-test_json-2 branch from 8cf8ed5 to 601cd13 Compare January 3, 2024 18:01
ibis/backends/flink/__init__.py (outdated)
ir.Table
The just-registered table
"""
obj = self._get_dataframe_from_path(path)
Member

Does Flink natively support loading data directly from files? If so, I think we should use that.

If it doesn't, I'm not sure how I feel about automatically loading files with pandas and forwarding them that way. We do something similar for local in-memory backends like duckdb/pandas, where file paths are unambiguously local. But for a potentially distributed system like Flink, automatically using a local file reader may lead to unexpected behavior with path names, and may also be inefficient.

If we do decide to handle file reading manually with an in-memory reader here, we should use the readers provided by pyarrow directly, not pandas. These have built-in support for directory datasets and better match the behavior of other backends.

Contributor Author

Thanks, I agree that reading the file while creating the table is not ideal. Flink does support creating tables with the filesystem connector. This, however, requires specifying the schema, which create_table() also requires:

def create_table(
    self,
    name: str,
    obj: pd.DataFrame | pa.Table | ir.Table | None = None,
    *,
    schema: sch.Schema | None = None,
    database: str | None = None,
    catalog: str | None = None,
    tbl_properties: dict | None = None,
    watermark: Watermark | None = None,
    temp: bool = False,
    overwrite: bool = False,
) -> ir.Table:
    ...

    if obj is None and schema is None:
        raise exc.IbisError("`schema` or `obj` is required")
    ...

So I think we have two options:

  1. Add a required schema argument to read_***(). This would deviate from the existing interface of the other backends.
  2. "Read" the schema from the file with pyarrow and feed it into create_table() with the filesystem connector. This has the same issue you raised about accessing files in a distributed system.

Which one do you think makes more sense? We could also implement both: the user can specify the schema, and if none is specified we construct it from the file, raising an error if the file cannot be accessed.
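The combined approach might be sketched as follows. This is an illustration under stated assumptions, not the implementation in this PR: `infer_csv_schema`, `resolve_schema`, and `filesystem_ddl` are hypothetical names, the stdlib CSV header reader stands in for pyarrow-based inference (it types every column as STRING), and the generated statement uses the Flink SQL filesystem connector options `connector`, `path`, and `format`.

```python
from __future__ import annotations

import csv
import tempfile
from pathlib import Path


def infer_csv_schema(path: str) -> dict[str, str]:
    """Hypothetical stand-in for inference: header names, all typed STRING."""
    with open(path, newline="") as f:
        header = next(csv.reader(f))
    return {name: "STRING" for name in header}


def resolve_schema(path: str, schema: dict[str, str] | None = None) -> dict[str, str]:
    """Prefer the user-supplied schema; otherwise try to infer it from the file."""
    if schema is not None:
        return dict(schema)
    try:
        return infer_csv_schema(path)
    except (OSError, StopIteration) as e:
        # The file is unreachable from this process, or it is empty.
        raise ValueError(
            f"could not infer a schema from {path!r}; pass `schema` explicitly"
        ) from e


def filesystem_ddl(
    name: str, path: str, schema: dict[str, str], file_format: str = "csv"
) -> str:
    """Render a CREATE TABLE statement for the filesystem connector."""
    columns = ", ".join(f"`{col}` {typ}" for col, typ in schema.items())
    return (
        f"CREATE TABLE `{name}` ({columns}) "
        f"WITH ('connector' = 'filesystem', 'path' = '{path}', "
        f"'format' = '{file_format}')"
    )


# Inference path: the file is readable locally.
tmp = Path(tempfile.mkdtemp()) / "events.csv"
tmp.write_text("id,name\n1,a\n")
inferred = resolve_schema(str(tmp))

# Explicit path: the file is never opened, so a remote path is fine.
explicit = resolve_schema("/no/such/file.csv", schema={"id": "INT"})
ddl = filesystem_ddl("events", "file:///data/events.csv", explicit)
print(ddl)
```

Passing `schema` explicitly skips any local file access, which sidesteps the distributed-path concern; the inference branch keeps the call signature compatible with other backends when the file is reachable.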

@mfatihaktas mfatihaktas force-pushed the flink-deep-dive-on-test_json-2 branch from 601cd13 to 0e69c18 Compare January 3, 2024 22:39
@mfatihaktas mfatihaktas requested a review from jcrist January 3, 2024 22:39
@cpcloud cpcloud deleted the branch ibis-project:master January 4, 2024 10:43
@cpcloud cpcloud closed this Jan 4, 2024
@cpcloud
Member

cpcloud commented Jan 4, 2024

@mfatihaktas Apologies again for the churn, can you reopen this PR against main?

@mfatihaktas
Contributor Author

> @mfatihaktas Apologies again for the churn, can you reopen this PR against main?

Reopened: #7908

cpcloud pushed a commit that referenced this pull request Jan 21, 2024