diff --git a/components/load_from_hf_hub/Dockerfile b/components/load_from_hf_hub/Dockerfile index 72525d88..919611e7 100644 --- a/components/load_from_hf_hub/Dockerfile +++ b/components/load_from_hf_hub/Dockerfile @@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt # Install Fondant # This is split from other requirements to leverage caching ARG FONDANT_VERSION=main -RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} +RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@d87efb9d37fbec8e86b5fc20a6ab480ff67895af # Set the working directory to the component folder WORKDIR /component/src diff --git a/examples/sample_pipeline/pipeline.py b/examples/sample_pipeline/pipeline.py index 8d5f331d..875f9c58 100644 --- a/examples/sample_pipeline/pipeline.py +++ b/examples/sample_pipeline/pipeline.py @@ -59,7 +59,6 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: _ = dataset.apply( ref=CalculateChunkLength, - consumes={"text": pa.string()}, produces={"chunk_length": pa.int32()}, arguments={"arg_x": "value_x"}, ) diff --git a/src/fondant/core/component_spec.py b/src/fondant/core/component_spec.py index 27fffce7..2802ad69 100644 --- a/src/fondant/core/component_spec.py +++ b/src/fondant/core/component_spec.py @@ -93,8 +93,8 @@ def __init__( image: str, *, description: t.Optional[str] = None, - consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None, - produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None, + consumes: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None, + produces: t.Optional[t.Mapping[str, t.Union[str, pa.DataType, bool]]] = None, previous_index: t.Optional[str] = None, args: t.Optional[t.Dict[str, t.Any]] = None, tags: t.Optional[t.List[str]] = None, @@ -443,7 +443,7 @@ def _inner_mapping(self, name: str) -> t.Mapping[str, Field]: else: msg = ( f"Received pyarrow DataType value {value} for key {key} in the " - f"`{name}` argument passed to the operation, but {key} is " + f"`{name}` argument passed to the operation, but `{key}` is " f"already defined in the `{name}` section of the component spec " f"with type {spec_type}" ) @@ -475,8 +475,8 @@ def _outer_mapping(self, name: str) -> t.Mapping[str, Field]: mapping[value] = Field(name=value, type=mapping.pop(key).type) else: msg = ( - f"Received a string value for key {key} in the `{name}` " - f"argument passed to the operation, but {key} is not defined in " + f"Received a string value for key `{key}` in the `{name}` " + f"argument passed to the operation, but `{key}` is not defined in " f"the `{name}` section of the component spec." ) raise InvalidPipelineDefinition(msg) diff --git a/src/fondant/pipeline/lightweight_component.py b/src/fondant/pipeline/lightweight_component.py index 15798da3..56e72c9f 100644 --- a/src/fondant/pipeline/lightweight_component.py +++ b/src/fondant/pipeline/lightweight_component.py @@ -8,10 +8,13 @@ from functools import wraps from importlib import metadata +import pyarrow as pa + from fondant.component import BaseComponent, Component +from fondant.core.schema import Field, Type logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) + MIN_PYTHON_VERSION = (3, 8) MAX_PYTHON_VERSION = (3, 11) @@ -67,11 +70,77 @@ class LightweightComponent(BaseComponent): def image(cls) -> Image: raise NotImplementedError + @classmethod + def consumes(cls) -> t.Optional[t.Dict[str, t.Any]]: + pass + + @classmethod + def modify_spec_consumes( + cls, + spec_consumes: t.Dict[str, t.Any], + apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]], + ): + """Modify fields based on the consumes argument in the 'apply' method.""" + if apply_consumes: + for k, v in apply_consumes.items(): + if isinstance(v, str): + spec_consumes[k] = spec_consumes.pop(v) + else: + msg = ( + f"Invalid data type for field `{k}` in the `apply_consumes` " + f"argument. Only string types are allowed." + ) + raise ValueError( + msg, + ) + return spec_consumes + + @classmethod + def get_spec_consumes( + cls, + dataset_fields: t.Mapping[str, Field], + apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None, + ): + """ + Function that get the consumes spec for the component based on the dataset fields and + the apply_consumes argument. + + Args: + dataset_fields: The fields of the dataset. + apply_consumes: The consumes argument in the apply method. + + Returns: + The consumes spec for the component. + """ + consumes = cls.consumes() + + if consumes is None: + # Get consumes spec from the dataset + spec_consumes = {k: v.type.to_dict() for k, v in dataset_fields.items()} + + spec_consumes = cls.modify_spec_consumes(spec_consumes, apply_consumes) + + logger.warning( + "No consumes defined. Consumes will be inferred from the dataset." + "All field will be consumed which may lead to additional computation," + "Consider defining consumes in the component.\n Consumes: %s", + spec_consumes, + ) + + else: + spec_consumes = { + k: (Type(v).to_dict() if k != "additionalProperties" else v) + for k, v in consumes.items() + } + + return spec_consumes + def lightweight_component( *args, extra_requires: t.Optional[t.List[str]] = None, base_image: t.Optional[str] = None, + consumes: t.Optional[t.Dict[str, t.Any]] = None, ): """Decorator to enable a lightweight component.""" @@ -160,6 +229,10 @@ class LightweightComponentOp(cls, LightweightComponent): def image(cls) -> Image: return image + @classmethod + def consumes(cls) -> t.Optional[t.Dict[str, t.Dict[t.Any, t.Any]]]: + return consumes + return LightweightComponentOp # Call wrapper with function (`args[0]`) when no additional arguments were passed diff --git a/src/fondant/pipeline/pipeline.py b/src/fondant/pipeline/pipeline.py index 8720cd83..33aebe6a 100644 --- a/src/fondant/pipeline/pipeline.py +++ b/src/fondant/pipeline/pipeline.py @@ -205,10 +205,21 @@ def from_component_yaml(cls, path, **kwargs) -> "ComponentOp": ) @classmethod - def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp": + def from_ref( + cls, + ref: t.Any, + fields: t.Optional[t.Mapping[str, Field]] = None, + **kwargs, + ) -> "ComponentOp": """Create a ComponentOp from a reference. The reference can be a reusable component name, a path to a custom component, or a python component class. + + Args: + ref: The name of a reusable component, or the path to the directory containing + a custom component, or a python component class. + fields: The fields of the dataset available to the component. + **kwargs: The provided user arguments are passed in as keyword arguments """ if inspect.isclass(ref) and issubclass(ref, BaseComponent): if issubclass(ref, LightweightComponent): @@ -216,11 +227,17 @@ def from_ref(cls, ref: t.Any, **kwargs) -> "ComponentOp": image = ref.image() description = ref.__doc__ or "lightweight component" + consumes_spec = ( + ref.get_spec_consumes(fields, kwargs["consumes"]) + if fields + else {"additionalProperties": True} + ) + component_spec = ComponentSpec( name, image.base_image, description=description, - consumes={"additionalProperties": True}, + consumes=consumes_spec, produces={"additionalProperties": True}, args={ name: arg.to_spec() @@ -726,6 +743,7 @@ def apply( """ operation = ComponentOp.from_ref( ref, + fields=self.fields, produces=produces, consumes=consumes, arguments=arguments, @@ -773,6 +791,7 @@ def write( """ operation = ComponentOp.from_ref( ref, + fields=self.fields, consumes=consumes, arguments=arguments, input_partition_rows=input_partition_rows, diff --git a/tests/pipeline/test_python_component.py b/tests/pipeline/test_python_component.py index 93fb0b07..b74276ef 100644 --- a/tests/pipeline/test_python_component.py +++ b/tests/pipeline/test_python_component.py @@ -3,6 +3,7 @@ import sys import textwrap from dataclasses import dataclass +from importlib.metadata import version from unittest import mock import dask.dataframe as dd @@ -10,25 +11,59 @@ import pyarrow as pa import pytest from fondant.component import DaskLoadComponent, PandasTransformComponent +from fondant.core.component_spec import OperationSpec from fondant.core.exceptions import InvalidLightweightComponent from fondant.pipeline import Image, Pipeline, lightweight_component from fondant.pipeline.compiler import DockerCompiler +from fondant.testing import DockerComposeConfigs -def test_build_python_script(): - @lightweight_component() +@pytest.fixture() +def default_fondant_image(): + basename = "fndnt/fondant" + fondant_version = version("fondant") + python_version = sys.version_info + python_version = f"{python_version.major}.{python_version.minor}" + return f"{basename}:{fondant_version}-py{python_version}" + + +@pytest.fixture() +def load_pipeline(caplog): + pipeline = Pipeline( + name="dummy-pipeline", + base_path="./data", + ) + + @lightweight_component( + base_image="python:3.8-slim-buster", + extra_requires=["pandas", "dask"], + ) class CreateData(DaskLoadComponent): def load(self) -> dd.DataFrame: df = pd.DataFrame( { "x": [1, 2, 3], "y": [4, 5, 6], + "z": [7, 8, 9], }, index=pd.Index(["a", "b", "c"], name="id"), ) return dd.from_pandas(df, npartitions=1) - assert CreateData.image().script == textwrap.dedent( + load_script = CreateData.image().script + + dataset = pipeline.read( + ref=CreateData, + produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()}, + ) + + caplog_records = caplog.records + return pipeline, dataset, load_script, caplog_records + + +def test_build_python_script(load_pipeline): + _, _, load_script, _ = load_pipeline + assert load_script == textwrap.dedent( """\ from typing import * import typing as t @@ -46,6 +81,7 @@ def load(self) -> dd.DataFrame: { "x": [1, 2, 3], "y": [4, 5, 6], + "z": [7, 8, 9], }, index=pd.Index(["a", "b", "c"], name="id"), ) @@ -54,31 +90,8 @@ def load(self) -> dd.DataFrame: ) -def test_lightweight_component_sdk(caplog): - pipeline = Pipeline( - name="dummy-pipeline", - base_path="./data", - ) - - @lightweight_component( - base_image="python:3.8-slim-buster", - extra_requires=["pandas", "dask"], - ) - class CreateData(DaskLoadComponent): - def load(self) -> dd.DataFrame: - df = pd.DataFrame( - { - "x": [1, 2, 3], - "y": [4, 5, 6], - }, - index=pd.Index(["a", "b", "c"], name="id"), - ) - return dd.from_pandas(df, npartitions=1) - - dataset = pipeline.read( - ref=CreateData, - produces={"x": pa.int32(), "y": pa.int32()}, - ) +def test_lightweight_component_sdk(default_fondant_image, load_pipeline): + pipeline, dataset, load_script, caplog_records = load_pipeline assert len(pipeline._graph.keys()) == 1 operation_spec_dict = pipeline._graph["createdata"][ @@ -93,22 +106,19 @@ def load(self) -> dd.DataFrame: "produces": {"additionalProperties": True}, }, "consumes": {}, - "produces": {"x": {"type": "int32"}, "y": {"type": "int32"}}, + "produces": { + "x": {"type": "int32"}, + "y": {"type": "int32"}, + "z": {"type": "int32"}, + }, } # check warning: fondant is not part of the requirements - msg = ( - "You are not using a Fondant default base image, and Fondant is not part of" - "your extra requirements. Please make sure that you have installed fondant " - "inside your container. Alternatively, you can should add Fondant to " - "the extra requirements. \n" - "E.g. \n" - '@lightweight_component(..., extra_requires=["fondant"])' - ) + msg = "You are not using a Fondant default base image" - assert any(msg in record.message for record in caplog.records) + assert any(msg in record.message for record in caplog_records) - @lightweight_component() + @lightweight_component class AddN(PandasTransformComponent): def __init__(self, n: int): self.n = n @@ -119,8 +129,7 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: _ = dataset.apply( ref=AddN, - produces={"x": pa.int32(), "y": pa.int32()}, - consumes={"x": pa.int32(), "y": pa.int32()}, + produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()}, arguments={"n": 1}, ) assert len(pipeline._graph.keys()) == 1 + 1 @@ -133,18 +142,135 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: "name": "AddN", "image": Image.resolve_fndnt_base_image(), "description": "lightweight component", - "consumes": {"additionalProperties": True}, + "consumes": { + "x": {"type": "int32"}, + "y": {"type": "int32"}, + "z": {"type": "int32"}, + }, "produces": {"additionalProperties": True}, "args": {"n": {"type": "int"}}, }, - "consumes": {"x": {"type": "int32"}, "y": {"type": "int32"}}, - "produces": {"x": {"type": "int32"}, "y": {"type": "int32"}}, + "consumes": {}, + "produces": { + "x": {"type": "int32"}, + "y": {"type": "int32"}, + "z": {"type": "int32"}, + }, } pipeline._validate_pipeline_definition(run_id="dummy-run-id") DockerCompiler().compile(pipeline) +def test_consumes_mapping_all_fields(tmp_path_factory, load_pipeline): + @lightweight_component( + base_image="python:3.8", + extra_requires=[ + "fondant[component]@git+https://github.com/ml6team/fondant@main", + ], + ) + class AddN(PandasTransformComponent): + def __init__(self, n: int): + self.n = n + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + dataframe["a"] = dataframe["a"].map(lambda x: x + self.n) + return dataframe + + pipeline, dataset, _, _ = load_pipeline + + _ = dataset.apply( + ref=AddN, + consumes={"a": "x"}, + produces={"a": pa.int32()}, + arguments={"n": 1}, + ) + + with tmp_path_factory.mktemp("temp") as fn: + output_path = str(fn / "kubeflow_pipeline.yml") + DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) + operation_spec = OperationSpec.from_json( + pipeline_configs.component_configs["addn"].arguments["operation_spec"], + ) + assert all(k in ["a", "y", "z"] for k in operation_spec.inner_consumes) + assert "x" in operation_spec.outer_consumes + + +def test_consumes_mapping_specific_fields(tmp_path_factory, load_pipeline): + @lightweight_component( + base_image="python:3.8", + extra_requires=[ + "fondant[component]@git+https://github.com/ml6team/fondant@main", + ], + consumes={"a": pa.int32()}, + ) + class AddN(PandasTransformComponent): + def __init__(self, n: int): + self.n = n + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + dataframe["a"] = dataframe["a"].map(lambda x: x + self.n) + return dataframe + + pipeline, dataset, _, _ = load_pipeline + + _ = dataset.apply( + ref=AddN, + consumes={"a": "x"}, + produces={"a": pa.int32()}, + arguments={"n": 1}, + ) + + with tmp_path_factory.mktemp("temp") as fn: + output_path = str(fn / "kubeflow_pipeline.yml") + DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) + operation_spec = OperationSpec.from_json( + pipeline_configs.component_configs["addn"].arguments["operation_spec"], + ) + assert "a" in operation_spec.inner_consumes + assert "x" in operation_spec.outer_consumes + assert "z" not in operation_spec.inner_consumes + + +def test_consumes_mapping_additional_fields(tmp_path_factory, load_pipeline): + @lightweight_component( + base_image="python:3.8", + extra_requires=[ + "fondant[component]@git+https://github.com/ml6team/fondant@main", + ], + consumes={"additionalProperties": True}, + ) + class AddN(PandasTransformComponent): + def __init__(self, n: int): + self.n = n + + def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: + dataframe["a"] = dataframe["x"].map(lambda x: x + self.n) + return dataframe + + pipeline, dataset, _, _ = load_pipeline + + _ = dataset.apply( + ref=AddN, + consumes={"x": pa.int32()}, + produces={"a": pa.int32()}, + arguments={"n": 1}, + ) + + with tmp_path_factory.mktemp("temp") as fn: + output_path = str(fn / "kubeflow_pipeline.yml") + DockerCompiler().compile(pipeline=pipeline, output_path=output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) + operation_spec = OperationSpec.from_json( + pipeline_configs.component_configs["addn"].arguments["operation_spec"], + ) + assert "x" in operation_spec.inner_consumes + assert "a" in operation_spec.inner_produces + assert "z" not in operation_spec.inner_consumes + + def test_lightweight_component_missing_decorator(): pipeline = Pipeline( name="dummy-pipeline",