Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of new pipeline interface #665

Closed
wants to merge 38 commits into from

Conversation

mrchtr
Copy link
Contributor

@mrchtr mrchtr commented Nov 22, 2023

Draft implementation of the new pipeline interface:

  • modified pipeline.py (introduce new interface itself)
  • renaming dataset accordingly to the defined consumes and produces
  • introduce schema which can be used inside of generic components

Note: Didn't fixed the test yet. Added a dummy test just to visualise how the pipeline could look like. First wanted to check if this goes into the right direction.

mrchtr and others added 29 commits November 16, 2023 14:15
Co-authored-by: Philippe Moussalli <[email protected]>
@mrchtr mrchtr requested a review from RobbeSneyders November 22, 2023 16:07
Copy link
Contributor

@PhilippeMoussalli PhilippeMoussalli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Matthias! I think this is headed in the right direction :)
Left some comments

components/embed_text/fondant_component.yaml Outdated Show resolved Hide resolved
schema=schema)

self.add_op(component_op)
return self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question

Do we need to return the class instance. So instead of this:

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    name="load_images",
    schema={
        "image": type.binary  # or pa.binary()
    }
)

We would have:

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

pipeline.read(
    name="load_images",
    schema={
        "image": type.binary  # or pa.binary()
    }
)

A bit similar to the the way we used to add operations to the pipeline. I think both are valid but not sure which one is more intuitive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind returning the class instance was that to call apply on a dataset afterwards.
Returning the class instance gives the user the most flexibility. The user can decide which variant he wants to implement.

Either:

dataset = pipeline.read(...)
dataset = dataset.apply(...)

or

pipeline.read(...)
dataset.apply(...)

or even:

pipeline.read(...).apply(...).apply(...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok that makes sense :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the Dataset class can provide an interface for more interactive data exploration.

src/fondant/pipeline/pipeline.py Outdated Show resolved Hide resolved
src/fondant/pipeline/pipeline.py Outdated Show resolved Hide resolved
src/fondant/pipeline/pipeline.py Outdated Show resolved Hide resolved
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
resources: t.Optional[Resources] = None,
consumes: t.Optional[t.Dict[str, str]] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add return an error if both consumes and produces are None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use the default values of the component specs if they are None.

cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
resources: t.Optional[Resources] = None,
schema: t.Optional[t.Dict[str, str]] = None) -> "Pipeline":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema here is mandatory I suppose

client_kwargs: t.Optional[dict] = None,
resources: t.Optional[Resources] = None,
consumes: t.Optional[t.Dict[str, str]] = None,
schema: t.Optional[t.Dict[str, str]] = None) -> "Pipeline":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above, is the schema here just needed for renaming or does it also specify which fields to write?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema should specify which fields and belonging types will be written.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would docstring all the read/write/apply functions since they're user facing

@mrchtr mrchtr force-pushed the feautre/refactore-component-package branch from 734526b to d2182a0 Compare November 23, 2023 12:13
Base automatically changed from feautre/refactore-component-package to feature/redesign-dataset-format-and-interface November 23, 2023 13:47
mrchtr and others added 3 commits November 24, 2023 08:49
First PR related to the data structure redesign. 

Implements the following: 
- New manifest structure (including validation, and evolution)
- New ComponentSpec structure (including validation)
- Removes `Subsets` and `Index`

Not all tests are running successfully. But this are already quite a few
changes. Therefore, I've created PR on feature branch
`feature/redesign-dataset-format-and-interface`, to have quicker
feedback loops.

---------

Co-authored-by: Robbe Sneyders <[email protected]>
Co-authored-by: Philippe Moussalli <[email protected]>
Refactor component package as part of #643

---------

Co-authored-by: Robbe Sneyders <[email protected]>
Co-authored-by: Philippe Moussalli <[email protected]>
This PR applies the usage of the new data format:

- fixes all tests
- update component specifications and component code
- remove subset field usage in `pipeline.py`

---------

Co-authored-by: Robbe Sneyders <[email protected]>
@RobbeSneyders RobbeSneyders force-pushed the feature/redesign-dataset-format-and-interface branch from 9f057ad to e4eadf3 Compare November 24, 2023 07:50
@mrchtr mrchtr linked an issue Nov 24, 2023 that may be closed by this pull request
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
resources: t.Optional[Resources] = None,
schema: t.Dict[str, str],
Copy link
Contributor

@PhilippeMoussalli PhilippeMoussalli Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this schema be Dict[str, Type]? I would add some example on how this would look like in the docstring so the users know what to input and mainly because it's different from the write schema (this one would also benefit from having examples in the description), could you also move it to the top since it's mandatory?

cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
resources: t.Optional[Resources] = None,
consumes: t.Optional[t.Dict[str, str]] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the write requires a consumes?

Base automatically changed from feature/redesign-dataset-format-and-interface to main November 27, 2023 09:34
RobbeSneyders added a commit that referenced this pull request Nov 28, 2023
This PR is the first one of multiple PRs to replace #665. This PR only
focuses on implementing the new pipeline interface, without adding any
new functionality.

The new interface applies operations to intermediate datasets instead of
adding operations to a pipeline, as shown below. It's a superficial
change, since only the interface is changed. All underlying behavior is
still the same.

The new interface fits nicely with our data format design and we'll be
able to leverage it for interactive development in the future. We can
calculate the schema for each intermediate dataset so the user can
inspect it. Or with eager execution, we could execute a single operation
and allow the user to explore the data using the dataset.

I still need to update the README generation, but I'll do that as a
separate PR. It becomes a bit more complex since we now need to
discriminate between read, transform, and write components to generate
the example code.

**Old interface**
```Python
from fondant.pipeline import ComponentOp, Pipeline


pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

load_op = ComponentOp(
    component_dir="load_data",
    arguments={...},
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
    arguments={...},
)

embed_op = ComponentOp(
    component_dir="embed_text",
    arguments={...},
)

write_op = ComponentOp.from_registry(
    name="write_to_hf_hub",
    arguments={...},
)

pipeline.add_op(load_op)
pipeline.add_op(caption_op, dependencies=[load_op])
pipeline.add_op(embed_op, dependencies=[caption_op])
pipeline.add_op(write_op, dependencies=[embed_op])
```

**New interface**
```Python
pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    "load_data",
    arguments={...},
)
dataset = dataset.apply(
    "caption_images",
    arguments={...},
)
dataset = dataset.apply(
    "embed_text",
    arguments={...},
)
dataset.write(
    "write_to_hf_hub",
    arguments={...},
)
@RobbeSneyders
Copy link
Member

Closing in favor of #685

RobbeSneyders added a commit that referenced this pull request Dec 7, 2023
This PR is the first one of multiple PRs to replace #665. This PR only
focuses on implementing the new pipeline interface, without adding any
new functionality.

The new interface applies operations to intermediate datasets instead of
adding operations to a pipeline, as shown below. It's a superficial
change, since only the interface is changed. All underlying behavior is
still the same.

The new interface fits nicely with our data format design and we'll be
able to leverage it for interactive development in the future. We can
calculate the schema for each intermediate dataset so the user can
inspect it. Or with eager execution, we could execute a single operation
and allow the user to explore the data using the dataset.

I still need to update the README generation, but I'll do that as a
separate PR. It becomes a bit more complex since we now need to
discriminate between read, transform, and write components to generate
the example code.

**Old interface**
```Python
from fondant.pipeline import ComponentOp, Pipeline

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

load_op = ComponentOp(
    component_dir="load_data",
    arguments={...},
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
    arguments={...},
)

embed_op = ComponentOp(
    component_dir="embed_text",
    arguments={...},
)

write_op = ComponentOp.from_registry(
    name="write_to_hf_hub",
    arguments={...},
)

pipeline.add_op(load_op)
pipeline.add_op(caption_op, dependencies=[load_op])
pipeline.add_op(embed_op, dependencies=[caption_op])
pipeline.add_op(write_op, dependencies=[embed_op])
```

**New interface**
```Python
pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    "load_data",
    arguments={...},
)
dataset = dataset.apply(
    "caption_images",
    arguments={...},
)
dataset = dataset.apply(
    "embed_text",
    arguments={...},
)
dataset.write(
    "write_to_hf_hub",
    arguments={...},
)
RobbeSneyders added a commit that referenced this pull request Dec 7, 2023
This PR is the first one of multiple PRs to replace #665. This PR only
focuses on implementing the new pipeline interface, without adding any
new functionality.

The new interface applies operations to intermediate datasets instead of
adding operations to a pipeline, as shown below. It's a superficial
change, since only the interface is changed. All underlying behavior is
still the same.

The new interface fits nicely with our data format design and we'll be
able to leverage it for interactive development in the future. We can
calculate the schema for each intermediate dataset so the user can
inspect it. Or with eager execution, we could execute a single operation
and allow the user to explore the data using the dataset.

I still need to update the README generation, but I'll do that as a
separate PR. It becomes a bit more complex since we now need to
discriminate between read, transform, and write components to generate
the example code.

**Old interface**
```Python
from fondant.pipeline import ComponentOp, Pipeline

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

load_op = ComponentOp(
    component_dir="load_data",
    arguments={...},
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
    arguments={...},
)

embed_op = ComponentOp(
    component_dir="embed_text",
    arguments={...},
)

write_op = ComponentOp.from_registry(
    name="write_to_hf_hub",
    arguments={...},
)

pipeline.add_op(load_op)
pipeline.add_op(caption_op, dependencies=[load_op])
pipeline.add_op(embed_op, dependencies=[caption_op])
pipeline.add_op(write_op, dependencies=[embed_op])
```

**New interface**
```Python
pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    "load_data",
    arguments={...},
)
dataset = dataset.apply(
    "caption_images",
    arguments={...},
)
dataset = dataset.apply(
    "embed_text",
    arguments={...},
)
dataset.write(
    "write_to_hf_hub",
    arguments={...},
)
@RobbeSneyders RobbeSneyders deleted the feature/implement-new-pipeline-interface branch January 11, 2024 09:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement new pipeline interface
3 participants