Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start from dataset schema for lightweight python component consumes #789

Merged
merged 22 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7a6b78a
Integrate argument inference
RobbeSneyders Jan 17, 2024
c007244
Add compilation to python component test
RobbeSneyders Jan 17, 2024
e52a5c4
Add argument inference to integration test
RobbeSneyders Jan 17, 2024
161f214
Start from dataset schema for python component consumes
RobbeSneyders Jan 17, 2024
66a9103
add option to define consumes in mapping
PhilippeMoussalli Jan 22, 2024
2e10af1
add option to define consumes and generic in mapping
PhilippeMoussalli Jan 22, 2024
e8d763f
Merge branch 'feature/python-consumes-mapping-3' into feature/python-…
PhilippeMoussalli Jan 22, 2024
6619b3a
small fixes
PhilippeMoussalli Jan 22, 2024
d898e4a
make lightweight consumes generic by default
PhilippeMoussalli Jan 23, 2024
2d80a77
Merge branch 'main' into feature/python-consumes-mapping
PhilippeMoussalli Jan 23, 2024
cef482a
revert to desired behaviour
PhilippeMoussalli Jan 23, 2024
8c9d154
update sample pipeline
PhilippeMoussalli Jan 23, 2024
4c97282
update based on feedback
PhilippeMoussalli Jan 23, 2024
3ab1bae
implement PR feedback
PhilippeMoussalli Jan 25, 2024
b59fb8c
add docstrings
PhilippeMoussalli Jan 25, 2024
de5a3c1
update consumes based on new proposal
PhilippeMoussalli Jan 30, 2024
3943c4b
Merge branch 'main' into feature/python-consumes-mapping
PhilippeMoussalli Jan 30, 2024
d8e5563
Update src/fondant/pipeline/lightweight_component.py
PhilippeMoussalli Jan 30, 2024
85f0994
enable default behavior of passing all dataset fields
PhilippeMoussalli Jan 30, 2024
5b69298
implement PR feedback
PhilippeMoussalli Jan 30, 2024
12c6f37
Merge branch 'main' into feature/python-consumes-mapping
GeorgesLorre Jan 30, 2024
60dc6f6
Merge branch 'main' into feature/python-consumes-mapping
PhilippeMoussalli Jan 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def __init__(
image: str,
*,
description: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None,
consumes: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None,
produces: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None,
previous_index: t.Optional[str] = None,
args: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[t.List[str]] = None,
Expand Down
65 changes: 65 additions & 0 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
from dataclasses import asdict, dataclass
from functools import wraps

import pyarrow as pa

from fondant.component import BaseComponent, Component
from fondant.core.schema import Field, Type


@dataclass
Expand All @@ -28,11 +31,24 @@ class PythonComponent(BaseComponent):
def image(cls) -> Image:
raise NotImplementedError

@classmethod
def consumes(cls) -> t.Optional[t.Union[list, str]]:
pass

@classmethod
def get_consumes_spec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a docstring would be good.

cls,
dataset_fields: t.Mapping[str, Field],
apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]],
):
pass


def lightweight_component(
*args,
extra_requires: t.Optional[t.List[str]] = None,
base_image: t.Optional[str] = None,
consumes: t.Optional[t.Union[list, str]] = None,
):
"""Decorator to enable a python component."""

Expand Down Expand Up @@ -121,6 +137,55 @@ class PythonComponentOp(cls, PythonComponent):
def image(cls) -> Image:
return image

@classmethod
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make this a class property by combining the classmethod and property decorators.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm it doesn't seem correct, should I apply getters and setters

image

Copy link
Member Author

@RobbeSneyders RobbeSneyders Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be reversed, but seems like it only works for Python 3.9 and 3.10 (docs).

Just making it a class attribute could work as well:

class BaseClass:
    consumes: ConsumesType

class Class(BaseClass):
    consumes=consumes_  # cannot be the same name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I still don't quite follow what should be done here, aren't the class methods needed for the decorators? what's the need for attributes in this example?

def consumes(cls) -> t.Optional[t.Union[list, str]]:
return consumes

@classmethod
def get_consumes_spec(
PhilippeMoussalli marked this conversation as resolved.
Show resolved Hide resolved
cls,
dataset_fields: t.Mapping[str, Field],
apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]],
):
consumes = cls.consumes()

if consumes == "generic":
return {"additionalProperties": True}

# Get consumes spec from the dataset
consumes_spec = {k: v.type.to_dict() for k, v in dataset_fields.items()}

# Modify naming based on the consumes argument in the 'apply' method
RobbeSneyders marked this conversation as resolved.
Show resolved Hide resolved
if apply_consumes:
for k, v in apply_consumes.items():
if isinstance(v, str):
consumes_spec[k] = consumes_spec.pop(v)
elif isinstance(v, pa.DataType):
consumes_spec[k] = Type(v).to_dict()
else:
msg = (
f"Invalid data type for field `{k}` in the `apply_consumes` "
f"argument. Only string and pa.DataType are allowed."
)
raise ValueError(
msg,
)

# Filter for values that are not in the user defined consumes list
if consumes:
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
for field_to_consume in consumes:
if field_to_consume not in consumes_spec.keys():
msg = f"Field `{field_to_consume}` is not available in the dataset."
raise ValueError(
msg,
)

consumes_spec = {
k: v for k, v in consumes_spec.items() if k in consumes
}

return consumes_spec

return PythonComponentOp

# Call wrapper with function (`args[0]`) when no additional arguments were passed
Expand Down
16 changes: 14 additions & 2 deletions src/fondant/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ def from_component_yaml(cls, path, **kwargs) -> "ComponentOp":
)

@classmethod
def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp":
def from_ref(
cls,
ref: t.Any,
fields: t.Optional[t.Mapping[str, Field]] = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep this fields argument out of here since this is specific to the lightweight Python components. Can we move this to the PythonComponent class instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a straightforward way of doing this, unless we somehow pass the fields to the BaseComponent class since this is what the PythonComponent starts from, but I'm also not sure how feasible that is. Any other suggestions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original implementation did this in the Dataset class. So I would assume we can just call a method on the PythonComponent class at that point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this might make less sense after the refactoring on main since my first commits. If we can address my comment above, it's fine for me to keep it like this for now. Would be good to add the argument to the docstring though.

**kwargs,
) -> "ComponentOp":
"""Create a ComponentOp from a reference. The reference can
be a reusable component name, a path to a custom component,
or a python component class.
Expand All @@ -217,11 +222,16 @@ def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp":
image = ref.image()
description = ref.__doc__ or "python component"

if fields:
consumes_spec = ref.get_consumes_spec(fields, kwargs["consumes"])
else:
consumes_spec = {"additionalProperties": True}

component_spec = ComponentSpec(
name,
image.base_image,
description=description,
consumes={"additionalProperties": True},
consumes=consumes_spec,
produces={"additionalProperties": True},
args={
name: arg.to_spec()
Expand Down Expand Up @@ -724,6 +734,7 @@ def apply(
"""
operation = ComponentOp.from_ref(
ref,
fields=self.fields,
produces=produces,
consumes=consumes,
arguments=arguments,
Expand Down Expand Up @@ -771,6 +782,7 @@ def write(
"""
operation = ComponentOp.from_ref(
ref,
fields=self.fields,
consumes=consumes,
arguments=arguments,
input_partition_rows=input_partition_rows,
Expand Down
2 changes: 1 addition & 1 deletion tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_component_op(


def test_component_op_python_component(default_pipeline_args):
@lightweight_component()
@lightweight_component(consumes="generic")
class Foo(DaskLoadComponent):
def load(self) -> dd.DataFrame:
df = pd.DataFrame(
Expand Down
149 changes: 115 additions & 34 deletions tests/pipeline/test_python_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,50 @@
import pyarrow as pa
import pytest
from fondant.component import DaskLoadComponent, PandasTransformComponent
from fondant.core.component_spec import OperationSpec
from fondant.core.exceptions import InvalidPythonComponent
from fondant.pipeline import Pipeline, lightweight_component
from fondant.pipeline.compiler import DockerCompiler
from fondant.testing import DockerComposeConfigs


def test_build_python_script():
@lightweight_component()
@pytest.fixture()
def load_pipeline():
pipeline = Pipeline(
name="dummy-pipeline",
base_path="./data",
)

@lightweight_component(
base_image="python:3.8-slim-buster",
extra_requires=["pandas", "dask"],
consumes="generic",
)
class CreateData(DaskLoadComponent):
def load(self) -> dd.DataFrame:
df = pd.DataFrame(
{
"x": [1, 2, 3],
"y": [4, 5, 6],
"z": [7, 8, 9],
},
index=pd.Index(["a", "b", "c"], name="id"),
)
return dd.from_pandas(df, npartitions=1)

assert CreateData.image().script == textwrap.dedent(
load_script = CreateData.image().script

dataset = pipeline.read(
ref=CreateData,
produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
)

return pipeline, dataset, load_script


def test_build_python_script(load_pipeline):
_, _, load_script = load_pipeline
assert load_script == textwrap.dedent(
"""\
from typing import *
import typing as t
Expand All @@ -43,6 +68,7 @@ def load(self) -> dd.DataFrame:
{
"x": [1, 2, 3],
"y": [4, 5, 6],
"z": [7, 8, 9],
},
index=pd.Index(["a", "b", "c"], name="id"),
)
Expand All @@ -51,31 +77,8 @@ def load(self) -> dd.DataFrame:
)


def test_lightweight_component_sdk():
pipeline = Pipeline(
name="dummy-pipeline",
base_path="./data",
)

@lightweight_component(
base_image="python:3.8-slim-buster",
extra_requires=["pandas", "dask"],
)
class CreateData(DaskLoadComponent):
def load(self) -> dd.DataFrame:
df = pd.DataFrame(
{
"x": [1, 2, 3],
"y": [4, 5, 6],
},
index=pd.Index(["a", "b", "c"], name="id"),
)
return dd.from_pandas(df, npartitions=1)

dataset = pipeline.read(
ref=CreateData,
produces={"x": pa.int32(), "y": pa.int32()},
)
def test_lightweight_component_sdk(load_pipeline):
pipeline, dataset, load_script = load_pipeline

assert len(pipeline._graph.keys()) == 1
operation_spec_dict = pipeline._graph["CreateData"][
Expand All @@ -90,10 +93,14 @@ def load(self) -> dd.DataFrame:
"produces": {"additionalProperties": True},
},
"consumes": {},
"produces": {"x": {"type": "int32"}, "y": {"type": "int32"}},
"produces": {
"x": {"type": "int32"},
"y": {"type": "int32"},
"z": {"type": "int32"},
},
}

@lightweight_component()
@lightweight_component(consumes="generic")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to have a empty consumes instead of passing "generic" here.

For me it would be fine to consume the whole dataset. It would make the usability less complex and reduce the efficiency of the component execution. We should keep the base interface as simple as possible. Pipeline improvements will probably following later during the development cycle.

I think we don't want to use the term "generic component". What is the issue when we pass None instead of the string?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue is there are still 3 general options that should be supported without the possibility to mix some of them together.

  • consumes is None -> non-generic component without specified fields -> consume all the fields in the dataset schema. Does not have an equivalence in the component spec definition of things.
  • consumes is specified as a list -> non-generic component with specified fields -> start from the fields in the dataset schema and filter based on the fields specified in the list. Equivalent to having specific fields in the component spec.
  • consumes == generic -> generic component that allows to define dynamic fields in the apply produces/consumes -> the fields to consumes are the ones specified in the apply section. Equivalent to setting additionalProperties to true in the component spec.

The only solution would be to somehow mix the 1st and 3rd option but this would require us to change the component spec to support both dynamic and specified fields which is not something we currently support

Copy link
Contributor

@PhilippeMoussalli PhilippeMoussalli Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrchtr updated based on the feedback, consumes is None now simply means to consume all fields from previous components. We lose the concept of additionalProperties for the consumes section in python based components. However I don't think it would be required since we're inferring the fields based on the dataset schema

This is more evident here

    @lightweight_component(
        base_image="python:3.8-slim-buster",
        extra_requires=["pandas", "dask"],
    )
    class CreateData(DaskLoadComponent):
        def load(self) -> dd.DataFrame:
               ....
    dataset = pipeline.read(
        ref=CreateData,
        produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
    )
    # dataset schema has x,y,z 
        @lightweight_component
    class AddN(PandasTransformComponent):
        def __init__(self, n: int, **kwargs):
            self.n = n

        def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
            dataframe["x"] = dataframe["x"].map(lambda x: x + self.n)
            return dataframe

    _ = dataset.apply(
        ref=AddN,
        produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
        consumes=None,  # This now has to be defined as None since we can't define dynamic fields but we can already infer the schema based on the dataset
        arguments={"n": 1},
    )

I think both options are valuable with small tradeoffs.

  • The previous one would not require any changes to the `consumes` of the apply function, but has a slightly more complex interface for the lightweight component
  • This approach offers more flexibility in the lightweight component, but a slightly different way of defining `consumes` compared to reusable components

Happy to hear other takes on this @RobbeSneyders @GeorgesLorre

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with not supporting consumes=generic. The additionalProperties: true is only needed because the user cannot easily change the schema of a reusable component. But they can easily change the schema of a lightweight one. So there's no need for it here.

Lightweight Python components can still be implemented in a generic way without it. It just means that the implementation of the component depends on the consumes argument it receives.

class AddN(PandasTransformComponent):
def __init__(self, n: int, **kwargs):
self.n = n
Expand All @@ -104,8 +111,8 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:

_ = dataset.apply(
ref=AddN,
produces={"x": pa.int32(), "y": pa.int32()},
consumes={"x": pa.int32(), "y": pa.int32()},
produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
consumes={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
arguments={"n": 1},
)
assert len(pipeline._graph.keys()) == 1 + 1
Expand All @@ -120,14 +127,88 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"produces": {"additionalProperties": True},
"args": {"n": {"type": "int"}},
},
"consumes": {"x": {"type": "int32"}, "y": {"type": "int32"}},
"produces": {"x": {"type": "int32"}, "y": {"type": "int32"}},
"consumes": {
"x": {"type": "int32"},
"y": {"type": "int32"},
"z": {"type": "int32"},
},
"produces": {
"x": {"type": "int32"},
"y": {"type": "int32"},
"z": {"type": "int32"},
},
}
pipeline._validate_pipeline_definition(run_id="dummy-run-id")

DockerCompiler().compile(pipeline)


def test_valid_consumes_mapping(tmp_path_factory, load_pipeline):
    # A lightweight component restricted (via the decorator's `consumes` list) to
    # consuming only the fields "a" and "y". The apply-level `consumes` mapping
    # renames the dataset field "x" to the component field "a".
    @lightweight_component(
        base_image="python:3.8",
        extra_requires=[
            "fondant[component]@git+https://github.com/ml6team/fondant@main",
        ],
        consumes=["a", "y"],  # only these fields should end up in the consumes spec
    )
    class AddN(PandasTransformComponent):
        def __init__(self, n: int, **kwargs):
            self.n = n

        def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
            dataframe["a"] = dataframe["a"].map(lambda x: x + self.n)
            return dataframe

    pipeline, dataset, _ = load_pipeline

    _ = dataset.apply(
        ref=AddN,
        consumes={"a": "x"},  # map dataset field "x" onto component field "a"
        produces={"a": pa.int32()},
        arguments={"n": 1},
    )

    # Compile the pipeline and inspect the generated operation spec to verify
    # that only the declared fields are consumed and "z" was filtered out.
    with tmp_path_factory.mktemp("temp") as fn:
        output_path = str(fn / "kubeflow_pipeline.yml")
        DockerCompiler().compile(pipeline=pipeline, output_path=output_path)
        pipeline_configs = DockerComposeConfigs.from_spec(output_path)
        operation_spec = OperationSpec.from_json(
            pipeline_configs.component_configs["AddN"].arguments["operation_spec"],
        )
        assert all(k in ["a", "y"] for k in operation_spec.inner_consumes)
        assert "z" not in operation_spec.inner_consumes


def test_invalid_consumes_mapping(tmp_path_factory, load_pipeline):
    # Declaring a field in the decorator's `consumes` list that does not exist in
    # the dataset schema must raise a ValueError when the component is applied.
    @lightweight_component(
        base_image="python:3.8",
        extra_requires=[
            "fondant[component]@git+https://github.com/ml6team/fondant@main",
        ],
        consumes=["nonExistingField"],  # not present in the dataset schema
    )
    class AddN(PandasTransformComponent):
        def __init__(self, n: int, **kwargs):
            self.n = n

        def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
            dataframe["a"] = dataframe["a"].map(lambda x: x + self.n)
            return dataframe

    _, dataset, _ = load_pipeline

    # The error is raised at apply time, when the consumes spec is resolved
    # against the dataset schema.
    with pytest.raises(
        ValueError,
        match="Field `nonExistingField` is not available in the dataset.",
    ):
        _ = dataset.apply(
            ref=AddN,
            consumes={"a": "x"},
            produces={"a": pa.int32()},
            arguments={"n": 1},
        )


def test_lightweight_component_missing_decorator():
pipeline = Pipeline(
name="dummy-pipeline",
Expand Down
Loading