Start from dataset schema for lightweight python component consumes (#789)

Fixes #785 

Opening this as a draft PR since it's not yet clear to me what the
desired behavior is.

I'll be using the "inner" / "outer" terminology that we already use in our `OperationSpec` class to explain. "Inner" schemas are the schemas that the Python component consumes / produces; "outer" schemas are the schemas that the `DataIO` layer consumes / produces.

For Docker components, the logic works as follows (see the sketch below):
1. The `consumes` section in the component spec is the "inner" schema.
2. We leverage the `consumes` argument of the `apply` method to calculate the "outer" schema from the "inner" schema.
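A minimal, self-contained sketch of that inner-to-outer step; the field names ("text", "raw_content") are hypothetical stand-ins for a real component spec and dataset:

```python
import pyarrow as pa

# "Inner" schema: what the component spec's `consumes` section declares.
inner_schema = {"text": pa.string()}

# Apply-level `consumes`: maps an inner field name onto a dataset column.
apply_consumes = {"text": "raw_content"}

# "Outer" schema: what the DataIO layer will actually load from the dataset.
outer_schema = {
    apply_consumes.get(name, name): dtype  # rename fields that have a mapping
    for name, dtype in inner_schema.items()
}

print(outer_schema)  # {'raw_content': DataType(string)}
```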

For lightweight python components, we do not have a component spec to
start from. So what I currently implemented is this:
1. We start from the dataset schema and reverse alter it with the
`consumes` argument to calculate the "inner" schema.
2. We leverage the `consumes` argument of the `apply` method to
calculate the "outer" schema from the "inner" schema.
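A sketch of that reverse direction, again with hypothetical column names, which also shows why every dataset field ends up in the calculated schemas:

```python
import pyarrow as pa

# "Outer" schema: the full dataset schema we start from.
dataset_schema = {"raw_content": pa.string(), "image": pa.binary()}

# Apply-level `consumes`: inner field name -> dataset column.
apply_consumes = {"text": "raw_content"}

# Reverse the mapping to recover the "inner" schema the component will see.
outer_to_inner = {outer: inner for inner, outer in apply_consumes.items()}
inner_schema = {
    outer_to_inner.get(name, name): dtype
    for name, dtype in dataset_schema.items()
}

print(inner_schema)  # {'text': DataType(string), 'image': DataType(binary)}
# Note that "image" is still included: without a component spec there is no
# way to drop unused dataset columns, which is the downside described next.
```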

This works, but has one big downside. Since we start from the dataset
schema, the calculated "inner" / "outer" consumes contain all the fields
in the dataset. In other words, the lack of a component spec removes the
ability to select which columns from the dataset to load. Since this is
an important part of our optimization, I think we need to find a way
around this.

My best idea at this time is to expand the `lightweight_component` decorator to add support for this (sketched below), but I'm curious to hear if anyone has other ideas.
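A sketch of what that could look like, reusing the `CalculateChunkLength` component from the sample pipeline; the decorator-level `consumes` argument is the proposal here (and is what the diff below ends up adding):

```python
import pandas as pd
import pyarrow as pa

from fondant.component import PandasTransformComponent
from fondant.pipeline import lightweight_component


@lightweight_component(consumes={"text": pa.string()})
class CalculateChunkLength(PandasTransformComponent):
    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        # Only the declared "text" column needs to be loaded for this component.
        dataframe["chunk_length"] = dataframe["text"].str.len()
        return dataframe
```

With `consumes` declared on the decorator, the `dataset.apply(ref=CalculateChunkLength, ...)` call no longer needs an apply-level `consumes`, which is exactly the change made to `examples/sample_pipeline/pipeline.py` in the diff below.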

---------

Co-authored-by: Philippe Moussalli <[email protected]>
Co-authored-by: Georges Lorré <[email protected]>
3 people authored Jan 30, 2024
1 parent 115fe9f commit 9070b82
Showing 6 changed files with 271 additions and 54 deletions.
2 changes: 1 addition & 1 deletion components/load_from_hf_hub/Dockerfile
@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt
# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@d87efb9d37fbec8e86b5fc20a6ab480ff67895af

# Set the working directory to the component folder
WORKDIR /component/src
1 change: 0 additions & 1 deletion examples/sample_pipeline/pipeline.py
@@ -59,7 +59,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:

_ = dataset.apply(
ref=CalculateChunkLength,
consumes={"text": pa.string()},
produces={"chunk_length": pa.int32()},
arguments={"arg_x": "value_x"},
)
10 changes: 5 additions & 5 deletions src/fondant/core/component_spec.py
@@ -93,8 +93,8 @@ def __init__(
image: str,
*,
description: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None,
consumes: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None,
produces: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None,
previous_index: t.Optional[str] = None,
args: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[t.List[str]] = None,
@@ -443,7 +443,7 @@ def _inner_mapping(self, name: str) -> t.Mapping[str, Field]:
else:
msg = (
f"Received pyarrow DataType value {value} for key {key} in the "
f"`{name}` argument passed to the operation, but {key} is "
f"`{name}` argument passed to the operation, but `{key}` is "
f"already defined in the `{name}` section of the component spec "
f"with type {spec_type}"
)
@@ -475,8 +475,8 @@ def _outer_mapping(self, name: str) -> t.Mapping[str, Field]:
mapping[value] = Field(name=value, type=mapping.pop(key).type)
else:
msg = (
f"Received a string value for key {key} in the `{name}` "
f"argument passed to the operation, but {key} is not defined in "
f"Received a string value for key `{key}` in the `{name}` "
f"argument passed to the operation, but `{key}` is not defined in "
f"the `{name}` section of the component spec."
)
raise InvalidPipelineDefinition(msg)
75 changes: 74 additions & 1 deletion src/fondant/pipeline/lightweight_component.py
@@ -8,10 +8,13 @@
from functools import wraps
from importlib import metadata

import pyarrow as pa

from fondant.component import BaseComponent, Component
from fondant.core.schema import Field, Type

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


MIN_PYTHON_VERSION = (3, 8)
MAX_PYTHON_VERSION = (3, 11)
@@ -67,11 +70,77 @@ class LightweightComponent(BaseComponent):
def image(cls) -> Image:
raise NotImplementedError

@classmethod
def consumes(cls) -> t.Optional[t.Dict[str, t.Any]]:
pass

@classmethod
def modify_spec_consumes(
cls,
spec_consumes: t.Dict[str, t.Any],
apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]],
):
"""Modify fields based on the consumes argument in the 'apply' method."""
if apply_consumes:
for k, v in apply_consumes.items():
if isinstance(v, str):
spec_consumes[k] = spec_consumes.pop(v)
else:
msg = (
f"Invalid data type for field `{k}` in the `apply_consumes` "
f"argument. Only string types are allowed."
)
raise ValueError(
msg,
)
return spec_consumes

@classmethod
def get_spec_consumes(
cls,
dataset_fields: t.Mapping[str, Field],
apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
):
"""
Function that gets the consumes spec for the component based on the dataset fields and
the apply_consumes argument.
Args:
dataset_fields: The fields of the dataset.
apply_consumes: The consumes argument in the apply method.
Returns:
The consumes spec for the component.
"""
consumes = cls.consumes()

if consumes is None:
# Get consumes spec from the dataset
spec_consumes = {k: v.type.to_dict() for k, v in dataset_fields.items()}

spec_consumes = cls.modify_spec_consumes(spec_consumes, apply_consumes)

logger.warning(
"No consumes defined. Consumes will be inferred from the dataset."
"All field will be consumed which may lead to additional computation,"
"Consider defining consumes in the component.\n Consumes: %s",
spec_consumes,
)

else:
spec_consumes = {
k: (Type(v).to_dict() if k != "additionalProperties" else v)
for k, v in consumes.items()
}

return spec_consumes


def lightweight_component(
*args,
extra_requires: t.Optional[t.List[str]] = None,
base_image: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Any]] = None,
):
"""Decorator to enable a lightweight component."""

@@ -160,6 +229,10 @@ class LightweightComponentOp(cls, LightweightComponent):
def image(cls) -> Image:
return image

@classmethod
def consumes(cls) -> t.Optional[t.Dict[str, t.Dict[t.Any, t.Any]]]:
return consumes

return LightweightComponentOp

# Call wrapper with function (`args[0]`) when no additional arguments were passed
23 changes: 21 additions & 2 deletions src/fondant/pipeline/pipeline.py
@@ -205,22 +205,39 @@ def from_component_yaml(cls, path, **kwargs) -> "ComponentOp":
)

@classmethod
def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp":
def from_ref(
cls,
ref: t.Any,
fields: t.Optional[t.Mapping[str, Field]] = None,
**kwargs,
) -> "ComponentOp":
"""Create a ComponentOp from a reference. The reference can
be a reusable component name, a path to a custom component,
or a python component class.
Args:
ref: The name of a reusable component, or the path to the directory containing
a custom component, or a python component class.
fields: The fields of the dataset available to the component.
**kwargs: The provided user arguments are passed in as keyword arguments
"""
if inspect.isclass(ref) and issubclass(ref, BaseComponent):
if issubclass(ref, LightweightComponent):
name = ref.__name__
image = ref.image()
description = ref.__doc__ or "lightweight component"

consumes_spec = (
ref.get_spec_consumes(fields, kwargs["consumes"])
if fields
else {"additionalProperties": True}
)

component_spec = ComponentSpec(
name,
image.base_image,
description=description,
consumes={"additionalProperties": True},
consumes=consumes_spec,
produces={"additionalProperties": True},
args={
name: arg.to_spec()
@@ -726,6 +743,7 @@ def apply(
"""
operation = ComponentOp.from_ref(
ref,
fields=self.fields,
produces=produces,
consumes=consumes,
arguments=arguments,
@@ -773,6 +791,7 @@ def write(
"""
operation = ComponentOp.from_ref(
ref,
fields=self.fields,
consumes=consumes,
arguments=arguments,
input_partition_rows=input_partition_rows,