From a0838cef382de5b2f8946e98d3b3be30ad81f189 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Wed, 12 Jul 2023 15:47:58 +0200 Subject: [PATCH 1/8] Add draft structure for kfp refactor --- src/fondant/compiler.py | 59 ++++++++++ src/fondant/pipeline.py | 240 +--------------------------------------- src/fondant/runner.py | 30 +++++ 3 files changed, 90 insertions(+), 239 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 9f7ef3bd8..ffc3fd6ca 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -8,6 +8,7 @@ import yaml +from fondant.import_utils import is_kfp_available from fondant.pipeline import Pipeline logger = logging.getLogger(__name__) @@ -207,3 +208,61 @@ def _generate_spec( "version": "3.8", "services": services, } + + +class KubeFlowCompiler(Compiler): + """Compiler that creates a Kubeflow pipeline spec from a pipeline.""" + + def _resolve_imports(self): + """Resolve imports for the Kubeflow compiler.""" + try: + import kfp + from kfp import dsl + except ImportError: + raise ImportError( + "You need to install kfp to use the Kubeflow compiler, " + / "you can install it with `poetry install --extras kfp`" + ) + + def compile( + self, + pipeline: Pipeline, + output_path: str = "kubeflow_pipeline.py", + ) -> None: + """Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path. 
+ + Args: + pipeline: the pipeline to compile + output_path: the path where to save the Kubeflow pipeline spec + """ + self._resolve_imports() + logger.info(f"Compiling {pipeline.name} to {output_path}") + wrapped_pipeline = dsl.pipeline( + name=pipeline.name, description=pipeline.description + )(self.kf_pipeline) + kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) + logger.info("Pipeline compiled successfully") + + def kf_pipeline(self, pipeline: Pipeline): + for component_name, component in pipeline._graph.items(): + logger.info(f"Compiling service for {component_name}") + + # convert ComponentOp to Kubeflow component + component_task = component.kubeflow_specification.to_string() + + # add configuration to Kubeflow component (CPU, GPU, etc.) + component_task = self._set_configuration(component_task, component) + + # add dependency to task + if component.dependencies: + component_task.after(previous_component_task) + + return pipeline + + def _set_configuration(self, task, fondant_component_operation): + # Unpack optional specifications + number_of_gpus = fondant_component_operation.number_of_gpus + if number_of_gpus is not None: + task.set_gpu_limit(number_of_gpus) + # TODO add rest + return task diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index a0d20d391..3e259962a 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -284,7 +284,7 @@ def _validate_pipeline_definition(self, run_id: str): base_path: the base path where to store the pipelines artifacts run_id: the run id of the component """ - # TODO: change later if we decide to run 2 fondan pipelines after each other + # TODO: change later if we decide to run 2 fondant pipelines after each other load_component = True load_component_name = list(self._graph.keys())[0] @@ -348,241 +348,3 @@ def _validate_pipeline_definition(self, run_id: str): load_component = False logger.info("All pipeline component specifications match.") - - def compile(self): - """ - Function 
that creates and compiles a Kubeflow Pipeline. - Once you have generated the pipeline function, you can use it to create an instance of - the pipeline and compile it using the Kubeflow compiler. - """ - - def _get_component_function( - fondant_component_operation: ComponentOp, - ) -> t.Callable: - """ - Load the Kubeflow component based on the specification from the fondant component - operation. - - Args: - fondant_component_operation (ComponentOp): The fondant component - operation. - - Returns: - Callable: The Kubeflow component. - """ - return kfp.components.load_component( - text=fondant_component_operation.component_spec.kubeflow_specification.to_string(), - ) - - def _set_task_configuration(task, fondant_component_operation): - # Unpack optional specifications - number_of_gpus = fondant_component_operation.number_of_gpus - node_pool_label = fondant_component_operation.node_pool_label - node_pool_name = fondant_component_operation.node_pool_name - p_volumes = fondant_component_operation.p_volumes - ephemeral_storage_size = fondant_component_operation.ephemeral_storage_size - - # Assign optional specification - if number_of_gpus is not None: - task.set_gpu_limit(number_of_gpus) - if node_pool_label is not None and node_pool_name is not None: - task.add_node_selector_constraint(node_pool_label, node_pool_name) - if p_volumes is not None: - task.add_pvolumes(p_volumes) - if ephemeral_storage_size is not None: - task.set_ephemeral_storage_request(ephemeral_storage_size) - - return task - - # Sort graph based on specified dependencies - self.sort_graph() - - # parse metadata argument required for the first component - run_id = "{{workflow.name}}" - - # Validate subset schema before defining the pipeline - self._validate_pipeline_definition(run_id) - - @dsl.pipeline(name=self.name, description=self.description) - def pipeline(): - # TODO: check if we want to have the manifest path empty for loading component or remove - # it completely from the loading component - 
# TODO: check if we want to have the metadata arg empty for transform component or - # remove it completely from the transform component - manifest_path = "" - metadata = "" - previous_component_task = None - for operation in self._graph.values(): - fondant_component_op = operation["fondant_component_op"] - - # Get the Kubeflow component based on the fondant component operation. - kubeflow_component_op = _get_component_function(fondant_component_op) - - # Execute the Kubeflow component and pass in the output manifest path from - # the previous component. - component_args = fondant_component_op.arguments - - if previous_component_task is not None: - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata, - **component_args, - ) - else: - metadata = json.dumps( - {"base_path": self.base_path, "run_id": run_id}, - ) - # Add metadata to the first component - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata, - **component_args, - ) - metadata = "" - # Set optional configurations - component_task = _set_task_configuration( - component_task, - fondant_component_op, - ) - # Set the execution order of the component task to be after the previous - # component task. - if previous_component_task is not None: - component_task.after(previous_component_task) - - # Update the manifest path to be the output path of the current component task. 
- manifest_path = component_task.outputs["output_manifest_path"] - - previous_component_task = component_task - - return pipeline - - logger.info(f"Compiling pipeline: {self.name}") - - kfp.compiler.Compiler().compile(pipeline, self.package_path) - - logger.info("Pipeline compiled successfully") - - def __repr__(self) -> str: - """Return a string representation of the FondantPipeline object.""" - return f"{self.__class__.__name__}({self._graph!r}" - - -class Client: - """Class representing a Fondant Client.""" - - def __init__(self, host: str): - """ - Args: - host: The `host` URL argument specifies the Kubeflow Pipelines API endpoint to - which the client should send requests. - """ - self.host = host - self.client = kfp.Client(host=self.host) - - def get_pipeline_id(self, pipeline_name: str) -> str: - """ - Function that returns the id of a pipeline given a pipeline name - Args: - pipeline_name: the name of the pipeline - Returns: - The pipeline id. - """ - return self.client.get_pipeline_id(pipeline_name) - - def get_pipeline_version_ids(self, pipeline_id: str) -> t.List[str]: - """Function that returns the versions of a pipeline given a pipeline id.""" - pipeline_versions = self.client.list_pipeline_versions(pipeline_id).versions - return [version.id for version in pipeline_versions] - - def delete_pipeline(self, pipeline_name: str): - """ - Function that deletes the pipeline name - Args: - pipeline_name: the name of the pipeline to delete. - """ - pipeline_id = self.get_pipeline_id(pipeline_name) - if pipeline_id is not None: - pipeline_version_ids = self.get_pipeline_version_ids(pipeline_id) - # All versions need to be first deleted - for pipeline_version_id in pipeline_version_ids: - self.client.delete_pipeline_version(pipeline_version_id) - self.client.delete_pipeline(pipeline_id) - - logger.info( - f"Pipeline {pipeline_name} already exists. 
Deleting old pipeline...", - ) - else: - logger.info(f"No existing pipeline under `{pipeline_name}` name was found.") - - def compile_and_upload( - self, - pipeline: Pipeline, - delete_pipeline_package: t.Optional[bool] = False, - ): - """ - Uploads a pipeline package to Kubeflow Pipelines and deletes any existing pipeline with the - same name. - - Args: - pipeline: The fondant pipeline to compile and upload - delete_pipeline_package: Whether to delete the pipeline package file - after uploading. Defaults to False. - - Raises: - Exception: If there was an error uploading the pipeline package. - """ - pipeline.compile() - - logger.info(f"Uploading pipeline: {pipeline.name}") - - try: - self.client.upload_pipeline( - pipeline_package_path=pipeline.package_path, - pipeline_name=pipeline.name, - ) - except Exception as e: - msg = f"Error uploading pipeline package: {str(e)}" - raise Exception(msg) - - # Delete the pipeline package file if specified. - if delete_pipeline_package: - Path(pipeline.package_path).unlink() - - def compile_and_run( - self, - pipeline: Pipeline, - run_name: t.Optional[str] = None, - experiment_name: t.Optional[str] = "Default", - ): - """ - Compiles and runs the specified pipeline. - - Args: - pipeline: The pipeline object to be compiled and run. - run_name: The name of the run. If not provided, the pipeline's name with 'run' appended - will be used. - experiment_name: The name of the experiment where the pipeline will be run. - Default is 'Default'. - """ - pipeline.compile() - - try: - experiment = self.client.get_experiment(experiment_name=experiment_name) - except ValueError: - logger.info( - f"Defined experiment '{experiment_name}' not found. 
Creating new experiment" - f"under this name", - ) - experiment = self.client.create_experiment(experiment_name) - - if run_name is None: - run_name = pipeline.name + " run" - - pipeline_spec = self.client.run_pipeline( - experiment_id=experiment.id, - job_name=run_name, - pipeline_package_path=pipeline.package_path, - ) - - pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}" - logger.info(f"Pipeline is running at: {pipeline_url}") diff --git a/src/fondant/runner.py b/src/fondant/runner.py index eda71142e..5c8e12f6f 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -1,6 +1,9 @@ +import logging import subprocess # nosec from abc import ABC, abstractmethod +logger = logging.getLogger(__name__) + class Runner(ABC): """Abstract base class for a runner.""" @@ -26,3 +29,30 @@ def run(self, input_spec: str, *args, **kwargs): ] subprocess.call(cmd) # nosec + + +class KubeflowRunner(Runner): + @abstractmethod + def _resolve_imports(self): + """Resolve imports for the Kubeflow compiler.""" + try: + import kfp + except ImportError: + raise ImportError( + "You need to install kfp to use the Kubeflow compiler, " + / "you can install it with `poetry install --extras kfp`" + ) + + def run(cls, input_spec: str, host: str, *args, **kwargs): + """Run a kubeflow pipeline.""" + cls._resolve_imports() + client = kfp.Client(host=host) + # TODO add logic to see if pipeline exists + pipeline_spec = client.run_pipeline( + experiment_id=experiment.id, + job_name=run_name, + pipeline_package_path=pipeline.package_path, + ) + + pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}" + logger.info(f"Pipeline is running at: {pipeline_url}") From 0b493ba6f0aa144341a9c25b4bcae1e52dce7ee4 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Wed, 19 Jul 2023 14:03:41 +0200 Subject: [PATCH 2/8] Update kubeflow compiler code for new structure --- src/fondant/compiler.py | 83 ++++++++++++++++++++++++++++--------- src/fondant/import_utils.py | 2 +- 
src/fondant/pipeline.py | 2 - src/fondant/runner.py | 28 +++++++------ 4 files changed, 80 insertions(+), 35 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index ffc3fd6ca..58c66dd90 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -8,7 +8,6 @@ import yaml -from fondant.import_utils import is_kfp_available from fondant.pipeline import Pipeline logger = logging.getLogger(__name__) @@ -18,7 +17,7 @@ class Compiler(ABC): """Abstract base class for a compiler.""" @abstractmethod - def compile(self, *args, **kwargs): + def compile(self, *args, **kwargs) -> None: """Abstract method to invoke compilation.""" @@ -213,21 +212,25 @@ def _generate_spec( class KubeFlowCompiler(Compiler): """Compiler that creates a Kubeflow pipeline spec from a pipeline.""" + def __init__(self): + self._resolve_imports() + def _resolve_imports(self): """Resolve imports for the Kubeflow compiler.""" try: import kfp - from kfp import dsl + + self.kfp = kfp except ImportError: raise ImportError( "You need to install kfp to use the Kubeflow compiler, " - / "you can install it with `poetry install --extras kfp`" + / "you can install it with `pip install --extras kfp`", ) def compile( self, pipeline: Pipeline, - output_path: str = "kubeflow_pipeline.py", + output_path: str = "kubeflow_pipeline.yml", ) -> None: """Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path. 
@@ -235,34 +238,74 @@ def compile( pipeline: the pipeline to compile output_path: the path where to save the Kubeflow pipeline spec """ - self._resolve_imports() - logger.info(f"Compiling {pipeline.name} to {output_path}") - wrapped_pipeline = dsl.pipeline( - name=pipeline.name, description=pipeline.description - )(self.kf_pipeline) - kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) + self.pipeline = pipeline + logger.info(f"Compiling {self.pipeline.name} to {output_path}") + wrapped_pipeline = self.kfp.dsl.pipeline(self.kf_pipeline) # type: ignore + self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore logger.info("Pipeline compiled successfully") - def kf_pipeline(self, pipeline: Pipeline): - for component_name, component in pipeline._graph.items(): + def kf_pipeline(self): + previous_component_task = None + manifest_path = "" + for component_name, component in self.pipeline._graph.items(): logger.info(f"Compiling service for {component_name}") + component_op = component["fondant_component_op"] # convert ComponentOp to Kubeflow component - component_task = component.kubeflow_specification.to_string() + kubeflow_component_op = self.kfp.components.load_component( + text=component_op.component_spec.kubeflow_specification.to_string(), + ) - # add configuration to Kubeflow component (CPU, GPU, etc.) - component_task = self._set_configuration(component_task, component) + # Execute the Kubeflow component and pass in the output manifest path from + # the previous component. 
+ component_args = component_op.arguments + metadata = json.dumps( + {"base_path": self.pipeline.base_path, "run_id": "{{workflow.name}}"}, + ) - # add dependency to task - if component.dependencies: + if previous_component_task is not None: + component_task = kubeflow_component_op( + input_manifest_path=manifest_path, + metadata=metadata, + **component_args, + ) + else: + # Add metadata to the first component + component_task = kubeflow_component_op( + input_manifest_path=manifest_path, + metadata=metadata, + **component_args, + ) + # Set optional configurations + component_task = self._set_configuration( + component_task, + component_op, + ) + # Set the execution order of the component task to be after the previous + # component task. + if previous_component_task is not None: component_task.after(previous_component_task) - return pipeline + # Update the manifest path to be the output path of the current component task. + manifest_path = component_task.outputs["output_manifest_path"] + + previous_component_task = component_task def _set_configuration(self, task, fondant_component_operation): # Unpack optional specifications number_of_gpus = fondant_component_operation.number_of_gpus + node_pool_name = fondant_component_operation.node_pool_name + p_volumes = fondant_component_operation.p_volumes + ephemeral_storage_size = fondant_component_operation.ephemeral_storage_size + + # Assign optional specification if number_of_gpus is not None: task.set_gpu_limit(number_of_gpus) - # TODO add rest + if node_pool_name is not None: + task.add_node_selector_constraint("node_pool", node_pool_name) + if p_volumes is not None: + task.add_pvolumes(p_volumes) + if ephemeral_storage_size is not None: + task.set_ephemeral_storage_request(ephemeral_storage_size) + return task diff --git a/src/fondant/import_utils.py b/src/fondant/import_utils.py index 65ba634d7..8619b4aa1 100644 --- a/src/fondant/import_utils.py +++ b/src/fondant/import_utils.py @@ -37,5 +37,5 @@ def 
is_package_available(package_name: str, import_error_msg: str) -> bool: def is_kfp_available(): - """Check if 'pandas' is available.""" + """Check if 'kfp' is available.""" return is_package_available("kfp", KFP_IMPORT_ERROR) diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 3e259962a..ed37ea117 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -18,8 +18,6 @@ from fondant.schema import validate_partition_number, validate_partition_size if is_kfp_available(): - import kfp - from kfp import dsl from kubernetes import client as k8s_client logger = logging.getLogger(__name__) diff --git a/src/fondant/runner.py b/src/fondant/runner.py index 5c8e12f6f..286c2922a 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -32,27 +32,31 @@ def run(self, input_spec: str, *args, **kwargs): class KubeflowRunner(Runner): + def __init__(self): + self._resolve_imports() + @abstractmethod def _resolve_imports(self): """Resolve imports for the Kubeflow compiler.""" try: + global kfp import kfp except ImportError: raise ImportError( "You need to install kfp to use the Kubeflow compiler, " - / "you can install it with `poetry install --extras kfp`" + / "you can install it with `pip install --extras kfp`", ) def run(cls, input_spec: str, host: str, *args, **kwargs): """Run a kubeflow pipeline.""" - cls._resolve_imports() - client = kfp.Client(host=host) - # TODO add logic to see if pipeline exists - pipeline_spec = client.run_pipeline( - experiment_id=experiment.id, - job_name=run_name, - pipeline_package_path=pipeline.package_path, - ) - - pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}" - logger.info(f"Pipeline is running at: {pipeline_url}") + pass + # client = kfp.Client(host=host) + # # TODO add logic to see if pipeline exists + # pipeline_spec = client.run_pipeline( + # experiment_id=experiment.id, + # job_name=run_name, + # pipeline_package_path=pipeline.package_path, + # ) + + # pipeline_url = 
f"{self.host}/#/runs/details/{pipeline_spec.id}" + # logger.info(f"Pipeline is running at: {pipeline_url}") From f59b791a341e59f45f336f1b16e09cdf00a14740 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Wed, 19 Jul 2023 15:14:20 +0200 Subject: [PATCH 3/8] Fix loose imports --- src/fondant/components | 1 - src/fondant/pipeline.py | 1 - 2 files changed, 2 deletions(-) delete mode 120000 src/fondant/components diff --git a/src/fondant/components b/src/fondant/components deleted file mode 120000 index 6e10371d3..000000000 --- a/src/fondant/components +++ /dev/null @@ -1 +0,0 @@ -../../components \ No newline at end of file diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index ed37ea117..66b08bab5 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -1,5 +1,4 @@ """This module defines classes to represent a Fondant Pipeline.""" -import json import logging import re import typing as t From c12fd20806bae0e2445528be99cce431f2606b80 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Thu, 3 Aug 2023 15:02:20 +0200 Subject: [PATCH 4/8] Update compiler and pipeline tests to match new structure --- src/fondant/compiler.py | 9 +- .../example_1/kubeflow_pipeline.yml | 199 ++++++++++++++++++ .../example_2/kubeflow_pipeline.yml | 147 +++++++++++++ .../compiled_pipeline/kubeflow_pipeline.yml | 80 +++++++ .../first_component/fondant_component.yaml | 24 +++ .../second_component/fondant_component.yaml | 27 +++ tests/test_compiler.py | 62 +++++- tests/test_pipeline.py | 56 ++--- 8 files changed, 570 insertions(+), 34 deletions(-) create mode 100644 tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml create mode 100644 tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml create mode 100644 tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml create mode 100644 tests/example_pipelines/invalid_pipeline/example_3/first_component/fondant_component.yaml create mode 100644 
tests/example_pipelines/invalid_pipeline/example_3/second_component/fondant_component.yaml diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 58c66dd90..52ce263eb 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -222,9 +222,10 @@ def _resolve_imports(self): self.kfp = kfp except ImportError: + msg = """You need to install kfp to use the Kubeflow compiler,\n + you can install it with `pip install --extras kfp`""" raise ImportError( - "You need to install kfp to use the Kubeflow compiler, " - / "you can install it with `pip install --extras kfp`", + msg, ) def compile( @@ -239,8 +240,10 @@ def compile( output_path: the path where to save the Kubeflow pipeline spec """ self.pipeline = pipeline + self.pipeline.sort_graph() + self.pipeline._validate_pipeline_definition("{{workflow.name}}") logger.info(f"Compiling {self.pipeline.name} to {output_path}") - wrapped_pipeline = self.kfp.dsl.pipeline(self.kf_pipeline) # type: ignore + wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kf_pipeline) # type: ignore self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore logger.info("Pipeline compiled successfully") diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml new file mode 100644 index 000000000..b7338233c --- /dev/null +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -0,0 +1,199 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: kf-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} +spec: + entrypoint: kf-pipeline + templates: + - name: first-component + container: + args: [] + command: [python3, main.py, 
--input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": + {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', + --input_partition_rows, disable, --output_partition_size, disable, --storage_args, + a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + image: example_component:latest + inputs: + artifacts: + - name: input_manifest_path + path: /tmp/inputs/input_manifest_path/data + raw: {data: ''} + outputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and base path", "name": "metadata", "type": "String"}, {"default": "None", + "description": "The component 
specification as a dictionary", "name": "component_spec", + "type": "JsonObject"}, {"default": "None", "description": "The number of + rows to load per partition. Set to override the automatic partitioning", + "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": + "The size of the output partition size, defaults to 250MB. Set to `disable` + to disable the automatic partitioning", "name": "output_partition_size", + "type": "String"}, {"description": "Storage arguments", "name": "storage_args", + "type": "String"}], "name": "First component", "outputs": [{"description": + "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', + pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}', + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": + "disable", "storage_args": "a dummy string arg"}'} + - name: kf-pipeline + dag: + tasks: + - {name: first-component, template: first-component} + - name: second-component + template: second-component + dependencies: [first-component] + arguments: + artifacts: + - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} + - name: third-component + template: third-component + dependencies: [second-component] + arguments: + artifacts: + - {name: second-component-output_manifest_path, from: 
'{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}'} + - name: second-component + container: + args: [] + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Second component", "produces": {"embeddings": {"fields": {"data": {"items": + {"type": "float32"}, "type": "array"}}}}}', --input_partition_rows, '10', + --output_partition_size, 30MB, --storage_args, a dummy string arg, --output_manifest_path, + /tmp/outputs/output_manifest_path/data] + image: example_component:latest + inputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} + outputs: + artifacts: + - {name: second-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": 
"input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and base path", "name": "metadata", "type": "String"}, {"default": "None", + "description": "The component specification as a dictionary", "name": "component_spec", + "type": "JsonObject"}, {"default": "None", "description": "The number of + rows to load per partition. Set to override the automatic partitioning", + "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": + "The size of the output partition size, defaults to 250MB. Set to `disable` + to disable the automatic partitioning", "name": "output_partition_size", + "type": "String"}, {"description": "Storage arguments", "name": "storage_args", + "type": "String"}], "name": "Second component", "outputs": [{"description": + "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', + pipelines.kubeflow.org/component_ref: '{"digest": "b20d3957f48cd2540e594e8c9f2f1f67f5a299152522c61a71f697f5e40278c7"}', + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": + {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", + "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", + \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": "30MB", "storage_args": + "a dummy string arg"}'} + - name: third-component + container: + args: [] + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"some_list": {"description": "Some list", 
"items": {"type": "int"}, + "type": "list"}, "storage_args": {"description": "Storage arguments", "type": + "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, + "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": + "array"}}}, "images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Third component", "produces": {"additionalSubsets": false, "images": {"fields": + {"data": {"type": "binary"}}}}}', --input_partition_rows, None, --output_partition_size, + None, --storage_args, a dummy string arg, --some_list, '[1, 2, 3]', --output_manifest_path, + /tmp/outputs/output_manifest_path/data] + image: example_component:latest + inputs: + artifacts: + - {name: second-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} + outputs: + artifacts: + - {name: third-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + {"inputValue": "storage_args"}, "--some_list", {"inputValue": "some_list"}, + "--output_manifest_path", {"outputPath": "output_manifest_path"}], "image": + "example_component:latest"}}, "inputs": [{"description": "Path to the input + manifest", "name": "input_manifest_path", "type": "String"}, {"description": + "Metadata arguments 
containing the run id and base path", "name": "metadata", + "type": "String"}, {"default": "None", "description": "The component specification + as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": + "None", "description": "The number of rows to load per partition. Set to + override the automatic partitioning", "name": "input_partition_rows", "type": + "String"}, {"default": "None", "description": "The size of the output partition + size, defaults to 250MB. Set to `disable` to disable the automatic partitioning", + "name": "output_partition_size", "type": "String"}, {"description": "Storage + arguments", "name": "storage_args", "type": "String"}, {"description": "Some + list", "name": "some_list", "type": "JsonArray"}], "name": "Third component", + "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", + "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "936f0e13275cc8aab199925252dffe2720a01d94af50e5aa78bf9819ccb4ab27"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"some_list\": {\"description\": \"Some list\", \"items\": + {\"type\": \"int\"}, \"type\": \"list\"}, \"storage_args\": {\"description\": + \"Storage arguments\", \"type\": \"str\"}}, \"consumes\": {\"captions\": + {\"fields\": {\"data\": {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": + {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\": + \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", + "output_partition_size": "None", "some_list": "[1, 2, 3]", "storage_args": + "a dummy string arg"}'} + 
arguments: + parameters: [] + serviceAccountName: pipeline-runner diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml new file mode 100644 index 000000000..e36ac99b6 --- /dev/null +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -0,0 +1,147 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: kf-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} +spec: + entrypoint: kf-pipeline + templates: + - name: first-component + container: + args: [] + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": + {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', + --input_partition_rows, None, --output_partition_size, None, --storage_args, + a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + image: example_component:latest + inputs: + artifacts: + - name: input_manifest_path + path: /tmp/inputs/input_manifest_path/data + raw: {data: ''} + outputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: 
'{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and base path", "name": "metadata", "type": "String"}, {"default": "None", + "description": "The component specification as a dictionary", "name": "component_spec", + "type": "JsonObject"}, {"default": "None", "description": "The number of + rows to load per partition. Set to override the automatic partitioning", + "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": + "The size of the output partition size, defaults to 250MB. 
Set to `disable` + to disable the automatic partitioning", "name": "output_partition_size", + "type": "String"}, {"description": "Storage arguments", "name": "storage_args", + "type": "String"}], "name": "First component", "outputs": [{"description": + "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', + pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}', + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": + "None", "storage_args": "a dummy string arg"}'} + - name: image-cropping + container: + args: [] + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, + '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding + for the image cropping. 
The padding is added to all borders of the image.", + "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, + "description": "Component that removes single-colored borders around images + and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", + "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": + "binary"}, "height": {"type": "int16"}, "width": {"type": "int16"}}}}}', + --input_partition_rows, None, --output_partition_size, None, --cropping_threshold, + '0', --padding, '0', --output_manifest_path, /tmp/outputs/output_manifest_path/data] + image: ghcr.io/ml6team/image_cropping:dev + inputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} + outputs: + artifacts: + - {name: image-cropping-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Component + that removes single-colored borders around images and crops them appropriately", + "implementation": {"container": {"command": ["python3", "main.py", "--input_manifest_path", + {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, + "--component_spec", {"inputValue": "component_spec"}, "--input_partition_rows", + {"inputValue": "input_partition_rows"}, "--output_partition_size", {"inputValue": + "output_partition_size"}, "--cropping_threshold", {"inputValue": "cropping_threshold"}, + "--padding", {"inputValue": "padding"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "ghcr.io/ml6team/image_cropping:dev"}}, + "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and 
base path", "name": "metadata", "type": "String"}, {"default": "None", + "description": "The component specification as a dictionary", "name": "component_spec", + "type": "JsonObject"}, {"default": "None", "description": "The number of + rows to load per partition. Set to override the automatic partitioning", + "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": + "The size of the output partition size, defaults to 250MB. Set to `disable` + to disable the automatic partitioning", "name": "output_partition_size", + "type": "String"}, {"default": -30, "description": "Threshold parameter + used for detecting borders. A lower (negative) parameter results in a more + performant border detection, but can cause overcropping. Default is -30", + "name": "cropping_threshold", "type": "Integer"}, {"default": 10, "description": + "Padding for the image cropping. The padding is added to all borders of + the image.", "name": "padding", "type": "Integer"}], "name": "Image cropping", + "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", + "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": + "08066cab55f911d084af84cc08a555794a9cf94569fe4b991836a39fd3f76f86"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": + "{\"args\": {\"cropping_threshold\": {\"default\": -30, \"description\": + \"Threshold parameter used for detecting borders. A lower (negative) parameter + results in a more performant border detection, but can cause overcropping. + Default is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": + \"Padding for the image cropping. 
The padding is added to all borders of + the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that + removes single-colored borders around images and crops them appropriately\", + \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", + \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, + \"height\": {\"type\": \"int16\"}, \"width\": {\"type\": \"int16\"}}}}}", + "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": + \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": + "None", "padding": "0"}'} + - name: kf-pipeline + dag: + tasks: + - {name: first-component, template: first-component} + - name: image-cropping + template: image-cropping + dependencies: [first-component] + arguments: + artifacts: + - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} + arguments: + parameters: [] + serviceAccountName: pipeline-runner diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml new file mode 100644 index 000000000..0345039cc --- /dev/null +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -0,0 +1,80 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: kf-pipeline- + annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'} + labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} +spec: + entrypoint: kf-pipeline + templates: + - name: first-component + container: + args: [] + command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, + --metadata, '{"base_path": "/foo/bar", "run_id": 
"{{workflow.name}}"}', --component_spec, + '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": + {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', + --input_partition_rows, None, --output_partition_size, None, --storage_args, + a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + image: example_component:latest + resources: + limits: {nvidia.com/gpu: 1} + requests: {ephemeral-storage: 1Gi} + volumeMounts: + - {mountPath: /mnt, name: mypvc} + inputs: + artifacts: + - name: input_manifest_path + path: /tmp/inputs/input_manifest_path/data + raw: {data: ''} + outputs: + artifacts: + - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + nodeSelector: {node_pool: a_node_pool} + metadata: + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + pipelines.kubeflow.org/enable_caching: "true" + annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This + is an example component", "implementation": {"container": {"command": ["python3", + "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, + "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": + "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, + "--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and base path", "name": "metadata", "type": "String"}, 
{"default": "None", + "description": "The component specification as a dictionary", "name": "component_spec", + "type": "JsonObject"}, {"default": "None", "description": "The number of + rows to load per partition. Set to override the automatic partitioning", + "name": "input_partition_rows", "type": "String"}, {"default": "None", "description": + "The size of the output partition size, defaults to 250MB. Set to `disable` + to disable the automatic partitioning", "name": "output_partition_size", + "type": "String"}, {"description": "Storage arguments", "name": "storage_args", + "type": "String"}], "name": "First component", "outputs": [{"description": + "Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}', + pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}', + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": + "None", "storage_args": "a dummy string arg"}'} + volumes: + - emptyDir: {} + name: mypvc + - name: kf-pipeline + dag: + tasks: + - {name: first-component, template: first-component} + arguments: + parameters: [] + serviceAccountName: pipeline-runner diff --git a/tests/example_pipelines/invalid_pipeline/example_3/first_component/fondant_component.yaml b/tests/example_pipelines/invalid_pipeline/example_3/first_component/fondant_component.yaml new file mode 100644 index 000000000..45964a8c6 --- /dev/null +++ 
b/tests/example_pipelines/invalid_pipeline/example_3/first_component/fondant_component.yaml @@ -0,0 +1,24 @@ +name: First component +description: This is an example component +image: example_component:latest + +consumes: + images: + fields: + data: + type: binary + +produces: + captions: + fields: + data: + type: string + + images: + fields: + data: + type: binary +args: + storage_args: + description: Storage arguments + type: str \ No newline at end of file diff --git a/tests/example_pipelines/invalid_pipeline/example_3/second_component/fondant_component.yaml b/tests/example_pipelines/invalid_pipeline/example_3/second_component/fondant_component.yaml new file mode 100644 index 000000000..c02abbaa1 --- /dev/null +++ b/tests/example_pipelines/invalid_pipeline/example_3/second_component/fondant_component.yaml @@ -0,0 +1,27 @@ +name: Second component +description: This is an example component +image: example_component:latest + +consumes: + images: + fields: + data: + type: string + + captions: + fields: + data: + type: string + +produces: + embeddings: + fields: + data: + type: array + items: + type: float32 + +args: + storage_args: + description: Storage arguments + type: str \ No newline at end of file diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 4bdfba732..ef0105a5e 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -1,14 +1,16 @@ import datetime +import sys from pathlib import Path +from unittest import mock import pytest import yaml -from fondant.compiler import DockerCompiler +from fondant.compiler import DockerCompiler, KubeFlowCompiler from fondant.pipeline import ComponentOp, Pipeline COMPONENTS_PATH = Path("./tests/example_pipelines/valid_pipeline") -VALID_DOCKER_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/") +VALID_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/") TEST_PIPELINES = [ ( @@ -79,8 +81,6 @@ def setup_pipeline(request, tmp_path, monkeypatch): pipeline.add_op(component, 
dependencies=prev_comp) prev_comp = component - pipeline.compile() - # override the default package_path with temporary path to avoid the creation of artifacts monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) @@ -96,7 +96,7 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) with open(output_path) as src, open( - VALID_DOCKER_PIPELINE / example_dir / "docker-compose.yml", + VALID_PIPELINE / example_dir / "docker-compose.yml", ) as truth: assert yaml.safe_load(src) == yaml.safe_load(truth) @@ -184,3 +184,55 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): assert all( extra_volume in service["volumes"] for extra_volume in extra_volumes ) + + +@pytest.mark.usefixtures("_freeze_time") +def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): + """Test compiling a pipeline to kubeflow.""" + example_dir, pipeline = setup_pipeline + compiler = KubeFlowCompiler() + with tmp_path_factory.mktemp("temp") as fn: + output_path = str(fn / "kubeflow_pipeline.yml") + compiler.compile(pipeline=pipeline, output_path=output_path) + with open(output_path) as src, open( + VALID_PIPELINE / example_dir / "kubeflow_pipeline.yml", + ) as truth: + assert src.read() == truth.read() + + +@pytest.mark.usefixtures("_freeze_time") +def test_kubeflow_configuration(tmp_path_factory): + """Test that the kubeflow pipeline can be configured.""" + from kfp.dsl import PipelineVolume + + pipeline = Pipeline( + pipeline_name="test_pipeline", + pipeline_description="description of the test pipeline", + base_path="/foo/bar", + ) + component_1 = ComponentOp( + Path(COMPONENTS_PATH / "example_1" / "first_component"), + arguments={"storage_args": "a dummy string arg"}, + node_pool_name="a_node_pool", + number_of_gpus=1, + p_volumes={"/mnt": PipelineVolume(name="mypvc", empty_dir={})}, + ephemeral_storage_size="1Gi", + ) + 
pipeline.add_op(component_1) + compiler = KubeFlowCompiler() + with tmp_path_factory.mktemp("temp") as fn: + output_path = str(fn / "kubeflow_pipeline.yml") + compiler.compile(pipeline=pipeline, output_path=output_path) + with open(output_path) as src, open( + VALID_PIPELINE / "kubeflow_pipeline.yml", + ) as truth: + assert src.read() == truth.read() + + +def test_kfp_import(): + """Test that the kfp import throws the correct error.""" + with mock.patch.dict(sys.modules): + # remove kfp from the modules + sys.modules["kfp"] = None + with pytest.raises(ImportError): + _ = KubeFlowCompiler() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index e9eea77c5..0d4c55b14 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -132,7 +132,7 @@ def test_valid_pipeline( assert pipeline._graph["Second component"]["dependencies"] == ["First component"] assert pipeline._graph["Third component"]["dependencies"] == ["Second component"] - pipeline.compile() + pipeline._validate_pipeline_definition("test_pipeline") @pytest.mark.parametrize( @@ -144,11 +144,7 @@ def test_valid_pipeline( ), ], ) -def test_invalid_pipeline_dependencies( - default_pipeline_args, - valid_pipeline_example, - tmp_path, -): +def test_invalid_pipeline_dependencies(default_pipeline_args, valid_pipeline_example): """ Test that an InvalidPipelineDefinition exception is raised when attempting to create a pipeline with more than one operation defined without dependencies. 
@@ -183,12 +179,12 @@ def test_invalid_pipeline_dependencies( [ ("example_1", ["first_component", "second_component"]), ("example_2", ["first_component", "second_component"]), + ("example_3", ["first_component", "second_component"]), ], ) def test_invalid_pipeline_compilation( default_pipeline_args, invalid_pipeline_example, - tmp_path, ): """ Test that an InvalidPipelineDefinition exception is raised when attempting to compile @@ -213,32 +209,40 @@ def test_invalid_pipeline_compilation( pipeline.add_op(second_component_op, dependencies=first_component_op) with pytest.raises(InvalidPipelineDefinition): - pipeline.compile() + pipeline._validate_pipeline_definition("test_pipeline") -@pytest.mark.parametrize( - "invalid_component_args", - [ - {"invalid_arg": "a dummy string arg", "storage_args": "a dummy string arg"}, - {"args": 1, "storage_args": "a dummy string arg"}, - ], -) -def test_invalid_argument(default_pipeline_args, invalid_component_args, tmp_path): +def test_invalid_pipeline_composition(default_pipeline_args): """ - Test that an exception is raised when the passed invalid argument name or type to the fondant - component does not match the ones specified in the fondant specifications. + Test that an InvalidPipelineDefinition exception is raised when attempting to compile + an invalid pipeline definition. 
""" - component_operation = ComponentOp( - valid_pipeline_path / "example_1" / "first_component", - arguments=invalid_component_args, - ) + components_path = Path(invalid_pipeline_path / "example_1") + component_args = {"storage_args": "a dummy string arg"} - pipeline = Pipeline(**default_pipeline_args) + first_component_op = ComponentOp( + Path(components_path / "first_component"), + arguments=component_args, + ) + second_component_op = ComponentOp( + Path(components_path / "second_component"), + arguments=component_args, + ) - pipeline.add_op(component_operation) + # double dependency + pipeline1 = Pipeline(**default_pipeline_args) + pipeline1.add_op(first_component_op) + with pytest.raises(InvalidPipelineDefinition): + pipeline1.add_op( + second_component_op, + dependencies=[first_component_op, first_component_op], + ) - with pytest.raises((ValueError, TypeError)): - pipeline.compile() + # 2 components with no dependencies + pipeline2 = Pipeline(**default_pipeline_args) + pipeline2.add_op(first_component_op) + with pytest.raises(InvalidPipelineDefinition): + pipeline2.add_op(second_component_op) def test_reusable_component_op(): From 5cc072f96a5c1116b8c2a392f43acc4eb7899a36 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Tue, 8 Aug 2023 14:49:20 +0200 Subject: [PATCH 5/8] Make validate method for pipeline and call it from the compilers --- src/fondant/compiler.py | 8 +++++--- src/fondant/components | 1 + src/fondant/pipeline.py | 14 ++++++++++++++ src/fondant/runner.py | 31 ------------------------------- tests/test_compiler.py | 1 + tests/test_pipeline.py | 9 ++++----- 6 files changed, 25 insertions(+), 39 deletions(-) create mode 120000 src/fondant/components diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 52ce263eb..6c1c76ad8 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -134,10 +134,13 @@ def _generate_spec( """ timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") path, volume = 
self._patch_path(base_path=pipeline.base_path) - metadata = MetaData(run_id=f"{pipeline.name}-{timestamp}", base_path=path) + run_id = f"{pipeline.name}-{timestamp}" + metadata = MetaData(run_id=run_id, base_path=path) services = {} + pipeline.validate(run_id=run_id) + for component_name, component in pipeline._graph.items(): logger.info(f"Compiling service for {component_name}") safe_component_name = self._safe_component_name(component_name) @@ -240,8 +243,7 @@ def compile( output_path: the path where to save the Kubeflow pipeline spec """ self.pipeline = pipeline - self.pipeline.sort_graph() - self.pipeline._validate_pipeline_definition("{{workflow.name}}") + self.pipeline.validate(run_id="{{workflow.name}}") logger.info(f"Compiling {self.pipeline.name} to {output_path}") wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kf_pipeline) # type: ignore self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore diff --git a/src/fondant/components b/src/fondant/components new file mode 120000 index 000000000..6e10371d3 --- /dev/null +++ b/src/fondant/components @@ -0,0 +1 @@ +../../components \ No newline at end of file diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 66b08bab5..2b2a3dcd0 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -240,6 +240,7 @@ def add_op( def sort_graph(self): """Sort the graph topologically based on task dependencies.""" + logger.info("Sorting pipeline component graph topologically.") sorted_graph = [] visited = set() @@ -269,6 +270,15 @@ def _validate_pipeline_name(pipeline_name: str) -> str: raise InvalidPipelineDefinition(msg) return pipeline_name + def validate(self, run_id: str): + """Sort and run validation on the pipeline definition. + + Args: + run_id (str, optional): run identifier. Defaults to None. 
+ """ + self.sort_graph() + self._validate_pipeline_definition(run_id) + def _validate_pipeline_definition(self, run_id: str): """ Validates the pipeline definition by ensuring that the consumed and produced subsets and @@ -281,6 +291,10 @@ def _validate_pipeline_definition(self, run_id: str): base_path: the base path where to store the pipelines artifacts run_id: the run id of the component """ + if len(self._graph.keys()) == 0: + logger.info("No components defined in the pipeline. Nothing to validate.") + return + # TODO: change later if we decide to run 2 fondant pipelines after each other load_component = True load_component_name = list(self._graph.keys())[0] diff --git a/src/fondant/runner.py b/src/fondant/runner.py index 286c2922a..a5311f7dc 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -29,34 +29,3 @@ def run(self, input_spec: str, *args, **kwargs): ] subprocess.call(cmd) # nosec - - -class KubeflowRunner(Runner): - def __init__(self): - self._resolve_imports() - - @abstractmethod - def _resolve_imports(self): - """Resolve imports for the Kubeflow compiler.""" - try: - global kfp - import kfp - except ImportError: - raise ImportError( - "You need to install kfp to use the Kubeflow compiler, " - / "you can install it with `pip install --extras kfp`", - ) - - def run(cls, input_spec: str, host: str, *args, **kwargs): - """Run a kubeflow pipeline.""" - pass - # client = kfp.Client(host=host) - # # TODO add logic to see if pipeline exists - # pipeline_spec = client.run_pipeline( - # experiment_id=experiment.id, - # job_name=run_name, - # pipeline_package_path=pipeline.package_path, - # ) - - # pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}" - # logger.info(f"Pipeline is running at: {pipeline_url}") diff --git a/tests/test_compiler.py b/tests/test_compiler.py index ef0105a5e..553069a42 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -214,6 +214,7 @@ def test_kubeflow_configuration(tmp_path_factory): 
Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, node_pool_name="a_node_pool", + node_pool_label="a_node_pool_label", number_of_gpus=1, p_volumes={"/mnt": PipelineVolume(name="mypvc", empty_dir={})}, ephemeral_storage_size="1Gi", diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 0d4c55b14..b4270e725 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -182,13 +182,12 @@ def test_invalid_pipeline_dependencies(default_pipeline_args, valid_pipeline_exa ("example_3", ["first_component", "second_component"]), ], ) -def test_invalid_pipeline_compilation( +def test_invalid_pipeline_declaration( default_pipeline_args, invalid_pipeline_example, ): - """ - Test that an InvalidPipelineDefinition exception is raised when attempting to compile - an invalid pipeline definition. + """Test that an InvalidPipelineDefinition exception is raised when attempting + to register invalid components combinations. """ example_dir, component_names = invalid_pipeline_example components_path = Path(invalid_pipeline_path / example_dir) @@ -212,7 +211,7 @@ def test_invalid_pipeline_compilation( pipeline._validate_pipeline_definition("test_pipeline") -def test_invalid_pipeline_composition(default_pipeline_args): +def test_invalid_pipeline_validation(default_pipeline_args): """ Test that an InvalidPipelineDefinition exception is raised when attempting to compile an invalid pipeline definition. 
From 48434d1c79f0ac3db2a758a26b372e073a2652f3 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Tue, 8 Aug 2023 15:11:50 +0200 Subject: [PATCH 6/8] Fix test after rebase --- .../compiled_pipeline/example_2/kubeflow_pipeline.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index e36ac99b6..56c8df280 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -76,7 +76,7 @@ spec: "description": "Component that removes single-colored borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": - "binary"}, "height": {"type": "int16"}, "width": {"type": "int16"}}}}}', + "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}', --input_partition_rows, None, --output_partition_size, None, --cropping_threshold, '0', --padding, '0', --output_manifest_path, /tmp/outputs/output_manifest_path/data] image: ghcr.io/ml6team/image_cropping:dev @@ -128,7 +128,7 @@ spec: removes single-colored borders around images and crops them appropriately\", \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, - \"height\": {\"type\": \"int16\"}, \"width\": {\"type\": \"int16\"}}}}}", + \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": "None", "padding": "0"}'} From 17a569d547dacf2a045d2d3f1e074a3e754ada2c Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Tue, 8 Aug 2023 16:00:58 +0200 Subject: 
[PATCH 7/8] Rename kf to kfp --- src/fondant/compiler.py | 22 ++++++------------- .../example_1/kubeflow_pipeline.yml | 8 +++---- .../example_2/kubeflow_pipeline.yml | 8 +++---- .../compiled_pipeline/kubeflow_pipeline.yml | 8 +++---- 4 files changed, 19 insertions(+), 27 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 6c1c76ad8..ed28001f9 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -245,11 +245,11 @@ def compile( self.pipeline = pipeline self.pipeline.validate(run_id="{{workflow.name}}") logger.info(f"Compiling {self.pipeline.name} to {output_path}") - wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kf_pipeline) # type: ignore + wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kfp_pipeline) # type: ignore self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore logger.info("Pipeline compiled successfully") - def kf_pipeline(self): + def kfp_pipeline(self): previous_component_task = None manifest_path = "" for component_name, component in self.pipeline._graph.items(): @@ -268,19 +268,11 @@ def kf_pipeline(self): {"base_path": self.pipeline.base_path, "run_id": "{{workflow.name}}"}, ) - if previous_component_task is not None: - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata, - **component_args, - ) - else: - # Add metadata to the first component - component_task = kubeflow_component_op( - input_manifest_path=manifest_path, - metadata=metadata, - **component_args, - ) + component_task = kubeflow_component_op( + input_manifest_path=manifest_path, + metadata=metadata, + **component_args, + ) # Set optional configurations component_task = self._set_configuration( component_task, diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index b7338233c..f8127c8e6 100644 --- 
a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,12 +1,12 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: kf-pipeline- + generateName: kfp-pipeline- annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'} + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}'} labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} spec: - entrypoint: kf-pipeline + entrypoint: kfp-pipeline templates: - name: first-component container: @@ -62,7 +62,7 @@ spec: {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": "disable", "storage_args": "a dummy string arg"}'} - - name: kf-pipeline + - name: kfp-pipeline dag: tasks: - {name: first-component, template: first-component} diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 56c8df280..4bb853ea2 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,12 +1,12 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: kf-pipeline- + generateName: kfp-pipeline- annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'} + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}'} labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} spec: - entrypoint: kf-pipeline + entrypoint: kfp-pipeline templates: - name: first-component 
container: @@ -132,7 +132,7 @@ spec: "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size": "None", "padding": "0"}'} - - name: kf-pipeline + - name: kfp-pipeline dag: tasks: - {name: first-component, template: first-component} diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml index 0345039cc..907cea309 100644 --- a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -1,12 +1,12 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: kf-pipeline- + generateName: kfp-pipeline- annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', - pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'} + pipelines.kubeflow.org/pipeline_spec: '{"name": "Kfp pipeline"}'} labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} spec: - entrypoint: kf-pipeline + entrypoint: kfp-pipeline templates: - name: first-component container: @@ -71,7 +71,7 @@ spec: volumes: - emptyDir: {} name: mypvc - - name: kf-pipeline + - name: kfp-pipeline dag: tasks: - {name: first-component, template: first-component} From 06c23eb3eb321b2c080bddbe537c151ff0fb7ed5 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Thu, 10 Aug 2023 10:50:23 +0200 Subject: [PATCH 8/8] Update ImportError message --- src/fondant/compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index ed28001f9..2e7bba08b 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -226,7 +226,7 @@ def _resolve_imports(self): self.kfp = kfp except ImportError: msg = """You need to install kfp to use the Kubeflow compiler,\n - you can install it with `pip install 
--extras kfp`""" +            you can install it with `pip install fondant[pipelines]`"""              raise ImportError(                 msg,             )