From 918d1a0a910c2512bc3c4ed9572275636db292ee Mon Sep 17 00:00:00 2001
From: Robbe Sneyders
Date: Tue, 28 Nov 2023 18:47:33 +0100
Subject: [PATCH] Move to datasets & apply interface (#683)

This PR is the first of multiple PRs to replace #665, and it focuses only on implementing the new pipeline interface, without adding any new functionality.

The new interface applies operations to intermediate datasets instead of adding operations to a pipeline, as shown below. It's a superficial change: only the interface changes, while all underlying behavior stays the same.

The new interface fits nicely with our data format design, and we'll be able to leverage it for interactive development in the future: we can calculate the schema for each intermediate dataset so the user can inspect it, or, with eager execution, execute a single operation and let the user explore the data using the dataset.

I still need to update the README generation, but I'll do that in a separate PR. It becomes a bit more complex, since we now need to distinguish between read, transform, and write components to generate the example code.

**Old interface**

```Python
from fondant.pipeline import ComponentOp, Pipeline

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

load_op = ComponentOp(
    component_dir="load_data",
    arguments={...},
)
caption_op = ComponentOp.from_registry(
    name="caption_images",
    arguments={...},
)
embed_op = ComponentOp(
    component_dir="embed_text",
    arguments={...},
)
write_op = ComponentOp.from_registry(
    name="write_to_hf_hub",
    arguments={...},
)

pipeline.add_op(load_op)
pipeline.add_op(caption_op, dependencies=[load_op])
pipeline.add_op(embed_op, dependencies=[caption_op])
pipeline.add_op(write_op, dependencies=[embed_op])
```

**New interface**

```Python
from fondant.pipeline import Pipeline

pipeline = Pipeline(
    name="my_pipeline",
    description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    "load_data",
    arguments={...},
)
dataset = dataset.apply(
    "caption_images",
    arguments={...},
)
dataset = dataset.apply(
    "embed_text",
    arguments={...},
)
dataset.write(
    "write_to_hf_hub",
    arguments={...},
)
```
---
 src/fondant/pipeline/compiler.py | 6 +-
 src/fondant/pipeline/pipeline.py | 261 ++++++++++++------
 .../invalid_double_pipeline.py | 4 +-
 tests/examples/example_modules/pipeline.py | 4 +-
 .../components/dummy_component/README.md | 10 +-
 .../integration_tests/test_sample_pipeline.py | 21 +-
 tests/pipeline/test_compiler.py | 83 +++---
 tests/pipeline/test_pipeline.py | 65 ++---
 tests/pipeline/test_runner.py | 4 +-
 tests/test_cli.py | 2 +-
 10 files changed, 263 insertions(+), 197 deletions(-)

diff --git a/src/fondant/pipeline/compiler.py b/src/fondant/pipeline/compiler.py
index 4f9f23f30..3c3880002 100644
--- a/src/fondant/pipeline/compiler.py
+++ b/src/fondant/pipeline/compiler.py
@@ -148,7 +148,7 @@ def _generate_spec(
         component_cache_key = None
 
         for component_name, component in pipeline._graph.items():
-            component_op = component["fondant_component_op"]
+            component_op = component["operation"]
 
             component_cache_key = component_op.get_component_cache_key(
                 previous_component_cache=component_cache_key,
@@ -347,7 +347,7 @@ def kfp_pipeline():
             for component_name, component in pipeline._graph.items():
                 logger.info(f"Compiling service for {component_name}")
 
-                component_op = component["fondant_component_op"]
+                component_op = component["operation"]
 
                 # convert ComponentOp to Kubeflow component
                 kubeflow_component_op = 
self.kfp.components.load_component_from_text( text=component_op.component_spec.kubeflow_specification.to_string(), @@ -673,7 +673,7 @@ def compile( with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdirname: for component_name, component in pipeline._graph.items(): - component_op = component["fondant_component_op"] + component_op = component["operation"] component_cache_key = component_op.get_component_cache_key( previous_component_cache=component_cache_key, ) diff --git a/src/fondant/pipeline/pipeline.py b/src/fondant/pipeline/pipeline.py index 05be61c17..e605e7a6b 100644 --- a/src/fondant/pipeline/pipeline.py +++ b/src/fondant/pipeline/pipeline.py @@ -108,7 +108,8 @@ class ComponentOp: is a representation of a function that will be executed as part of a pipeline. Arguments: - component_dir: The path to the component directory. + name_or_path: The name of a reusable component, or the path to the directory containing a + custom component. arguments: A dictionary containing the argument name and value for the operation. input_partition_rows: The number of rows to load per partition. Set to override the automatic partitioning @@ -119,18 +120,13 @@ class ComponentOp: Note: - A Fondant Component operation is created by defining a Fondant Component and its input arguments. - - The `accelerator_name`, `node_pool_label`, `node_pool_name` - attributes are optional and can be used to specify additional - configurations for the operation. More information on the optional attributes that can - be assigned to kfp components here: - https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html """ COMPONENT_SPEC_NAME = "fondant_component.yaml" def __init__( self, - component_dir: t.Union[str, Path], + name_or_path: t.Union[str, Path], *, arguments: t.Optional[t.Dict[str, t.Any]] = None, input_partition_rows: t.Optional[t.Union[str, int]] = None, @@ -139,7 +135,11 @@ def __init__( client_kwargs: t.Optional[dict] = None, resources: t.Optional[Resources] = None, ) -> None: - self.component_dir = Path(component_dir) + if self._is_custom_component(name_or_path): + self.component_dir = Path(name_or_path) + else: + self.component_dir = self._get_registry_path(str(name_or_path)) + self.input_partition_rows = input_partition_rows self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, @@ -210,45 +210,20 @@ def dockerfile_path(self) -> t.Optional[Path]: path = self.component_dir / "Dockerfile" return path if path.exists() else None - @classmethod - def from_registry( - cls, - name: str, - *, - arguments: t.Optional[t.Dict[str, t.Any]] = None, - input_partition_rows: t.Optional[t.Union[int, str]] = None, - resources: t.Optional[Resources] = None, - cache: t.Optional[bool] = True, - cluster_type: t.Optional[str] = "default", - client_kwargs: t.Optional[dict] = None, - ) -> "ComponentOp": - """Load a reusable component by its name. - - Args: - name: Name of the component to load - arguments: A dictionary containing the argument name and value for the operation. - input_partition_rows: The number of rows to load per partition. Set to override the - automatic partitioning - resources: The resources to assign to the operation. - cache: Set to False to disable caching, True by default. - cluster_type: The type of cluster to use for distributed execution (default is "local"). - client_kwargs: Keyword arguments used to initialise the dask client. 
- """ - components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}") + @staticmethod + def _is_custom_component(path_or_name: t.Union[str, Path]) -> bool: + """Checks if name is a local path and a custom component.""" + component_dir: Path = Path(path_or_name) + return component_dir.exists() and component_dir.is_dir() - if not (components_dir.exists() and components_dir.is_dir()): + @staticmethod + def _get_registry_path(name: str) -> Path: + """Checks if name is a local path and a custom component.""" + component_dir: Path = t.cast(Path, files("fondant") / f"components/{name}") + if not (component_dir.exists() and component_dir.is_dir()): msg = f"No reusable component with name {name} found." raise ValueError(msg) - - return ComponentOp( - components_dir, - arguments=arguments, - input_partition_rows=input_partition_rows, - resources=resources, - cache=cache, - cluster_type=cluster_type, - client_kwargs=client_kwargs, - ) + return component_dir def get_component_cache_key( self, @@ -302,64 +277,100 @@ class Pipeline: def __init__( self, + name: str, + *, base_path: str, - pipeline_name: str, - pipeline_description: t.Optional[str] = None, + description: t.Optional[str] = None, ): """ Args: + name: The name of the pipeline. base_path: The base path for the pipeline where the artifacts are stored. - pipeline_name: The name of the pipeline. - pipeline_description: Optional description of the pipeline. + description: Optional description of the pipeline. """ self.base_path = base_path - self.name = self._validate_pipeline_name(pipeline_name) - self.description = pipeline_description - self.package_path = f"{pipeline_name}.tgz" + self.name = self._validate_pipeline_name(name) + self.description = description + self.package_path = f"{name}.tgz" self._graph: t.OrderedDict[str, t.Any] = OrderedDict() self.task_without_dependencies_added = False - def add_op( + def _apply( self, - task: ComponentOp, - dependencies: t.Optional[t.Union[ComponentOp, t.List[ComponentOp]]] = None, - ): + operation: ComponentOp, + datasets: t.Optional[t.Union["Dataset", t.List["Dataset"]]] = None, + ) -> "Dataset": """ - Add a task to the pipeline with an optional dependency. + Apply an operation to the provided input datasets. Args: - task: The task to add to the pipeline. - dependencies: Optional task dependencies that needs to be completed before the task - can run. + operation: The operation to apply. + datasets: The input datasets to apply the operation on """ - if dependencies is None: - if self.task_without_dependencies_added: - msg = "At most one task can be defined without dependencies." - raise InvalidPipelineDefinition( - msg, - ) - dependencies = [] - self.task_without_dependencies_added = True - elif not isinstance(dependencies, list): - dependencies = [dependencies] + if datasets is None: + datasets = [] + elif not isinstance(datasets, list): + datasets = [datasets] - if len(dependencies) > 1: + if len(datasets) > 1: msg = ( - f"Multiple component dependencies provided for component " - f"`{task.component_spec.name}`. The current version of Fondant can only handle " - f"components with a single dependency. Please note that the behavior of the " - f"pipeline may be unpredictable or incorrect." + f"Multiple input datasets provided for operation `{operation.name}`. The current " + f"version of Fondant can only handle components with a single input." 
) raise InvalidPipelineDefinition( msg, ) - dependencies_names = [dependency.name for dependency in dependencies] - - self._graph[task.name] = { - "fondant_component_op": task, - "dependencies": dependencies_names, + self._graph[operation.name] = { + "operation": operation, + "dependencies": [dataset.operation.name for dataset in datasets], } + return Dataset(pipeline=self, operation=operation) + + def read( + self, + name_or_path: t.Union[str, Path], + *, + arguments: t.Optional[t.Dict[str, t.Any]] = None, + input_partition_rows: t.Optional[t.Union[int, str]] = None, + resources: t.Optional[Resources] = None, + cache: t.Optional[bool] = True, + cluster_type: t.Optional[str] = "default", + client_kwargs: t.Optional[dict] = None, + ) -> "Dataset": + """ + Read data using the provided component. + + Args: + name_or_path: The name of a reusable component, or the path to the directory containing + a custom component. + arguments: A dictionary containing the argument name and value for the operation. + input_partition_rows: The number of rows to load per partition. Set to override the + automatic partitioning + resources: The resources to assign to the operation. + cache: Set to False to disable caching, True by default. + cluster_type: The type of cluster to use for distributed execution (default is "local"). + client_kwargs: Keyword arguments used to initialise the Dask client. + + Returns: + An intermediate dataset. + """ + if self._graph: + msg = "For now, at most one read component can be applied per pipeline." + raise InvalidPipelineDefinition( + msg, + ) + + operation = ComponentOp( + name_or_path, + arguments=arguments, + input_partition_rows=input_partition_rows, + resources=resources, + cache=cache, + cluster_type=cluster_type, + client_kwargs=client_kwargs, + ) + return self._apply(operation) def sort_graph(self): """Sort the graph topologically based on task dependencies.""" @@ -437,7 +448,7 @@ def _validate_pipeline_definition(self, run_id: str): cache_key="42", ) for operation_specs in self._graph.values(): - fondant_component_op = operation_specs["fondant_component_op"] + fondant_component_op = operation_specs["operation"] component_spec = fondant_component_op.component_spec if not load_component: @@ -481,3 +492,93 @@ def _validate_pipeline_definition(self, run_id: str): def __repr__(self) -> str: """Return a string representation of the FondantPipeline object.""" return f"{self.__class__.__name__}({self._graph!r}" + + +class Dataset: + def __init__(self, *, pipeline: Pipeline, operation: ComponentOp) -> None: + """A class representing an intermediate dataset. + + Args: + pipeline: The pipeline this dataset is a part of. + operation: The operation that created this dataset. + """ + self.pipeline = pipeline + self.operation = operation + + def apply( + self, + name_or_path: t.Union[str, Path], + *, + arguments: t.Optional[t.Dict[str, t.Any]] = None, + input_partition_rows: t.Optional[t.Union[int, str]] = None, + resources: t.Optional[Resources] = None, + cache: t.Optional[bool] = True, + cluster_type: t.Optional[str] = "default", + client_kwargs: t.Optional[dict] = None, + ) -> "Dataset": + """ + Apply the provided component on the dataset. + + Args: + name_or_path: The name of a reusable component, or the path to the directory containing + a custom component. + arguments: A dictionary containing the argument name and value for the operation. + input_partition_rows: The number of rows to load per partition. 
Set to override the + automatic partitioning + resources: The resources to assign to the operation. + cache: Set to False to disable caching, True by default. + cluster_type: The type of cluster to use for distributed execution (default is "local"). + client_kwargs: Keyword arguments used to initialise the Dask client. + + Returns: + An intermediate dataset. + """ + operation = ComponentOp( + name_or_path, + arguments=arguments, + input_partition_rows=input_partition_rows, + resources=resources, + cache=cache, + cluster_type=cluster_type, + client_kwargs=client_kwargs, + ) + return self.pipeline._apply(operation, self) + + def write( + self, + name_or_path: t.Union[str, Path], + *, + arguments: t.Optional[t.Dict[str, t.Any]] = None, + input_partition_rows: t.Optional[t.Union[int, str]] = None, + resources: t.Optional[Resources] = None, + cache: t.Optional[bool] = True, + cluster_type: t.Optional[str] = "default", + client_kwargs: t.Optional[dict] = None, + ) -> None: + """ + Write the dataset using the provided component. + + Args: + name_or_path: The name of a reusable component, or the path to the directory containing + a custom component. + arguments: A dictionary containing the argument name and value for the operation. + input_partition_rows: The number of rows to load per partition. Set to override the + automatic partitioning + resources: The resources to assign to the operation. + cache: Set to False to disable caching, True by default. + cluster_type: The type of cluster to use for distributed execution (default is "local"). + client_kwargs: Keyword arguments used to initialise the Dask client. + + Returns: + An intermediate dataset. + """ + operation = ComponentOp( + name_or_path, + arguments=arguments, + input_partition_rows=input_partition_rows, + resources=resources, + cache=cache, + cluster_type=cluster_type, + client_kwargs=client_kwargs, + ) + self.pipeline._apply(operation, self) diff --git a/tests/examples/example_modules/invalid_double_pipeline.py b/tests/examples/example_modules/invalid_double_pipeline.py index c518d20e2..132f3e4c1 100644 --- a/tests/examples/example_modules/invalid_double_pipeline.py +++ b/tests/examples/example_modules/invalid_double_pipeline.py @@ -1,4 +1,4 @@ from fondant.pipeline import Pipeline -TEST_PIPELINE = Pipeline(pipeline_name="test_pipeline", base_path="some/path") -TEST_PIPELINE_2 = Pipeline(pipeline_name="test_pipeline", base_path="some/path") +TEST_PIPELINE = Pipeline(name="test_pipeline", base_path="some/path") +TEST_PIPELINE_2 = Pipeline(name="test_pipeline", base_path="some/path") diff --git a/tests/examples/example_modules/pipeline.py b/tests/examples/example_modules/pipeline.py index 83b20a77a..0b54b8f61 100644 --- a/tests/examples/example_modules/pipeline.py +++ b/tests/examples/example_modules/pipeline.py @@ -2,11 +2,11 @@ def create_pipeline_with_args(name): - return Pipeline(pipeline_name=name, base_path="some/path") + return Pipeline(name=name, base_path="some/path") def create_pipeline(): - return Pipeline(pipeline_name="test_pipeline", base_path="some/path") + return Pipeline(name="test_pipeline", base_path="some/path") def not_implemented(): diff --git a/tests/integration_tests/sample_pipeline_test/components/dummy_component/README.md b/tests/integration_tests/sample_pipeline_test/components/dummy_component/README.md index 97b3309e0..79fc51efb 100644 --- a/tests/integration_tests/sample_pipeline_test/components/dummy_component/README.md +++ b/tests/integration_tests/sample_pipeline_test/components/dummy_component/README.md 
@@ -34,18 +34,20 @@ The component takes the following arguments to alter its behavior: You can add this component to your pipeline using the following code: ```python -from fondant.pipeline import ComponentOp +from fondant.pipeline import Pipeline -chunk_text_op = ComponentOp.from_registry( - name="chunk_text", +pipeline = Pipeline(...) +dataset = pipeline.read(...) + +dataset.apply( + name_or_path="chunk_text", arguments={ # Add arguments # "chunk_size": 0, # "chunk_overlap": 0, } ) -pipeline.add_op(chunk_text_op, dependencies=[...]) #Add previous component as dependency ``` ### Testing diff --git a/tests/integration_tests/test_sample_pipeline.py b/tests/integration_tests/test_sample_pipeline.py index 8e7f6fbda..39b6b732f 100644 --- a/tests/integration_tests/test_sample_pipeline.py +++ b/tests/integration_tests/test_sample_pipeline.py @@ -7,7 +7,7 @@ from pathlib import Path import pytest -from fondant.pipeline import ComponentOp, Pipeline +from fondant.pipeline import Pipeline from fondant.pipeline.compiler import DockerCompiler from fondant.pipeline.runner import DockerRunner @@ -24,15 +24,15 @@ @pytest.fixture() def sample_pipeline(data_dir="./data") -> Pipeline: # Define pipeline - pipeline = Pipeline(pipeline_name="dummy-pipeline", base_path=data_dir) + pipeline = Pipeline(name="dummy-pipeline", base_path=data_dir) # Load from hub component load_component_column_mapping = { "text": "text_data", } - load_from_file = ComponentOp( - component_dir=Path(BASE_PATH / "components" / "load_from_parquet"), + dataset = pipeline.read( + name_or_path=Path(BASE_PATH / "components" / "load_from_parquet"), arguments={ "dataset_uri": "/data/sample.parquet", "column_name_mapping": load_component_column_mapping, @@ -40,20 +40,15 @@ def sample_pipeline(data_dir="./data") -> Pipeline: }, ) - custom_dummy_component = ComponentOp( - component_dir=Path(BASE_PATH / "components" / "dummy_component"), + dataset = dataset.apply( + name_or_path=Path(BASE_PATH / "components" / "dummy_component"), ) - chunk_text = ComponentOp.from_registry( - name="chunk_text", + dataset.apply( + name_or_path="chunk_text", arguments={"chunk_size": 10, "chunk_overlap": 2}, ) - # Add components to the pipeline - pipeline.add_op(load_from_file) - pipeline.add_op(custom_dummy_component, dependencies=load_from_file) - pipeline.add_op(chunk_text, dependencies=[custom_dummy_component]) - return pipeline diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index 539155ff8..55b31d5dd 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -70,8 +70,8 @@ "cache_key": "1", }, { - "component_op": ComponentOp.from_registry( - name="crop_images", + "component_op": ComponentOp( + "crop_images", arguments={"cropping_threshold": 0, "padding": 0}, ), "cache_key": "2", @@ -98,12 +98,12 @@ def now(cls): @pytest.fixture(params=TEST_PIPELINES) def setup_pipeline(request, tmp_path, monkeypatch): pipeline = Pipeline( - pipeline_name="testpipeline", - pipeline_description="description of the test pipeline", + name="testpipeline", + description="description of the test pipeline", base_path="/foo/bar", ) example_dir, components = request.param - prev_comp = None + dataset = None cache_dict = {} for component_dict in components: component = component_dict["component_op"] @@ -115,8 +115,7 @@ def setup_pipeline(request, tmp_path, monkeypatch): "get_component_cache_key", lambda cache_key=cache_key, previous_component_cache=None: cache_key, ) - pipeline.add_op(component, dependencies=prev_comp) - prev_comp 
= component + dataset = pipeline._apply(component, datasets=dataset) cache_dict[component.name] = cache_key # override the default package_path with temporary path to avoid the creation of artifacts @@ -142,7 +141,7 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): ) in pipeline_configs.component_configs.items(): # Get exepcted component configs component = pipeline._graph[component_name] - component_op = component["fondant_component_op"] + component_op = component["operation"] # Check that the component configs are correct assert component_configs.dependencies == component["dependencies"] @@ -278,11 +277,11 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): def test_docker_configuration(tmp_path_factory): """Test that extra volumes are applied correctly.""" pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) - component_1 = ComponentOp( + pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -291,7 +290,6 @@ def test_docker_configuration(tmp_path_factory): ), ) - pipeline.add_op(component_1) compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yaml") @@ -306,11 +304,11 @@ def test_docker_configuration(tmp_path_factory): def test_invalid_docker_configuration(tmp_path_factory): """Test that a valid error is returned when an unknown accelerator is set.""" pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) - component_1 = ComponentOp( + pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -319,7 +317,6 @@ def test_invalid_docker_configuration(tmp_path_factory): ), ) - pipeline.add_op(component_1) compiler = DockerCompiler() with pytest.raises(InvalidPipelineDefinition): compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") @@ -342,7 +339,7 @@ def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): ) in pipeline_configs.component_configs.items(): # Get exepcted component configs component = pipeline._graph[component_name] - component_op = component["fondant_component_op"] + component_op = component["operation"] # Check that the component configs are correct assert component_configs.dependencies == component["dependencies"] @@ -369,11 +366,11 @@ def test_kubeflow_configuration(tmp_path_factory): node_pool_name = "dummy_label" pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) - component_1 = ComponentOp( + pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -383,7 +380,6 @@ def test_kubeflow_configuration(tmp_path_factory): accelerator_name="GPU", ), ) - pipeline.add_op(component_1) compiler = KubeFlowCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") @@ -401,11 +397,11 @@ def test_kubeflow_configuration(tmp_path_factory): def test_invalid_kubeflow_configuration(tmp_path_factory): """Test 
that an error is returned when an invalid resource is provided.""" pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) - component_1 = ComponentOp( + pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -414,7 +410,6 @@ def test_invalid_kubeflow_configuration(tmp_path_factory): ), ) - pipeline.add_op(component_1) compiler = KubeFlowCompiler() with pytest.raises(InvalidPipelineDefinition): compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") @@ -446,7 +441,7 @@ def test_vertex_compiler(setup_pipeline, tmp_path_factory): ) in pipeline_configs.component_configs.items(): # Get exepcted component configs component = pipeline._graph[component_name] - component_op = component["fondant_component_op"] + component_op = component["operation"] # Check that the component configs are correct assert component_configs.dependencies == component["dependencies"] @@ -470,11 +465,11 @@ def test_vertex_compiler(setup_pipeline, tmp_path_factory): def test_vertex_configuration(tmp_path_factory): """Test that the kubeflow pipeline can be configured.""" pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) - component_1 = ComponentOp( + pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -482,7 +477,6 @@ def test_vertex_configuration(tmp_path_factory): accelerator_name="NVIDIA_TESLA_K80", ), ) - pipeline.add_op(component_1) compiler = VertexCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") @@ -498,11 +492,11 @@ def test_vertex_configuration(tmp_path_factory): def test_invalid_vertex_configuration(tmp_path_factory): """Test that extra volumes are applied correctly.""" pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) - component_1 = ComponentOp( + pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, resources=Resources( @@ -511,7 +505,6 @@ def test_invalid_vertex_configuration(tmp_path_factory): ), ) - pipeline.add_op(component_1) compiler = VertexCompiler() with pytest.raises(InvalidPipelineDefinition): compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") @@ -526,24 +519,21 @@ def test_caching_dependency_docker(tmp_path_factory): for arg in arg_list: pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) compiler = DockerCompiler() - component_1 = ComponentOp( + dataset = pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": f"{arg}"}, ) - component_2 = ComponentOp( + dataset.apply( Path(COMPONENTS_PATH / "example_1" / "second_component"), arguments={"storage_args": "a dummy string arg"}, ) - pipeline.add_op(component_1) - pipeline.add_op(component_2, dependencies=component_1) - with 
tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) @@ -571,24 +561,21 @@ def test_caching_dependency_kfp(tmp_path_factory): for arg in arg_list: pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", + name="test_pipeline", + description="description of the test pipeline", base_path="/foo/bar", ) compiler = KubeFlowCompiler() - component_1 = ComponentOp( + dataset = pipeline.read( Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": f"{arg}"}, ) - component_2 = ComponentOp( + dataset.apply( Path(COMPONENTS_PATH / "example_1" / "second_component"), arguments={"storage_args": "a dummy string arg"}, ) - pipeline.add_op(component_1) - pipeline.add_op(component_2, dependencies=component_1) - with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") compiler.compile(pipeline=pipeline, output_path=output_path) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 9c85cbf02..542994011 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -20,7 +20,7 @@ def yaml_file_to_dict(file_path): @pytest.fixture() def default_pipeline_args(): return { - "pipeline_name": "pipeline", + "name": "pipeline", "base_path": "gcs://bucket/blob", } @@ -156,23 +156,19 @@ def test_valid_pipeline( # override the default package_path with temporary path to avoid the creation of artifacts monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) - first_component_op = ComponentOp( + dataset = pipeline.read( Path(components_path / component_names[0]), arguments=component_args, ) - second_component_op = ComponentOp( + dataset = dataset.apply( Path(components_path / component_names[1]), arguments=component_args, ) - third_component_op = ComponentOp( + dataset.apply( Path(components_path / component_names[2]), arguments=component_args, ) - pipeline.add_op(third_component_op, dependencies=second_component_op) - pipeline.add_op(first_component_op) - pipeline.add_op(second_component_op, dependencies=first_component_op) - pipeline.sort_graph() assert list(pipeline._graph.keys()) == [ "first_component", @@ -206,23 +202,19 @@ def test_invalid_pipeline_dependencies(default_pipeline_args, valid_pipeline_exa pipeline = Pipeline(**default_pipeline_args) - first_component_op = ComponentOp( + dataset = pipeline.read( Path(components_path / component_names[0]), arguments=component_args, ) - second_component_op = ComponentOp( + dataset = dataset.apply( Path(components_path / component_names[1]), arguments=component_args, ) - third_component_op = ComponentOp( - Path(components_path / component_names[2]), - arguments=component_args, - ) - - pipeline.add_op(third_component_op, dependencies=second_component_op) - pipeline.add_op(second_component_op) with pytest.raises(InvalidPipelineDefinition): - pipeline.add_op(first_component_op) + pipeline.read( + Path(components_path / component_names[2]), + arguments=component_args, + ) @pytest.mark.parametrize( @@ -246,18 +238,15 @@ def test_invalid_pipeline_declaration( pipeline = Pipeline(**default_pipeline_args) - first_component_op = ComponentOp( + dataset = pipeline.read( Path(components_path / component_names[0]), arguments=component_args, ) - second_component_op = ComponentOp( + dataset.apply( Path(components_path / component_names[1]), arguments=component_args, ) - 
pipeline.add_op(first_component_op) - pipeline.add_op(second_component_op, dependencies=first_component_op) - with pytest.raises(InvalidPipelineDefinition): pipeline._validate_pipeline_definition("test_pipeline") @@ -281,23 +270,17 @@ def test_invalid_pipeline_validation(default_pipeline_args): # double dependency pipeline1 = Pipeline(**default_pipeline_args) - pipeline1.add_op(first_component_op) + dataset = pipeline1._apply(first_component_op) with pytest.raises(InvalidPipelineDefinition): - pipeline1.add_op( + pipeline1._apply( second_component_op, - dependencies=[first_component_op, first_component_op], + datasets=[dataset, dataset], ) - # 2 components with no dependencies - pipeline2 = Pipeline(**default_pipeline_args) - pipeline2.add_op(first_component_op) - with pytest.raises(InvalidPipelineDefinition): - pipeline2.add_op(second_component_op) - def test_reusable_component_op(): - laion_retrieval_op = ComponentOp.from_registry( - name="retrieve_laion_by_prompt", + laion_retrieval_op = ComponentOp( + name_or_path="retrieve_laion_by_prompt", arguments={"num_images": 2, "aesthetic_score": 9, "aesthetic_weight": 0.5}, ) assert laion_retrieval_op.component_spec, "component_spec_path could not be loaded" @@ -307,14 +290,12 @@ def test_reusable_component_op(): ValueError, match=f"No reusable component with name {component_name} " "found.", ): - ComponentOp.from_registry( - name=component_name, - ) + ComponentOp(component_name) def test_defining_reusable_component_op_with_custom_spec(): - load_from_hub_default_op = ComponentOp.from_registry( - name="load_from_hf_hub", + load_from_hub_default_op = ComponentOp( + name_or_path="load_from_hf_hub", arguments={ "dataset_name": "test_dataset", "column_name_mapping": {"foo": "bar"}, @@ -323,7 +304,7 @@ def test_defining_reusable_component_op_with_custom_spec(): ) load_from_hub_custom_op = ComponentOp( - component_dir=load_from_hub_default_op.component_dir, + name_or_path=load_from_hub_default_op.component_dir, arguments={ "dataset_name": "test_dataset", "column_name_mapping": {"foo": "bar"}, @@ -338,6 +319,6 @@ def test_defining_reusable_component_op_with_custom_spec(): def test_pipeline_name(): - Pipeline(pipeline_name="valid-name", base_path="base_path") + Pipeline(name="valid-name", base_path="base_path") with pytest.raises(InvalidPipelineDefinition, match="The pipeline name violates"): - Pipeline(pipeline_name="invalid name", base_path="base_path") + Pipeline(name="invalid name", base_path="base_path") diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index 7da895b98..d4a6b3735 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -15,8 +15,8 @@ VALID_PIPELINE = Path("./tests/pipeline/examples/pipelines/compiled_pipeline/") PIPELINE = Pipeline( - pipeline_name="testpipeline", - pipeline_description="description of the test pipeline", + name="testpipeline", + description="description of the test pipeline", base_path="/foo/bar", ) diff --git a/tests/test_cli.py b/tests/test_cli.py index dd92616fd..d8b781f57 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -50,7 +50,7 @@ def test_basic_invocation(command): assert process.returncode == 0 -TEST_PIPELINE = Pipeline(pipeline_name="test_pipeline", base_path="some/path") +TEST_PIPELINE = Pipeline("test_pipeline", base_path="some/path") @pytest.mark.parametrize(