Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start from dataset schema for lightweight python component consumes #789

Merged
merged 22 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7a6b78a
Integrate argument inference
RobbeSneyders Jan 17, 2024
c007244
Add compilation to python component test
RobbeSneyders Jan 17, 2024
e52a5c4
Add argument inference to integration test
RobbeSneyders Jan 17, 2024
161f214
Start from dataset schema for python component consumes
RobbeSneyders Jan 17, 2024
66a9103
add option to define consumes in mapping
PhilippeMoussalli Jan 22, 2024
2e10af1
add option to define consumes and generic in mapping
PhilippeMoussalli Jan 22, 2024
e8d763f
Merge branch 'feature/python-consumes-mapping-3' into feature/python-…
PhilippeMoussalli Jan 22, 2024
6619b3a
small fixes
PhilippeMoussalli Jan 22, 2024
d898e4a
make lightweight consumes generic by default
PhilippeMoussalli Jan 23, 2024
2d80a77
Merge branch 'main' into feature/python-consumes-mapping
PhilippeMoussalli Jan 23, 2024
cef482a
revert to desired behaviour
PhilippeMoussalli Jan 23, 2024
8c9d154
update sample pipeline
PhilippeMoussalli Jan 23, 2024
4c97282
update based on feedback
PhilippeMoussalli Jan 23, 2024
3ab1bae
implement PR feedback
PhilippeMoussalli Jan 25, 2024
b59fb8c
add docstrings
PhilippeMoussalli Jan 25, 2024
de5a3c1
update consumes based on new proposal
PhilippeMoussalli Jan 30, 2024
3943c4b
Merge branch 'main' into feature/python-consumes-mapping
PhilippeMoussalli Jan 30, 2024
d8e5563
Update src/fondant/pipeline/lightweight_component.py
PhilippeMoussalli Jan 30, 2024
85f0994
enable default behavior of passing all dataset fields
PhilippeMoussalli Jan 30, 2024
5b69298
implement PR feedback
PhilippeMoussalli Jan 30, 2024
12c6f37
Merge branch 'main' into feature/python-consumes-mapping
GeorgesLorre Jan 30, 2024
60dc6f6
Merge branch 'main' into feature/python-consumes-mapping
PhilippeMoussalli Jan 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/sample_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:

_ = dataset.apply(
ref=CalculateChunkLength,
consumes={"text": pa.string()},
produces={"chunk_length": pa.int32()},
arguments={"arg_x": "value_x"},
)
10 changes: 5 additions & 5 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ def __init__(
image: str,
*,
description: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None,
consumes: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None,
produces: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None,
previous_index: t.Optional[str] = None,
args: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[t.List[str]] = None,
Expand Down Expand Up @@ -462,7 +462,7 @@ def _inner_mapping(self, name: str) -> t.Mapping[str, Field]:
else:
msg = (
f"Received pyarrow DataType value {value} for key {key} in the "
f"`{name}` argument passed to the operation, but {key} is "
f"`{name}` argument passed to the operation, but `{key}` is "
f"already defined in the `{name}` section of the component spec "
f"with type {spec_type}"
)
Expand Down Expand Up @@ -494,8 +494,8 @@ def _outer_mapping(self, name: str) -> t.Mapping[str, Field]:
mapping[value] = Field(name=value, type=mapping.pop(key).type)
else:
msg = (
f"Received a string value for key {key} in the `{name}` "
f"argument passed to the operation, but {key} is not defined in "
f"Received a string value for key `{key}` in the `{name}` "
f"argument passed to the operation, but `{key}` is not defined in "
f"the `{name}` section of the component spec."
)
raise InvalidPipelineDefinition(msg)
Expand Down
39 changes: 39 additions & 0 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import inspect
import itertools
import logging
import textwrap
import typing as t
from dataclasses import asdict, dataclass
from functools import wraps

from fondant.component import BaseComponent, Component
from fondant.core.schema import Field, Type

logger = logging.getLogger(__name__)


@dataclass
Expand All @@ -28,11 +32,42 @@ class PythonComponent(BaseComponent):
def image(cls) -> Image:
raise NotImplementedError

@classmethod
def consumes(cls) -> t.Optional[t.Dict[str, t.Any]]:
pass

@classmethod
def get_consumes_spec(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a docstring would be good.

cls,
dataset_fields: t.Mapping[str, Field],
):
consumes = cls.consumes()

if consumes is None:
# Get consumes spec from the dataset
spec_consumes = {k: v.type.to_dict() for k, v in dataset_fields.items()}

logger.warning(
"No consumes defined. Consumes will be inferred from the dataset."
"All field will be consumed which may lead to additional computation,"
"Consider defining consumes in the component.\n Consumes: %s",
spec_consumes,
)

elif consumes == {"additionalProperties": True}:
spec_consumes = consumes

else:
spec_consumes = {k: Type(v).to_dict() for k, v in consumes.items()}
PhilippeMoussalli marked this conversation as resolved.
Show resolved Hide resolved

return spec_consumes


def lightweight_component(
*args,
extra_requires: t.Optional[t.List[str]] = None,
base_image: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Any]] = None,
):
"""Decorator to enable a python component."""

Expand Down Expand Up @@ -121,6 +156,10 @@ class PythonComponentOp(cls, PythonComponent):
def image(cls) -> Image:
return image

@classmethod
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make this a class property by combining the classmethod and property decorators.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm it doesn't seem correct, should I apply getters and setters

image

Copy link
Member Author

@RobbeSneyders RobbeSneyders Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be reversed, but seems like it only works for Python 3.9 and 3.10 (docs).

Just making it a class attribute could work as well:

class BaseClass:
    consumes: ConsumesType

class Class(BaseClass):
    consumes=consumes_  # cannot be the same name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I still don't quite follow what should be done here, aren't the class methods needed for the decorators? what's the need for attributes in this example?

def consumes(cls) -> t.Optional[t.Dict[str, t.Dict[t.Any, t.Any]]]:
return consumes

return PythonComponentOp

# Call wrapper with function (`args[0]`) when no additional arguments were passed
Expand Down
23 changes: 21 additions & 2 deletions src/fondant/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,22 +206,39 @@ def from_component_yaml(cls, path, **kwargs) -> "ComponentOp":
)

@classmethod
def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp":
def from_ref(
cls,
ref: t.Any,
fields: t.Optional[t.Mapping[str, Field]] = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep this fields argument out of here since this is specific to the lightweight Python components. Can we move this to the PythonComponent class instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a straightforwards way of doing this, unless we somehow pass the fields to the BaseComponent class since this is what the PythonComponent starts from but also not sure how feasible that is. Any other suggestions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My original implementation did this in the Dataset class. So I would assume we can just call a method on the PythonComponent class at that point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this might make less sense after the refactoring on main since my first commits. If we can address my comment above, it's fine for me to keep it like this for now. Would be good to add the argument to the docstring though.

**kwargs,
) -> "ComponentOp":
"""Create a ComponentOp from a reference. The reference can
be a reusable component name, a path to a custom component,
or a python component class.

Args:
ref: The name of a reusable component, or the path to the directory containing
a custom component, or a python component class.
fields: The fields of the dataset available to the component.
**kwargs: The provided user arguments are passed in as keyword arguments
"""
if inspect.isclass(ref) and issubclass(ref, BaseComponent):
if issubclass(ref, PythonComponent):
name = ref.__name__
image = ref.image()
description = ref.__doc__ or "python component"

consumes_spec = (
ref.get_consumes_spec(fields)
if fields
else {"additionalProperties": True}
)

component_spec = ComponentSpec(
name,
image.base_image,
description=description,
consumes={"additionalProperties": True},
consumes=consumes_spec,
produces={"additionalProperties": True},
args={
name: arg.to_spec()
Expand Down Expand Up @@ -724,6 +741,7 @@ def apply(
"""
operation = ComponentOp.from_ref(
ref,
fields=self.fields,
produces=produces,
consumes=consumes,
arguments=arguments,
Expand Down Expand Up @@ -771,6 +789,7 @@ def write(
"""
operation = ComponentOp.from_ref(
ref,
fields=self.fields,
consumes=consumes,
arguments=arguments,
input_partition_rows=input_partition_rows,
Expand Down
Loading