diff --git a/src/fondant/component/component.py b/src/fondant/component/component.py
index dd094ff94..919308938 100644
--- a/src/fondant/component/component.py
+++ b/src/fondant/component/component.py
@@ -16,7 +16,12 @@ class BaseComponent:
         **kwargs: The provided user arguments are passed in as keyword arguments
     """
 
-    def __init__(self, spec: ComponentSpec, **kwargs):
+    def __init__(
+        self,
+        spec: ComponentSpec,
+        schema: t.Optional[t.Dict[str, str]] = None,
+        **kwargs,
+    ):
         pass
 
 
diff --git a/src/fondant/component/data_io.py b/src/fondant/component/data_io.py
index 79a181f8d..aa514b19c 100644
--- a/src/fondant/component/data_io.py
+++ b/src/fondant/component/data_io.py
@@ -28,9 +28,11 @@ def __init__(
         manifest: Manifest,
         component_spec: ComponentSpec,
         input_partition_rows: t.Optional[int] = None,
+        consumes: t.Optional[t.Dict[str, str]] = None,
     ):
         super().__init__(manifest=manifest, component_spec=component_spec)
         self.input_partition_rows = input_partition_rows
+        self.consumes = consumes
 
     def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         """
@@ -131,6 +133,10 @@ def load_dataframe(self) -> dd.DataFrame:
 
         logging.info(f"Columns of dataframe: {list(dataframe.columns)}")
 
+        # Renaming dataframe columns
+        if self.consumes:
+            dataframe = dataframe.rename(columns=self.consumes)
+
         return dataframe
 
 
@@ -140,8 +146,10 @@ def __init__(
         *,
         manifest: Manifest,
         component_spec: ComponentSpec,
+        produces: t.Optional[t.Dict[str, str]] = None,
     ):
         super().__init__(manifest=manifest, component_spec=component_spec)
+        self.produces = produces
 
     def write_dataframe(
         self,
@@ -158,6 +166,11 @@ def write_dataframe(
         self.validate_dataframe_columns(dataframe, columns_to_produce)
 
         dataframe = dataframe[columns_to_produce]
+
+        # Renaming dataframe produces
+        if self.produces:
+            dataframe = dataframe.rename(columns=self.produces)
+
         write_task = self._write_dataframe(dataframe)
 
         with ProgressBar():
diff --git a/src/fondant/component/executor.py b/src/fondant/component/executor.py
index 571bc60bb..9b9e5e474 100644
--- a/src/fondant/component/executor.py
+++ b/src/fondant/component/executor.py
@@ -67,6 +67,9 @@ def __init__(
         input_partition_rows: int,
         cluster_type: t.Optional[str] = None,
         client_kwargs: t.Optional[dict] = None,
+        schema: t.Optional[t.Dict[str, str]] = None,
+        consumes: t.Optional[t.Dict[str, str]] = None,
+        produces: t.Optional[t.Dict[str, str]] = None,
     ) -> None:
         self.spec = spec
         self.cache = cache
@@ -75,6 +78,10 @@ def __init__(
         self.metadata = Metadata.from_dict(metadata)
         self.user_arguments = user_arguments
         self.input_partition_rows = input_partition_rows
+        self.schema = schema
+
+        self.consumes = consumes
+        self.produces = produces
 
         if cluster_type == "local":
             client_kwargs = client_kwargs or {
@@ -112,6 +119,10 @@ def from_args(cls) -> "Executor":
         parser.add_argument("--input_partition_rows", type=int)
         parser.add_argument("--cluster_type", type=str)
         parser.add_argument("--client_kwargs", type=json.loads)
+        parser.add_argument("--schema", type=json.loads)
+        parser.add_argument("--consumes", type=json.loads)
+        parser.add_argument("--produces", type=json.loads)
+
         args, _ = parser.parse_known_args()
 
         if "component_spec" not in args:
@@ -123,6 +134,9 @@ def from_args(cls) -> "Executor":
         cache = args.cache
         cluster_type = args.cluster_type
         client_kwargs = args.client_kwargs
+        schema = args.schema
+        consumes = args.consumes
+        produces = args.produces
 
         return cls.from_spec(
             component_spec,
             cache=cache,
             input_partition_rows=input_partition_rows,
             cluster_type=cluster_type,
             client_kwargs=client_kwargs,
+            schema=schema,
+            consumes=consumes,
+            produces=produces,
         )
 
     @classmethod
@@ -141,6 +158,9 @@ def from_spec(
         input_partition_rows: int,
         cluster_type: t.Optional[str],
         client_kwargs: t.Optional[dict],
+        schema: t.Optional[dict],
+        consumes: t.Optional[dict],
+        produces: t.Optional[dict],
     ) -> "Executor":
         """Create an executor from a component spec."""
         args_dict = vars(cls._add_and_parse_args(component_spec))
@@ -160,6 +180,15 @@ def from_spec(
         if "client_kwargs" in args_dict:
             args_dict.pop("client_kwargs")
 
+        if "schema" in args_dict:
+            args_dict.pop("schema")
+
+        if "consumes" in args_dict:
+            args_dict.pop("consumes")
+
+        if "produces" in args_dict:
+            args_dict.pop("produces")
+
         input_manifest_path = args_dict.pop("input_manifest_path")
         output_manifest_path = args_dict.pop("output_manifest_path")
         metadata = args_dict.pop("metadata")
@@ -175,6 +204,9 @@ def from_spec(
             input_partition_rows=input_partition_rows,
             cluster_type=cluster_type,
             client_kwargs=client_kwargs,
+            schema=schema,
+            consumes=consumes,
+            produces=produces,
         )
 
     @classmethod
@@ -251,11 +283,18 @@ def _execute_component(
             A Dask DataFrame containing the output data
         """
 
-    def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
+    def _write_data(
+        self,
+        dataframe: dd.DataFrame,
+        *,
+        manifest: Manifest,
+        produces: t.Optional[t.Dict[str, str]],
+    ):
         """Create a data writer given a manifest and writes out the index and subsets."""
         data_writer = DaskDataWriter(
             manifest=manifest,
             component_spec=self.spec,
+            produces=produces,
         )
 
         data_writer.write_dataframe(dataframe, self.client)
@@ -331,7 +370,7 @@ def _run_execution(
         input_manifest: Manifest,
     ) -> Manifest:
         logging.info("Executing component")
-        component = component_cls(self.spec, **self.user_arguments)
+        component = component_cls(self.spec, self.schema, **self.user_arguments)
         output_df = self._execute_component(
             component,
             manifest=input_manifest,
@@ -340,7 +379,11 @@ def _run_execution(
             component_spec=self.spec,
             run_id=self.metadata.run_id,
         )
-        self._write_data(dataframe=output_df, manifest=output_manifest)
+        self._write_data(
+            dataframe=output_df,
+            manifest=output_manifest,
+            produces=self.produces,
+        )
 
         return output_manifest
 
@@ -478,6 +521,7 @@ def _execute_component(
             manifest=manifest,
             component_spec=self.spec,
             input_partition_rows=self.input_partition_rows,
+            consumes=self.consumes,
         )
         dataframe = data_loader.load_dataframe()
         return component.transform(dataframe)
@@ -530,6 +574,7 @@ def _execute_component(
             manifest=manifest,
             component_spec=self.spec,
             input_partition_rows=self.input_partition_rows,
+            consumes=self.consumes,
         )
         dataframe = data_loader.load_dataframe()
 
@@ -574,6 +619,7 @@ def _execute_component(
             manifest=manifest,
             component_spec=self.spec,
             input_partition_rows=self.input_partition_rows,
+            consumes=self.consumes,
         )
         dataframe = data_loader.load_dataframe()
         component.write(dataframe)
diff --git a/src/fondant/pipeline/pipeline.py b/src/fondant/pipeline/pipeline.py
index 05be61c17..8b3deaafe 100644
--- a/src/fondant/pipeline/pipeline.py
+++ b/src/fondant/pipeline/pipeline.py
@@ -138,6 +138,9 @@ def __init__(
         cluster_type: t.Optional[str] = "default",
         client_kwargs: t.Optional[dict] = None,
         resources: t.Optional[Resources] = None,
+        schema: t.Optional[t.Dict[str, t.Any]] = None,
+        consumes: t.Optional[t.Dict[str, t.Any]] = None,
+        produces: t.Optional[t.Dict[str, t.Any]] = None,
     ) -> None:
         self.component_dir = Path(component_dir)
         self.input_partition_rows = input_partition_rows
@@ -148,13 +151,18 @@ def __init__(
         self.cache = self._configure_caching_from_image_tag(cache)
         self.cluster_type = cluster_type
         self.client_kwargs = client_kwargs
+        self.schema = schema
+        self.consumes = consumes
+        self.produces = produces
 
         self.arguments = arguments or {}
         self._add_component_argument("input_partition_rows", input_partition_rows)
         self._add_component_argument("cache", self.cache)
         self._add_component_argument("cluster_type", cluster_type)
         self._add_component_argument("client_kwargs", client_kwargs)
-
+        self._add_component_argument("schema", schema)
+        self._add_component_argument("consumes", consumes)
+        self._add_component_argument("produces", produces)
         self.arguments.setdefault("component_spec", self.component_spec.specification)
 
         self.resources = resources or Resources()
@@ -221,6 +229,8 @@ def from_registry(
         cache: t.Optional[bool] = True,
         cluster_type: t.Optional[str] = "default",
         client_kwargs: t.Optional[dict] = None,
+        consumes: t.Optional[t.Dict[str, t.Any]] = None,
+        produces: t.Optional[t.Dict[str, t.Any]] = None,
     ) -> "ComponentOp":
         """Load a reusable component by its name.
 
@@ -233,6 +243,8 @@ def from_registry(
             cache: Set to False to disable caching, True by default.
             cluster_type: The type of cluster to use for distributed execution (default is "local").
             client_kwargs: Keyword arguments used to initialise the dask client.
+            consumes: Dataframe columns that will be consumed by the component.
+            produces: Dataframe columns that will be produced by the component.
         """
         components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")
 
@@ -248,6 +260,8 @@ def from_registry(
             cache=cache,
             cluster_type=cluster_type,
             client_kwargs=client_kwargs,
+            consumes=consumes,
+            produces=produces,
         )
 
     def get_component_cache_key(
@@ -319,11 +333,193 @@ def __init__(
         self._graph: t.OrderedDict[str, t.Any] = OrderedDict()
         self.task_without_dependencies_added = False
 
+    def _build_component_op(
+        self,
+        name,
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[str, int]] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,
+        resources: t.Optional[Resources] = None,
+        schema: t.Optional[t.Dict[str, t.Any]] = None,
+        consumes: t.Optional[t.Dict[str, t.Any]] = None,
+        produces: t.Optional[t.Dict[str, t.Any]] = None,
+    ) -> ComponentOp:
+        """Build a ComponentOp from a reusable component name or a local component path."""
+        if not self._is_custom_component(path_or_name=name):
+            name = self._get_registry_path(name)
+        return ComponentOp(
+            name,
+            arguments=arguments,
+            input_partition_rows=input_partition_rows,
+            cache=cache,
+            cluster_type=cluster_type,
+            client_kwargs=client_kwargs,
+            resources=resources,
+            schema=schema,
+            consumes=consumes,
+            produces=produces,
+        )
+
+    def read(
+        self,
+        name,
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[str, int]] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,
+        resources: t.Optional[Resources] = None,
+        schema: t.Dict[str, str],
+    ) -> "Pipeline":
+        """
+        Add a reading component to the pipeline.
+
+        Args:
+            name: Name of the reusable component or a path to the component directory.
+            arguments: A dictionary containing the argument name and value for the operation.
+            input_partition_rows: The number of rows to load per partition. Set to override the
+                automatic partitioning.
+            cache: If true the cached results of previous components will be used, if available.
+            cluster_type: The type of cluster to use for distributed execution (default is "default").
+            client_kwargs: Keyword arguments used to initialise the dask client.
+            resources: The resources to assign to the operation.
+            schema: Schema which will be used to initialise the dataset.
+        """
+        component_op = self._build_component_op(
+            name,
+            arguments=arguments,
+            input_partition_rows=input_partition_rows,
+            cache=cache,
+            cluster_type=cluster_type,
+            client_kwargs=client_kwargs,
+            resources=resources,
+            schema=schema,
+        )
+
+        self.add_op(component_op)
+        return self
+
+    def apply(
+        self,
+        name,
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[str, int]] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,
+        resources: t.Optional[Resources] = None,
+        consumes: t.Optional[t.Dict[str, str]] = None,
+        produces: t.Optional[t.Dict[str, str]] = None,
+    ) -> "Pipeline":
+        """
+        Add a transform component to the pipeline.
+
+        Args:
+            name: Name of the reusable component or a path to the component directory.
+            arguments: A dictionary containing the argument name and value for the operation.
+            input_partition_rows: The number of rows to load per partition. Set to override the
+                automatic partitioning.
+            cache: If true the cached results of previous components will be used, if available.
+            cluster_type: The type of cluster to use for distributed execution (default is "default").
+            client_kwargs: Keyword arguments used to initialise the dask client.
+            resources: The resources to assign to the operation.
+            consumes: Dataframe columns that will be consumed by the component.
+            produces: Dataframe columns that will be produced by the component.
+        """
+        component_op = self._build_component_op(
+            name,
+            arguments=arguments,
+            input_partition_rows=input_partition_rows,
+            cache=cache,
+            cluster_type=cluster_type,
+            client_kwargs=client_kwargs,
+            resources=resources,
+            consumes=consumes,
+            produces=produces,
+        )
+
+        previous_component = self._get_previous_component()
+        self.add_op(component_op, dependencies=previous_component)
+        return self
+
+    def write(
+        self,
+        name,
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[str, int]] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,
+        resources: t.Optional[Resources] = None,
+        consumes: t.Optional[t.Dict[str, str]] = None,
+        schema: t.Optional[t.Dict[str, str]],
+    ):
+        """
+        Add a writing component to the pipeline.
+
+        Args:
+            name: Name of the reusable component or a path to the component directory.
+            arguments: A dictionary containing the argument name and value for the operation.
+            input_partition_rows: The number of rows to load per partition. Set to override the
+                automatic partitioning.
+            cache: If true the cached results of previous components will be used, if available.
+            cluster_type: The type of cluster to use for distributed execution (default is "default").
+            client_kwargs: Keyword arguments used to initialise the dask client.
+            resources: The resources to assign to the operation.
+            consumes: Dataframe columns that will be consumed by the component.
+            schema: Schema which will be used to write the dataset.
+ """ + component_op = self._build_component_op( + name, + arguments=arguments, + input_partition_rows=input_partition_rows, + cache=cache, + cluster_type=cluster_type, + client_kwargs=client_kwargs, + resources=resources, + consumes=consumes, + schema=schema, + ) + + # Get previous component + previous_component = self._get_previous_component() + self.add_op(component_op, dependencies=previous_component) + + @staticmethod + def _is_custom_component(path_or_name): + """Checks if name is a local path and a custom component.""" + components_dir: Path = Path(path_or_name) + return components_dir.exists() and components_dir.is_dir() + + @staticmethod + def _get_registry_path(name): + """Checks if name is a local path and a custom component.""" + components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}") + if not (components_dir.exists() and components_dir.is_dir()): + msg = f"No reusable component with name {name} found." + raise ValueError(msg) + return components_dir + + def _get_previous_component(self) -> ComponentOp: + """Return previous component that was added to the task graph.""" + previous_component = list(self._graph.items())[-1][-1]["fondant_component_op"] + if previous_component is None: + msg = "No previous component found." + raise ValueError(msg) + + return previous_component + def add_op( self, task: ComponentOp, dependencies: t.Optional[t.Union[ComponentOp, t.List[ComponentOp]]] = None, - ): + ) -> "Pipeline": """ Add a task to the pipeline with an optional dependency. diff --git a/tests/component/test_component.py b/tests/component/test_component.py index 830ce2963..54cec78b6 100644 --- a/tests/component/test_component.py +++ b/tests/component/test_component.py @@ -446,7 +446,7 @@ def __init__(self, *args, flag, value): self.flag = flag self.value = value - def write(self, dataframe): + def write(self, dataframe, produces): assert self.flag == "success" assert self.value == 1 assert isinstance(dataframe, dd.DataFrame) diff --git a/tests/core/examples/evolution_examples/1/component.yaml b/tests/core/examples/evolution_examples/1/component.yaml index e91ae6f46..4dbcfc900 100644 --- a/tests/core/examples/evolution_examples/1/component.yaml +++ b/tests/core/examples/evolution_examples/1/component.yaml @@ -7,10 +7,15 @@ consumes: type: binary produces: +<<<<<<<< HEAD:tests/core/examples/evolution_examples/2/component.yaml + images_encoding: + type: string +======== embeddings_data: type: array items: type: float32 +>>>>>>>> main:tests/core/examples/evolution_examples/1/component.yaml args: storage_args: diff --git a/tests/pipeline/examples/pipelines/invalid_pipeline/example_1/first_component/fondant_component.yaml b/tests/pipeline/examples/pipelines/invalid_pipeline/example_1/first_component/fondant_component.yaml index 066519825..09eef3105 100644 --- a/tests/pipeline/examples/pipelines/invalid_pipeline/example_1/first_component/fondant_component.yaml +++ b/tests/pipeline/examples/pipelines/invalid_pipeline/example_1/first_component/fondant_component.yaml @@ -7,8 +7,15 @@ consumes: type: binary produces: +<<<<<<<< HEAD:tests/core/examples/evolution_examples/1/component.yaml + embeddings_data: + type: array + items: + type: float32 +======== captions_data: type: string +>>>>>>>> main:tests/pipeline/examples/pipelines/invalid_pipeline/example_1/first_component/fondant_component.yaml args: storage_args: diff --git a/tests/pipeline/examples/pipelines/invalid_pipeline/example_2/first_component/fondant_component.yaml 
index 053b4c5b5..fba17c1a7 100644
--- a/tests/pipeline/examples/pipelines/invalid_pipeline/example_2/first_component/fondant_component.yaml
+++ b/tests/pipeline/examples/pipelines/invalid_pipeline/example_2/first_component/fondant_component.yaml
@@ -15,4 +15,4 @@ produces:
 args:
   storage_args:
     description: Storage arguments
-    type: str
\ No newline at end of file
+    type: str
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index b4deebc97..60e2fe82a 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -65,6 +65,58 @@ def test_component_op(
     )
 
 
+def test_new_pipeline_interface():
+    pipeline = Pipeline(
+        pipeline_name="my_pipeline",
+        pipeline_description="description of my pipeline",
+        base_path="/foo/bar",
+    )
+
+    dataset = pipeline.read(
+        name="load_from_hf_hub",
+        schema={
+            "image": "binary",  # or pa.binary()
+        },
+    )
+
+    dataset = dataset.apply(
+        name="caption_images",
+        consumes={
+            "images_data": "image",
+        },
+        produces={
+            "captions": "text",
+        },
+    )
+
+    dataset = dataset.apply(
+        name="embed_text",
+        consumes={
+            "text_data": "text",
+        },
+    )
+
+    dataset.write(
+        name="write_to_hf_hub",
+        schema={
+            "image": "binary",
+            "caption": "string",
+        },
+    )
+
+    # Get component_ops from pipeline
+    component_ops = [
+        element[1]["fondant_component_op"] for element in list(pipeline._graph.items())
+    ]
+
+    assert component_ops[0].schema == {"image": "binary"}
+    assert component_ops[1].consumes == {"images_data": "image"}
+    assert component_ops[1].produces == {"captions": "text"}
+    assert component_ops[2].consumes == {"text_data": "text"}
+    assert component_ops[2].produces is None
+    assert component_ops[3].schema == {"image": "binary", "caption": "string"}
+
+
 @pytest.mark.parametrize(
     "valid_pipeline_example",
     [
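Note on the new `consumes`/`produces` mappings: per the `data_io.py` changes above, both are applied with `DataFrame.rename(columns=...)` in `DaskDataLoader.load_dataframe` and `DaskDataWriter.write_dataframe`, so the dictionary keys must match columns that already exist in the dataframe and the values are the names they are renamed to. The sketch below is not part of this diff; it only illustrates the rename semantics in isolation, and the column names are made up for illustration. Which side of the mapping corresponds to the component-spec names versus the dataset names in a real pipeline depends on how the executor passes the dictionaries through.

```python
import dask.dataframe as dd
import pandas as pd

# Hypothetical columns, chosen only for illustration.
ddf = dd.from_pandas(
    pd.DataFrame({"images_data": [b"\x00", b"\x01"], "captions_data": ["a", "b"]}),
    npartitions=1,
)

# A mapping in the same shape as the new `consumes`/`produces` arguments:
# existing column name -> new column name.
mapping = {"images_data": "image", "captions_data": "caption"}

# Mirrors the guarded rename added to load_dataframe/write_dataframe.
if mapping:
    ddf = ddf.rename(columns=mapping)

assert list(ddf.columns) == ["image", "caption"]
print(ddf.compute())
```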