Skip to content

Commit

Permalink
Support applying Lightweight Python components in Pipeline SDK (#770)
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgesLorre authored Jan 16, 2024
1 parent 410c3f6 commit abcd36f
Show file tree
Hide file tree
Showing 13 changed files with 360 additions and 99 deletions.
10 changes: 6 additions & 4 deletions examples/sample_pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# This file contains a sample pipeline. Loading data from a parquet file,
# using the load_from_parquet component, chain a custom dummy component, and use
# the reusable chunking component
import pyarrow as pa
from pathlib import Path

import pyarrow as pa

from fondant.pipeline import Pipeline

BASE_PATH = Path("./.artifacts").resolve()
Expand All @@ -17,7 +19,7 @@
}

dataset = pipeline.read(
name_or_path="load_from_parquet",
"load_from_parquet",
arguments={
"dataset_uri": "/data/sample.parquet",
"column_name_mapping": load_component_column_mapping,
Expand All @@ -27,11 +29,11 @@
)

dataset = dataset.apply(
name_or_path="./components/dummy_component",
"./components/dummy_component",
)

dataset.apply(
name_or_path="chunk_text",
"chunk_text",
arguments={"chunk_size": 10, "chunk_overlap": 2},
consumes={"text": "text_data"},
)
2 changes: 1 addition & 1 deletion src/fondant/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def build_component( # ruff: noqa: PLR0912, PLR0915
msg,
)

component_op = ComponentOp(component_dir)
component_op = ComponentOp.from_component_yaml(component_dir)
component_spec = component_op.component_spec

if component_op.dockerfile_path is None:
Expand Down
68 changes: 63 additions & 5 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,58 @@ class ComponentSpec:
Class representing a Fondant component specification.
Args:
specification: The fondant component specification as a Python dict
name: The name of the component
image: The docker image uri to use for the component
description: The description of the component
consumes: A mapping containing the fields consumed by the operation. The keys are the
names of the fields to be received by the component, while the values are the
type of the field.
produces: A mapping containing the fields produced by the operation. The keys are the
names of the fields to be produced by the component, while the values are the
type of the field to be written
arguments: A dictionary containing the argument name and value for the operation.
"""

def __init__(self, specification: t.Dict[str, t.Any]) -> None:
self._specification = copy.deepcopy(specification)
def __init__(
self,
name: str,
image: str,
*,
description: t.Optional[str] = None,
consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
previous_index: t.Optional[str] = None,
args: t.Optional[t.Dict[str, t.Any]] = None,
tags: t.Optional[t.List[str]] = None,
):
spec_dict: t.Dict[str, t.Any] = {
"name": name,
"image": image,
}

if description:
spec_dict["description"] = description

if tags:
spec_dict["tags"] = tags

if consumes:
spec_dict["consumes"] = consumes

if produces:
spec_dict["produces"] = produces

if previous_index:
spec_dict["previous_index"] = previous_index

if args:
spec_dict["args"] = args

self._specification = spec_dict
self._validate_spec()

def _validate_spec(self) -> None:
Expand Down Expand Up @@ -102,13 +149,22 @@ def from_file(cls, path: t.Union[str, Path]) -> "ComponentSpec":
"""Load the component spec from the file specified by the provided path."""
with open(path, encoding="utf-8") as file_:
specification = yaml.safe_load(file_)
return cls(specification)
return cls.from_dict(specification)

def to_file(self, path) -> None:
"""Dump the component spec to the file specified by the provided path."""
with open(path, "w", encoding="utf-8") as file_:
yaml.dump(self._specification, file_)

@classmethod
def from_dict(cls, component_spec_dict: t.Dict[str, t.Any]) -> "ComponentSpec":
"""Load the component spec from a dictionary."""
try:
return cls(**component_spec_dict)
except TypeError as e:
msg = f"Invalid component spec: {e}"
raise InvalidComponentSpec(msg)

@property
def name(self):
return self._specification["name"]
Expand Down Expand Up @@ -334,7 +390,9 @@ def _parse_mapping(
return json_mapping

return cls(
component_spec=ComponentSpec(operation_spec_dict["specification"]),
component_spec=ComponentSpec.from_dict(
operation_spec_dict["specification"],
),
consumes=_parse_mapping(operation_spec_dict["consumes"]),
produces=_parse_mapping(operation_spec_dict["produces"]),
)
Expand Down
5 changes: 3 additions & 2 deletions src/fondant/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .lightweight_component import Image, PythonComponent, lightweight_component # noqa
from .pipeline import ( # noqa
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
ComponentOp,
Dataset,
Pipeline,
Resources,
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
)
42 changes: 42 additions & 0 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import typing as t
from dataclasses import dataclass
from functools import wraps


@dataclass
class Image:
base_image: str = "fondant:latest"
extra_requires: t.Optional[t.List[str]] = None
script: t.Optional[str] = None


class PythonComponent:
@classmethod
def image(cls) -> Image:
raise NotImplementedError


def lightweight_component(
extra_requires: t.Optional[t.List[str]] = None,
base_image: t.Optional[str] = None,
):
"""Decorator to enable a python component."""

def wrapper(cls):
kwargs = {}
if base_image:
kwargs["base_image"] = base_image
if extra_requires:
kwargs["extra_requires"] = extra_requires
image = Image(**kwargs)

# updated=() is needed to prevent an attempt to update the class's __dict__
@wraps(cls, updated=())
class AppliedPythonComponent(cls, PythonComponent):
@classmethod
def image(cls) -> Image:
return image

return AppliedPythonComponent

return wrapper
Loading

0 comments on commit abcd36f

Please sign in to comment.