Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support applying Lightweight Python components in Pipeline SDK #770

Merged
merged 5 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions examples/sample_pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# This file contains a sample pipeline. Loading data from a parquet file,
# using the load_from_parquet component, chain a custom dummy component, and use
# the reusable chunking component
import pyarrow as pa
from pathlib import Path

import pyarrow as pa

from fondant.pipeline import Pipeline

BASE_PATH = Path("./.artifacts").resolve()
Expand All @@ -17,7 +19,7 @@
}

dataset = pipeline.read(
name_or_path="load_from_parquet",
"load_from_parquet",
arguments={
"dataset_uri": "/data/sample.parquet",
"column_name_mapping": load_component_column_mapping,
Expand All @@ -27,11 +29,11 @@
)

dataset = dataset.apply(
name_or_path="./components/dummy_component",
"./components/dummy_component",
)

dataset.apply(
name_or_path="chunk_text",
"chunk_text",
arguments={"chunk_size": 10, "chunk_overlap": 2},
consumes={"text": "text_data"},
)
2 changes: 1 addition & 1 deletion src/fondant/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def build_component( # ruff: noqa: PLR0912, PLR0915
msg,
)

component_op = ComponentOp(component_dir)
component_op = ComponentOp.from_component_yaml(component_dir)
component_spec = component_op.component_spec

if component_op.dockerfile_path is None:
Expand Down
68 changes: 63 additions & 5 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,58 @@ class ComponentSpec:
Class representing a Fondant component specification.

Args:
specification: The fondant component specification as a Python dict
name: The name of the component
image: The docker image uri to use for the component
description: The description of the component
consumes: A mapping containing the fields consumed by the operation. The keys are the
names of the fields to be received by the component, while the values are the
type of the field.


produces: A mapping containing the fields produced by the operation. The keys are the
names of the fields to be produced by the component, while the values are the
type of the field to be written

args: A dictionary containing the argument name and value for the operation.

"""

def __init__(self, specification: t.Dict[str, t.Any]) -> None:
self._specification = copy.deepcopy(specification)
def __init__(
    self,
    name: str,
    image: str,
    *,
    description: t.Optional[str] = None,
    consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
    produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
    previous_index: t.Optional[str] = None,
    args: t.Optional[t.Dict[str, t.Any]] = None,
    tags: t.Optional[t.List[str]] = None,
):
    """Assemble the specification dictionary and validate it.

    ``name`` and ``image`` are always stored. Every optional value is stored
    under its own key only when it is truthy, so ``None`` and empty
    containers/strings leave the key out of the specification entirely.
    """
    # Listed in the order in which the keys must appear after "name" and "image".
    optional_entries = (
        ("description", description),
        ("tags", tags),
        ("consumes", consumes),
        ("produces", produces),
        ("previous_index", previous_index),
        ("args", args),
    )

    specification: t.Dict[str, t.Any] = {"name": name, "image": image}
    for key, value in optional_entries:
        if value:
            specification[key] = value

    self._specification = specification
    self._validate_spec()

def _validate_spec(self) -> None:
Expand Down Expand Up @@ -102,13 +149,22 @@ def from_file(cls, path: t.Union[str, Path]) -> "ComponentSpec":
"""Load the component spec from the file specified by the provided path."""
with open(path, encoding="utf-8") as file_:
specification = yaml.safe_load(file_)
return cls(specification)
return cls.from_dict(specification)

def to_file(self, path) -> None:
    """Write the component spec as YAML to the file located at ``path``.

    The file is opened for writing with UTF-8 encoding.
    """
    with open(path, mode="w", encoding="utf-8") as spec_file:
        yaml.dump(self._specification, spec_file)

@classmethod
def from_dict(cls, component_spec_dict: t.Dict[str, t.Any]) -> "ComponentSpec":
    """Load the component spec from a dictionary.

    Args:
        component_spec_dict: Mapping whose entries are passed to the constructor as
            keyword arguments.

    Returns:
        The component spec built from the dictionary.

    Raises:
        InvalidComponentSpec: If constructing the spec raises a ``TypeError``, e.g.
            because the dictionary holds a key the constructor does not accept or
            lacks a required one.
    """
    try:
        return cls(**component_spec_dict)
    except TypeError as e:
        msg = f"Invalid component spec: {e}"
        # Chain explicitly so the traceback marks the TypeError as the direct cause
        # instead of an error that happened "during handling" of another one.
        raise InvalidComponentSpec(msg) from e

@property
def name(self):
return self._specification["name"]
Expand Down Expand Up @@ -334,7 +390,9 @@ def _parse_mapping(
return json_mapping

return cls(
component_spec=ComponentSpec(operation_spec_dict["specification"]),
component_spec=ComponentSpec.from_dict(
operation_spec_dict["specification"],
),
consumes=_parse_mapping(operation_spec_dict["consumes"]),
produces=_parse_mapping(operation_spec_dict["produces"]),
)
Expand Down
5 changes: 3 additions & 2 deletions src/fondant/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .lightweight_component import Image, PythonComponent, lightweight_component # noqa
from .pipeline import ( # noqa
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
ComponentOp,
Dataset,
Pipeline,
Resources,
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
)
42 changes: 42 additions & 0 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import typing as t
from dataclasses import dataclass
from functools import wraps


@dataclass
class Image:
base_image: str = "fondant:latest"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should link this to the installed version of Fondant once we add the CI/CD for this.

extra_requires: t.Optional[t.List[str]] = None
script: t.Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the script is needed for the SageMaker runner?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For every runner, since the script is not baked into the image. In the Docker & KFP runners, we'll probably include the script in the entrypoint instead of uploading it separately though.



class PythonComponent:
@classmethod
def image(cls) -> Image:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this not to be a (class)property?

raise NotImplementedError


def lightweight_component(
    extra_requires: t.Optional[t.List[str]] = None,
    base_image: t.Optional[str] = None,
):
    """Build a class decorator that turns a class into a Python component.

    Args:
        extra_requires: Passed to ``Image`` as ``extra_requires`` when truthy.
        base_image: Passed to ``Image`` as ``base_image`` when truthy; otherwise the
            default declared on ``Image`` applies.

    Returns:
        A decorator that derives a new class from the decorated class and
        ``PythonComponent``; its ``image`` classmethod returns the configured ``Image``.
    """

    def wrapper(cls):
        # Forward only the options that were actually supplied, so the defaults
        # declared on Image cover everything else.
        supplied_options = {
            option: value
            for option, value in (
                ("base_image", base_image),
                ("extra_requires", extra_requires),
            )
            if value
        }
        component_image = Image(**supplied_options)

        # updated=() is needed to prevent an attempt to update the class's __dict__
        @wraps(cls, updated=())
        class AppliedPythonComponent(cls, PythonComponent):
            @classmethod
            def image(cls) -> Image:
                return component_image

        return AppliedPythonComponent

    return wrapper
Loading
Loading