Skip to content

Commit

Permalink
Cleanup and move things around
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgesLorre committed Jan 11, 2024
1 parent 28c6990 commit 308e59d
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 65 deletions.
1 change: 0 additions & 1 deletion src/fondant/component/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,4 @@
DaskTransformComponent,
DaskWriteComponent,
PandasTransformComponent,
python_component,
)
29 changes: 0 additions & 29 deletions src/fondant/component/component.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
"""This module defines interfaces which components should implement to be executed by fondant."""

import typing as t
from functools import wraps

import dask.dataframe as dd
import pandas as pd

from fondant.component.image import Image
from fondant.core.schema import Field


Expand Down Expand Up @@ -78,30 +76,3 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:

Component = t.TypeVar("Component", bound=BaseComponent)
"""Component type which can represent any of the subclasses of BaseComponent"""


def python_component(
    extra_requires: t.Optional[t.List[str]] = None,
    base_image: t.Optional[str] = None,
):
    """Build a class decorator that turns a component class into a python component.

    The returned decorator subclasses the decorated class and adds two
    classmethods: ``image()``, which returns an ``Image`` built from the
    arguments given here, and ``is_python_component()``, which always
    returns ``True``.

    Args:
        extra_requires: forwarded unchanged to ``Image(extra_requires=...)``.
        base_image: forwarded unchanged to ``Image(base_image=...)``.

    Returns:
        A function that takes a class and returns the generated subclass.
    """

    def wrapper(cls):
        # Built once per decorated class and shared by every image() call.
        component_image = Image(
            extra_requires=extra_requires,
            base_image=base_image,
        )

        class PythonComponent(cls):
            @classmethod
            def image(cls) -> Image:
                return component_image

            @classmethod
            def is_python_component(cls) -> bool:
                return True

        # Copy the decorated class's metadata onto the subclass; updated=()
        # keeps wraps from trying to update the class __dict__.
        return wraps(cls, updated=())(PythonComponent)

    return wrapper
9 changes: 0 additions & 9 deletions src/fondant/component/image.py

This file was deleted.

42 changes: 42 additions & 0 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,48 @@ def to_file(self, path) -> None:
with open(path, "w", encoding="utf-8") as file_:
yaml.dump(self._specification, file_)

@classmethod
def from_dict(cls, component_spec_dict: t.Dict[str, t.Any]) -> "ComponentSpec":
    """Create a component spec by passing the given dictionary to the constructor."""
    component_spec = cls(component_spec_dict)
    return component_spec

@classmethod
def from_args(
    cls,
    name: str,
    image: str,
    *,
    description: t.Optional[str] = None,
    consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
    produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
    arguments: t.Optional[t.Dict[str, t.Any]] = None,
) -> "ComponentSpec":
    """Create a component spec from individual arguments.

    Args:
        name: stored under the ``name`` key of the specification.
        image: stored under the ``image`` key of the specification.
        description: stored under ``description``; the key is left out when
            the value is empty or ``None``.
        consumes: stored under ``consumes``; when empty or ``None`` the
            specification gets ``{"additionalProperties": True}`` instead.
        produces: stored under ``produces``; same fallback as ``consumes``.
        arguments: stored under ``args``; the key is left out when the value
            is empty or ``None``.

    Returns:
        The result of calling the class with the assembled dictionary.
    """
    # TODO make this the __init__ method

    specification: t.Dict[str, t.Any] = {"name": name, "image": image}

    if description:
        specification["description"] = description

    # Each fallback is a fresh dict so consumes and produces never share one object.
    specification["consumes"] = consumes if consumes else {"additionalProperties": True}
    specification["produces"] = produces if produces else {"additionalProperties": True}

    if arguments:
        specification["args"] = arguments

    return cls(specification)

@property
def name(self):
    """Return the value stored under the ``name`` key of the specification."""
    specification = self._specification
    return specification["name"]
Expand Down
5 changes: 3 additions & 2 deletions src/fondant/pipeline/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .lightweight_component import Image, lightweight_component # noqa
from .pipeline import ( # noqa
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
ComponentOp,
Dataset,
Pipeline,
Resources,
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
)
38 changes: 38 additions & 0 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import typing as t
from dataclasses import dataclass
from functools import wraps


@dataclass
class Image:
    """Image settings attached to a lightweight component.

    ``base_image`` may be passed as ``None`` (the ``lightweight_component``
    decorator in this file does so whenever it is used without a
    ``base_image`` argument). In that case the default base image is used
    instead of keeping ``None``, so ``base_image`` is always a string after
    construction.
    """

    # Fallback used when no base image is given; ClassVar keeps it out of the
    # generated dataclass fields.
    _DEFAULT_BASE_IMAGE: t.ClassVar[str] = "fondant:latest"

    # Name of the base container image.
    base_image: t.Optional[str] = _DEFAULT_BASE_IMAGE
    # Extra requirements for the image; presumably package names to install
    # on top of the base image -- TODO confirm against the image builder.
    extra_requires: t.Optional[t.List[str]] = None
    # Not set by the decorator in this file; semantics are defined by callers.
    script: t.Optional[str] = None

    def __post_init__(self) -> None:
        """Replace an explicit ``None`` base image with the default."""
        if self.base_image is None:
            self.base_image = self._DEFAULT_BASE_IMAGE


def lightweight_component(
    extra_requires: t.Optional[t.List[str]] = None,
    base_image: t.Optional[str] = None,
):
    """Build a class decorator that enables a class as a python component.

    The returned decorator subclasses the decorated class and adds two
    classmethods: ``image()``, which returns an ``Image`` built from the
    arguments given here, and ``is_python_component()``, which always
    returns ``True``.

    Args:
        extra_requires: forwarded unchanged to ``Image(extra_requires=...)``.
        base_image: forwarded unchanged to ``Image(base_image=...)``.

    Returns:
        A function that takes a class and returns the generated subclass.
    """

    def wrapper(cls):
        # Built once per decorated class and shared by every image() call.
        component_image = Image(
            extra_requires=extra_requires,
            base_image=base_image,
        )

        class PythonComponent(cls):
            @classmethod
            def image(cls) -> Image:
                return component_image

            @classmethod
            def is_python_component(cls) -> bool:
                return True

        # updated=() is needed to prevent an attempt to update the class's __dict__
        copy_metadata = wraps(cls, updated=())
        return copy_metadata(PythonComponent)

    return wrapper
35 changes: 16 additions & 19 deletions src/fondant/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import pyarrow as pa

from fondant.component import BaseComponent
from fondant.component.image import Image
from fondant.core.component_spec import ComponentSpec, OperationSpec
from fondant.core.exceptions import InvalidPipelineDefinition
from fondant.core.manifest import Manifest
from fondant.core.schema import Field
from fondant.pipeline import Image

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -353,8 +353,8 @@ def read(
Read data using the provided component.
Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
ref: The name of a reusable component, or the path to the directory containing
a custom component, or a python component class.
produces: A mapping to update the fields produced by the operation as defined in the
component spec. The keys are the names of the fields to be received by the
component, while the values are the type of the field, or the name of the field to
Expand All @@ -376,28 +376,22 @@ def read(
msg,
)

if isinstance(ref, (str, Path)):
operation = ComponentOp.from_component_yaml(ref)
else:
if isinstance(ref(), BaseComponent):
if not ref.is_python_component():
err = """Only Python components are currently supported.
Make sure your class is decorated with the @python_component decorator."""
raise ValueError(err)
name = ref().__class__.__name__
name = ref.__name__
image = ref.image()
description = ref.__doc__ or "python component"

spec_dict = {
"name": name,
"description": "This is an example component",
"image": "example_component:latest",
"produces": {
"additionalProperties": True,
},
"consumes": {
"additionalProperties": True,
},
}
component_spec = ComponentSpec(spec_dict)
component_spec = ComponentSpec.from_args(
name,
image.base_image, # TODO: revisit
description=description,
consumes={},
produces={},
)

operation = ComponentOp(
name,
Expand All @@ -412,6 +406,9 @@ def read(
client_kwargs=client_kwargs,
)

else:
operation = ComponentOp.from_component_yaml(ref)

manifest = Manifest.create(
pipeline_name=self.name,
base_path=self.base_path,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
from fondant.component import DaskLoadComponent, python_component
from fondant.pipeline import Pipeline
from fondant.component import DaskLoadComponent
from fondant.pipeline import Pipeline, lightweight_component


def test_python_component():
def test_lightweight_component():
pipeline = Pipeline(name="dummy-pipeline", base_path="./data")

@python_component(
@lightweight_component(
base_image="python:3.8-slim-buster",
extra_requires=["pandas", "dask"],
)
class CreateData(DaskLoadComponent):
def __init__(self, *_, **__):
def __init__(self, **kwargs):
pass

def load(self) -> dd.DataFrame:
Expand Down

0 comments on commit 308e59d

Please sign in to comment.