Skip to content

Commit

Permalink
Add support for applying transforms
Browse files: browse the repository at this point in the history
  • Loading branch information
GeorgesLorre committed Jan 15, 2024
1 parent 308e59d commit e16e6b1
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 84 deletions.
2 changes: 1 addition & 1 deletion src/fondant/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def build_component( # ruff: noqa: PLR0912, PLR0915
msg,
)

component_op = ComponentOp(component_dir)
component_op = ComponentOp.from_component_yaml(component_dir)
component_spec = component_op.component_spec

if component_op.dockerfile_path is None:
Expand Down
94 changes: 51 additions & 43 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,54 @@ class ComponentSpec:
Class representing a Fondant component specification.
Args:
specification: The fondant component specification as a Python dict
name: The name of the component
image: The docker image uri to use for the component
description: The description of the component
consumes: A mapping containing the fields consumed by the operation. The keys are the
names of the fields to be received by the component, while the values are the
type of the field.
produces: A mapping containing the fields produced by the operation. The keys are the
names of the fields to be produced by the component, while the values are the
type of the field to be written
arguments: A dictionary containing the argument name and value for the operation.
"""

def __init__(self, specification: t.Dict[str, t.Any]) -> None:
self._specification = copy.deepcopy(specification)
def __init__(
self,
name: str,
image: str,
*,
description: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
):
spec_dict: t.Dict[str, t.Any] = {
"name": name,
"image": image,
}

if description:
spec_dict["description"] = description

if consumes:
spec_dict["consumes"] = consumes
else:
spec_dict["consumes"] = {"additionalProperties": True}

if produces:
spec_dict["produces"] = produces
else:
spec_dict["produces"] = {"additionalProperties": True}

if arguments:
spec_dict["args"] = arguments

self._specification = spec_dict
self._validate_spec()

def _validate_spec(self) -> None:
Expand Down Expand Up @@ -102,7 +145,7 @@ def from_file(cls, path: t.Union[str, Path]) -> "ComponentSpec":
"""Load the component spec from the file specified by the provided path."""
with open(path, encoding="utf-8") as file_:
specification = yaml.safe_load(file_)
return cls(specification)
return cls.from_dict(specification)

def to_file(self, path) -> None:
"""Dump the component spec to the file specified by the provided path."""
Expand All @@ -112,44 +155,7 @@ def to_file(self, path) -> None:
@classmethod
def from_dict(cls, component_spec_dict: t.Dict[str, t.Any]) -> "ComponentSpec":
"""Load the component spec from a dictionary."""
return cls(component_spec_dict)

@classmethod
def from_args(
cls,
name: str,
image: str,
*,
description: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
) -> "ComponentSpec":
"""Load the component spec from arguments."""
# TODO make this the __init__ method

spec_dict = {
"name": name,
"image": image,
}

if description:
spec_dict["description"] = description

if consumes:
spec_dict["consumes"] = consumes
else:
spec_dict["consumes"] = {"additionalProperties": True}

if produces:
spec_dict["produces"] = produces
else:
spec_dict["produces"] = {"additionalProperties": True}

if arguments:
spec_dict["args"] = arguments

return cls(spec_dict)
return cls(**component_spec_dict)

@property
def name(self):
Expand Down Expand Up @@ -376,7 +382,9 @@ def _parse_mapping(
return json_mapping

return cls(
component_spec=ComponentSpec(operation_spec_dict["specification"]),
component_spec=ComponentSpec.from_dict(
operation_spec_dict["specification"],
),
consumes=_parse_mapping(operation_spec_dict["consumes"]),
produces=_parse_mapping(operation_spec_dict["produces"]),
)
Expand Down
10 changes: 6 additions & 4 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ def lightweight_component(
"""Decorator to enable a python component."""

def wrapper(cls):
image = Image(
base_image=base_image,
extra_requires=extra_requires,
)
kwargs = {}
if base_image:
kwargs["base_image"] = base_image
if extra_requires:
kwargs["extra_requires"] = extra_requires
image = Image(**kwargs)

# updated=() is needed to prevent an attempt to update the class's __dict__
@wraps(cls, updated=())
Expand Down
116 changes: 82 additions & 34 deletions src/fondant/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,13 @@ def _configure_caching_from_image_tag(

return cache

@property
def dockerfile_path(self) -> t.Optional[Path]:
path = self.component_dir / "Dockerfile"
return path if path.exists() else None
def dockerfile_path(self, path: t.Union[str, Path]) -> t.Optional[Path]:
if self._is_custom_component(path):
component_dir = Path(path)
else:
component_dir = self._get_registry_path(str(path))
docker_path = component_dir / "Dockerfile"
return docker_path if docker_path.exists() else None

@staticmethod
def _is_custom_component(path_or_name: t.Union[str, Path]) -> bool:
Expand Down Expand Up @@ -339,7 +342,7 @@ def register_operation(

def read(
self,
ref: t.Union[str, Path, t.Type["BaseComponent"]],
ref: t.Any,
*,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
Expand Down Expand Up @@ -376,7 +379,7 @@ def read(
msg,
)

if isinstance(ref(), BaseComponent):
if issubclass(ref, BaseComponent):
if not ref.is_python_component():
err = """Only Python components are currently supported.
Make sure your class is decorated with the @python_component decorator."""
Expand All @@ -385,7 +388,7 @@ def read(
image = ref.image()
description = ref.__doc__ or "python component"

component_spec = ComponentSpec.from_args(
component_spec = ComponentSpec(
name,
image.base_image, # TODO: revisit
description=description,
Expand Down Expand Up @@ -582,7 +585,7 @@ def _apply(self, operation: ComponentOp) -> "Dataset":

def apply(
self,
name_or_path: t.Union[str, Path],
ref: t.Any,
*,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
Expand All @@ -597,8 +600,8 @@ def apply(
Apply the provided component on the dataset.
Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
ref: The name of a reusable component, or the path to the directory containing
a custom component, or a python component class.
consumes: A mapping to update the fields consumed by the operation as defined in the
component spec. The keys are the names of the fields to be received by the
component, while the values are the type of the field, or the name of the field to
Expand Down Expand Up @@ -682,22 +685,45 @@ def apply(
Returns:
An intermediate dataset.
"""
operation = ComponentOp(
name_or_path,
consumes=consumes,
produces=produces,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
if issubclass(ref, BaseComponent):
if not ref.is_python_component():
err = """Only Python components are currently supported.
Make sure your class is decorated with the @python_component decorator."""
raise ValueError(err)
name = ref.__name__
image = ref.image()
description = ref.__doc__ or "python component"

component_spec = ComponentSpec(
name,
image.base_image, # TODO: revisit
description=description,
consumes={},
produces={},
)

operation = ComponentOp(
name,
image,
component_spec,
consumes=consumes,
produces=produces,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)

else:
operation = ComponentOp.from_component_yaml(ref)

return self._apply(operation)

def write(
self,
name_or_path: t.Union[str, Path],
ref: t.Any,
*,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
Expand All @@ -711,8 +737,8 @@ def write(
Write the dataset using the provided component.
Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
ref: The name of a reusable component, or the path to the directory containing
a custom component, or a python component class.
consumes: A mapping to update the fields consumed by the operation as defined in the
component spec. The keys are the names of the fields to be received by the
component, while the values are the type of the field, or the name of the field to
Expand All @@ -728,14 +754,36 @@ def write(
Returns:
An intermediate dataset.
"""
operation = ComponentOp(
name_or_path,
consumes=consumes,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
if issubclass(ref, BaseComponent):
if not ref.is_python_component():
err = """Only Python components are currently supported.
Make sure your class is decorated with the @python_component decorator."""
raise ValueError(err)
name = ref.__name__
image = ref.image()
description = ref.__doc__ or "python component"

component_spec = ComponentSpec(
name,
image.base_image, # TODO: revisit
description=description,
consumes={},
produces={},
)

operation = ComponentOp(
name,
image,
component_spec,
consumes=consumes,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)

else:
operation = ComponentOp.from_component_yaml(ref)
self._apply(operation)
20 changes: 18 additions & 2 deletions tests/pipeline/test_python_component.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
from fondant.component import DaskLoadComponent
from fondant.component import DaskLoadComponent, PandasTransformComponent
from fondant.pipeline import Pipeline, lightweight_component


Expand All @@ -26,7 +26,23 @@ def load(self) -> dd.DataFrame:
)
return dd.from_pandas(df, npartitions=1)

pipeline.read(
dataset = pipeline.read(
ref=CreateData,
produces={"x": pa.int32(), "y": pa.int32()},
)

@lightweight_component()
class AddN(PandasTransformComponent):
def __init__(self, n: int):
self.n = n

def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
dataframe["x"] = dataframe["x"].map(lambda x: x + self.n)
return dataframe

_ = dataset.apply(
ref=AddN,
produces={"x": pa.int32(), "y": pa.int32()},
consumes={"x": pa.int32(), "y": pa.int32()},
arguments={"n": 1},
)

0 comments on commit e16e6b1

Please sign in to comment.