From c9f84a8457a840896d648b7bc2fc814ea2edbd40 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 8 Nov 2023 14:20:19 +0100 Subject: [PATCH] Granular compiler tests (#589) Addresses https://github.com/ml6team/fondant-use-cases/issues/46 For each compiler we now have a method `get_pipeline_configs()` to fetch the essential information about a pipeline (component, arguments, dependencies, hardware, ...) from a compiled specification file. In this way, we no longer face issues with having to modify the specification files after every small modification to the framework. It also makes it easier to call the desired attribute without having to dive deep into the schema of the compiler. --- src/fondant/core/component_spec.py | 32 +- src/fondant/pipeline/compiler.py | 3 + src/fondant/pipeline/pipeline.py | 2 +- src/fondant/testing.py | 265 ++++++++++++++ .../example_1/docker-compose.yml | 94 ----- .../example_1/kubeflow_pipeline.yml | 335 ------------------ .../example_1/vertex_pipeline.yml | 334 ----------------- .../example_2/docker-compose.yml | 75 ---- .../example_2/kubeflow_pipeline.yml | 245 ------------- .../example_2/vertex_pipeline.yml | 245 ------------- tests/test_compiler.py | 301 +++++++++------- tests/test_pipeline.py | 12 +- 12 files changed, 475 insertions(+), 1468 deletions(-) create mode 100644 src/fondant/testing.py delete mode 100644 tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml delete mode 100644 tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml delete mode 100644 tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml delete mode 100644 tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml delete mode 100644 tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml delete mode 100644 tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml diff --git a/src/fondant/core/component_spec.py b/src/fondant/core/component_spec.py index 3cb8aa0a0..9a34cdf67 100644 --- a/src/fondant/core/component_spec.py +++ b/src/fondant/core/component_spec.py @@ -151,6 +151,23 @@ def to_file(self, path) -> None: def name(self): return self._specification["name"] + @property + def sanitized_component_name(self): + """Cleans and converts a name to be kfp V2 compatible. + + Taken from https://github.com/kubeflow/pipelines/blob/ + cfe671c485d4ee8514290ee81ca2785e8bda5c9b/sdk/python/kfp/dsl/utils.py#L52 + """ + return ( + re.sub( + "-+", + "-", + re.sub("[^-0-9a-z]+", "-", self._specification["name"].lower()), + ) + .lstrip("-") + .rstrip("-") + ) + @property def description(self): return self._specification["description"] @@ -314,19 +331,6 @@ def convert_arguments(fondant_component: ComponentSpec): return args - @staticmethod - def sanitize_component_name(name: str) -> str: - """Cleans and converts a name to be kfp V2 compatible. - - Taken from https://github.com/kubeflow/pipelines/blob/ - cfe671c485d4ee8514290ee81ca2785e8bda5c9b/sdk/python/kfp/dsl/utils.py#L52 - """ - return ( - re.sub("-+", "-", re.sub("[^-0-9a-z]+", "-", name.lower())) - .lstrip("-") - .rstrip("-") - ) - @classmethod def from_fondant_component_spec(cls, fondant_component: ComponentSpec): """Generate a Kubeflow component spec from a Fondant component spec.""" @@ -336,7 +340,7 @@ def from_fondant_component_spec(cls, fondant_component: ComponentSpec): }, } - cleaned_component_name = cls.sanitize_component_name(fondant_component.name) + cleaned_component_name = fondant_component.sanitized_component_name specification = { "components": { diff --git a/src/fondant/pipeline/compiler.py b/src/fondant/pipeline/compiler.py index 3b4bc2a2a..05c072d59 100644 --- a/src/fondant/pipeline/compiler.py +++ b/src/fondant/pipeline/compiler.py @@ -229,6 +229,9 @@ def _generate_spec( "name": pipeline.name, "version": "3.8", "services": services, + "labels": { + "description": pipeline.description, + }, } def _set_configuration(self, services, fondant_component_operation, component_name): diff --git a/src/fondant/pipeline/pipeline.py b/src/fondant/pipeline/pipeline.py index 0fbfdeb62..522a1af0b 100644 --- a/src/fondant/pipeline/pipeline.py +++ b/src/fondant/pipeline/pipeline.py @@ -144,7 +144,7 @@ def __init__( self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, ) - self.name = self.component_spec.name.replace(" ", "_").lower() + self.name = self.component_spec.sanitized_component_name self.cache = self._configure_caching_from_image_tag(cache) self.cluster_type = cluster_type self.client_kwargs = client_kwargs diff --git a/src/fondant/testing.py b/src/fondant/testing.py new file mode 100644 index 000000000..184669388 --- /dev/null +++ b/src/fondant/testing.py @@ -0,0 +1,265 @@ +import typing as t +from abc import abstractmethod +from dataclasses import dataclass + +import yaml + +from fondant.core.exceptions import InvalidPipelineDefinition + + +@dataclass +class Accelerator: + """ + Represents a hardware accelerator configuration. + + Args: + type: Type of the accelerator. + number: The number of the accelerator. + """ + + type: str + number: int + + +@dataclass +class ComponentConfigs: + """ + Represents the configurations for a component. + + Args: + image: The Docker image for the component. + arguments: Arguments to be passed to the component. + dependencies: List of dependencies required for the component. + accelerators: List of hardware accelerators for the component. + cpu_request: CPU request for the component. + cpu_limit: CPU limit for the component. + memory_request: Memory request for the component. + memory_limit: Memory limit for the component. + """ + + image: t.Optional[str] = None + arguments: t.Optional[t.Dict[str, t.Any]] = None + dependencies: t.Optional[t.List[str]] = None + accelerators: t.Optional[t.List[Accelerator]] = None + cpu_request: t.Optional[str] = None + cpu_limit: t.Optional[str] = None + memory_request: t.Optional[str] = None + memory_limit: t.Optional[str] = None + + +@dataclass +class KubeflowComponentConfig(ComponentConfigs): + """ + Represents Kubeflow-specific configurations for a component. + + Args: + node_pool_label: Label for the node pool. + node_pool_name: Name of the node pool. + """ + + node_pool_label: t.Optional[str] = None + node_pool_name: t.Optional[str] = None + + +@dataclass +class DockerComponentConfig(ComponentConfigs): + """ + Represents Docker-specific configurations for a component. + + Args: + context: The context for the Docker component. + volumes: List of volumes for the Docker component. + ports: List of ports for the Docker component. + """ + + context: t.Optional[str] = None + volumes: t.Optional[t.List[t.Union[str, dict]]] = None + ports: t.Optional[t.List[t.Union[str, dict]]] = None + + +@dataclass +class PipelineConfigs: + """ + Represents the configurations for a pipeline. + + Args: + pipeline_name: Name of the pipeline. + pipeline_description: Description of the pipeline. + """ + + pipeline_name: str + pipeline_description: str + pipeline_version: str + + @classmethod + @abstractmethod + def from_spec(cls, spec_path: str) -> "PipelineConfigs": + """Get pipeline configs from a pipeline specification.""" + + +@dataclass +class DockerPipelineConfigs(PipelineConfigs): + """ + Represents Docker-specific configurations for a pipeline. + + Args: + component_configs: Dictionary of Docker component configurations for the pipeline. + """ + + component_configs: t.Dict[str, DockerComponentConfig] + + @classmethod + def from_spec(cls, spec_path: str) -> "DockerPipelineConfigs": + """Get pipeline configs from a pipeline specification.""" + with open(spec_path) as file_: + specification = yaml.safe_load(file_) + + components_configs_dict = {} + + # Iterate through each service + for component_name, component_configs in specification["services"].items(): + # Get arguments from command + command_list = component_configs.get("command", []) + arguments = {} + for i in range(0, len(command_list), 2): + arguments[command_list[i].lstrip("-")] = command_list[i + 1] + + # Get accelerator name and number of accelerators + resources = component_configs.get("deploy", {}).get("resources", {}) + devices = resources.get("reservations", {}).get("devices", {}) + + accelerator_list = [] + if devices: + for device in devices: + accelerator = Accelerator( + type=device["capabilities"][0], + number=device["count"], + ) + accelerator_list.append(accelerator) + + component_config = DockerComponentConfig( + image=component_configs.get("image", None), + arguments=arguments, + dependencies=list(component_configs.get("depends_on", {}).keys()), + accelerators=accelerator_list, + context=component_configs.get("build", {}).get("context", None), + ports=component_configs.get("ports", None), + volumes=component_configs.get("volumes", None), + cpu_request=None, + cpu_limit=None, + memory_request=None, + memory_limit=None, + ) + components_configs_dict[component_name] = component_config + + return cls( + pipeline_name=specification["name"], + pipeline_version=specification["version"], + pipeline_description=specification.get("labels", {}).get( + "description", + None, + ), + component_configs=components_configs_dict, + ) + + +@dataclass +class KubeflowPipelineConfigs(PipelineConfigs): + """ + Represents Kubeflow-specific configurations for a pipeline. + + Args: + component_configs: Dictionary of Kubeflow component configurations for the pipeline. + """ + + component_configs: t.Dict[str, KubeflowComponentConfig] + + @classmethod + def from_spec(cls, spec_path: str) -> "KubeflowPipelineConfigs": + """Get pipeline configs from a pipeline specification.""" + # Two specs are present and loaded in the yaml file (component spec and k8s specs) + k8_specification = {} + specification = {} + + with open(spec_path) as file_: + for spec in yaml.load_all(file_, Loader=yaml.FullLoader): + if "deploymentSpec" in spec: + specification = spec + elif "platforms" in spec: + k8_specification = spec["platforms"]["kubernetes"][ + "deploymentSpec" + ]["executors"] + + if not specification: + msg = "No component specification found in the pipeline specification" + raise InvalidPipelineDefinition(msg) + components_configs_dict = {} + + # Iterate through each service + for component_name, component_configs in specification["root"]["dag"][ + "tasks" + ].items(): + # Get arguments from command + arguments = { + arg_name: arg_value["runtimeValue"]["constant"] + for arg_name, arg_value in component_configs["inputs"][ + "parameters" + ].items() + } + + # Get accelerator name and number of accelerators + container_spec = specification["deploymentSpec"]["executors"][ + f"exec-{component_name}" + ]["container"] + resources = component_configs.get("resources", {}) + devices = resources.get("accelerator", {}) + accelerator_list = [] + + if devices: + for device in devices: + accelerator = Accelerator( + type=device["accelerator"]["type"], + number=device["accelerator"]["count"], + ) + accelerator_list.append(accelerator) + + # Get node pool name and label + node_pool_label = None + node_pool_name = None + if k8_specification: + node_pool_dict = ( + k8_specification.get(f"exec-{component_name}", {}) + .get("nodeSelector", {}) + .get("labels", {}) + ) + if node_pool_dict: + node_pool_label = list(node_pool_dict.keys())[0] + node_pool_name = list(node_pool_dict.values())[0] + + component_config = KubeflowComponentConfig( + image=container_spec.get("image"), + arguments=arguments, + dependencies=component_configs.get("dependentTasks", []), + accelerators=accelerator_list, + cpu_request=component_configs.get("cpuRequest", None), + cpu_limit=component_configs.get("cpuLimit", None), + memory_request=component_configs.get("memoryRequest", None), + memory_limit=component_configs.get("memoryLimit", None), + node_pool_name=node_pool_name, + node_pool_label=node_pool_label, + ) + components_configs_dict[component_name] = component_config + + pipeline_info = specification["pipelineInfo"] + + return cls( + pipeline_name=pipeline_info["name"], + pipeline_version=specification["sdkVersion"], + pipeline_description=pipeline_info.get("description", None), + component_configs=components_configs_dict, + ) + + +@dataclass +class VertexPipelineConfigs(KubeflowPipelineConfigs): + pass diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml deleted file mode 100644 index 362459873..000000000 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ /dev/null @@ -1,94 +0,0 @@ -name: testpipeline -services: - first_component: - build: - args: [] - context: tests/example_pipelines/valid_pipeline/example_1/first_component - command: - - --metadata - - '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", - "component_id": "first_component", "cache_key": "1"}' - - --output_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - - --storage_args - - a dummy string arg - - --input_partition_rows - - '10' - - --cache - - 'False' - - --cluster_type - - default - - --component_spec - - '{"name": "First component", "description": "This is an example component", - "image": "example_component:latest", "produces": {"images": {"fields": {"data": - {"type": "binary"}}}, "captions": {"fields": {"data": {"type": "string"}}}}, - "args": {"storage_args": {"description": "Storage arguments", "type": "str"}}}' - depends_on: {} - volumes: [] - ports: - - 8787:8787 - second_component: - build: - args: [] - context: tests/example_pipelines/valid_pipeline/example_1/second_component - command: - - --metadata - - '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", - "component_id": "second_component", "cache_key": "2"}' - - --output_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json - - --storage_args - - a dummy string arg - - --input_partition_rows - - '10' - - --cache - - 'False' - - --cluster_type - - default - - --component_spec - - '{"name": "Second component", "description": "This is an example component", - "image": "example_component:latest", "consumes": {"images": {"fields": {"data": - {"type": "binary"}}}}, "produces": {"embeddings": {"fields": {"data": {"type": - "array", "items": {"type": "float32"}}}}}, "args": {"storage_args": {"description": - "Storage arguments", "type": "str"}}}' - - --input_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - depends_on: - first_component: - condition: service_completed_successfully - ports: - - 8787:8787 - volumes: [] - third_component: - build: - args: [] - context: tests/example_pipelines/valid_pipeline/example_1/third_component - command: - - --metadata - - '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", - "component_id": "third_component", "cache_key": "3"}' - - --output_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json - - --storage_args - - a dummy string arg - - --cache - - 'False' - - --cluster_type - - default - - --component_spec - - '{"name": "Third component", "description": "This is an example component", - "image": "example_component:latest", "consumes": {"images": {"fields": {"data": - {"type": "binary"}}}, "captions": {"fields": {"data": {"type": "string"}}}, - "embeddings": {"fields": {"data": {"type": "array", "items": {"type": "float32"}}}}}, - "produces": {"images": {"fields": {"data": {"type": "binary"}}}, "additionalSubsets": - false}, "args": {"storage_args": {"description": "Storage arguments", "type": - "str"}}}' - - --input_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json - depends_on: - second_component: - condition: service_completed_successfully - ports: - - 8787:8787 - volumes: [] -version: '3.8' diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml deleted file mode 100644 index aa8cf7463..000000000 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ /dev/null @@ -1,335 +0,0 @@ -components: - comp-first-component: - executorLabel: exec-first-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING - comp-second-component: - executorLabel: exec-second-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING - comp-third-component: - executorLabel: exec-third-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING -deploymentSpec: - executors: - exec-first-component: - container: - args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --input_partition_rows - - '{{$.inputs.parameters[''input_partition_rows'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - command: - - fondant - - execute - - main - image: example_component:latest - resources: - memoryLimit: 0.512 - memoryRequest: 0.256 - exec-second-component: - container: - args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --input_partition_rows - - '{{$.inputs.parameters[''input_partition_rows'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' - command: - - fondant - - execute - - main - image: example_component:latest - exec-third-component: - container: - args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' - command: - - fondant - - execute - - main - image: example_component:latest -pipelineInfo: - description: description of the test pipeline - name: testpipeline -root: - dag: - tasks: - first-component: - cachingOptions: {} - componentRef: - name: comp-first-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - description: This is an example component - image: example_component:latest - name: First component - produces: - captions: - fields: - data: - type: string - images: - fields: - data: - type: binary - input_partition_rows: - runtimeValue: - constant: 10.0 - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "first_component", - "cache_key": "1"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: first-component - second-component: - cachingOptions: {} - componentRef: - name: comp-second-component - dependentTasks: - - first-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - consumes: - images: - fields: - data: - type: binary - description: This is an example component - image: example_component:latest - name: Second component - produces: - embeddings: - fields: - data: - items: - type: float32 - type: array - input_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - input_partition_rows: - runtimeValue: - constant: 10.0 - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "second_component", - "cache_key": "2"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: second-component - third-component: - cachingOptions: {} - componentRef: - name: comp-third-component - dependentTasks: - - second-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - consumes: - captions: - fields: - data: - type: string - embeddings: - fields: - data: - items: - type: float32 - type: array - images: - fields: - data: - type: binary - description: This is an example component - image: example_component:latest - name: Third component - produces: - additionalSubsets: false - images: - fields: - data: - type: binary - input_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "third_component", - "cache_key": "3"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: third-component -schemaVersion: 2.1.0 -sdkVersion: kfp-2.3.0 diff --git a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml deleted file mode 100644 index f81e56f1c..000000000 --- a/tests/example_pipelines/compiled_pipeline/example_1/vertex_pipeline.yml +++ /dev/null @@ -1,334 +0,0 @@ -components: - comp-first-component: - executorLabel: exec-first-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING - comp-second-component: - executorLabel: exec-second-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING - comp-third-component: - executorLabel: exec-third-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING -deploymentSpec: - executors: - exec-first-component: - container: - args: - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - command: - - fondant - - execute - - main - image: example_component:latest - resources: - memoryLimit: 0.512 - exec-second-component: - container: - args: - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" - - "--input_partition_rows" - - "{{$.inputs.parameters['input_partition_rows']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - command: - - fondant - - execute - - main - image: example_component:latest - exec-third-component: - container: - args: - - "--storage_args" - - "{{$.inputs.parameters['storage_args']}}" - - "--cache" - - "{{$.inputs.parameters['cache']}}" - - "--cluster_type" - - "{{$.inputs.parameters['cluster_type']}}" - - "--component_spec" - - "{{$.inputs.parameters['component_spec']}}" - - "--output_manifest_path" - - "{{$.inputs.parameters['output_manifest_path']}}" - - "--metadata" - - "{{$.inputs.parameters['metadata']}}" - - "--input_manifest_path" - - "{{$.inputs.parameters['input_manifest_path']}}" - command: - - fondant - - execute - - main - image: example_component:latest -pipelineInfo: - description: description of the test pipeline - name: testpipeline -root: - dag: - tasks: - first-component: - cachingOptions: {} - componentRef: - name: comp-first-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - description: This is an example component - image: example_component:latest - name: First component - produces: - captions: - fields: - data: - type: string - images: - fields: - data: - type: binary - input_partition_rows: - runtimeValue: - constant: 10 - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "first_component", - "cache_key": "1"}' - output_manifest_path: - runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: first-component - second-component: - cachingOptions: {} - componentRef: - name: comp-second-component - dependentTasks: - - first-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - consumes: - images: - fields: - data: - type: binary - description: This is an example component - image: example_component:latest - name: Second component - produces: - embeddings: - fields: - data: - items: - type: float32 - type: array - input_manifest_path: - runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json" - input_partition_rows: - runtimeValue: - constant: 10 - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "second_component", - "cache_key": "2"}' - output_manifest_path: - runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: second-component - third-component: - cachingOptions: {} - componentRef: - name: comp-third-component - dependentTasks: - - second-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - consumes: - captions: - fields: - data: - type: string - embeddings: - fields: - data: - items: - type: float32 - type: array - images: - fields: - data: - type: binary - description: This is an example component - image: example_component:latest - name: Third component - produces: - additionalSubsets: false - images: - fields: - data: - type: binary - input_manifest_path: - runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/second_component/manifest.json" - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "third_component", - "cache_key": "3"}' - output_manifest_path: - runtimeValue: - constant: "/foo/bar/testpipeline/testpipeline-20230101000000/third_component/manifest.json" - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: third-component -schemaVersion: 2.1.0 -sdkVersion: kfp-2.3.0 diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml deleted file mode 100644 index dd4bb1a5b..000000000 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ /dev/null @@ -1,75 +0,0 @@ -name: testpipeline -services: - first_component: - build: - args: [] - context: tests/example_pipelines/valid_pipeline/example_1/first_component - command: - - --metadata - - '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", - "component_id": "first_component", "cache_key": "1"}' - - --output_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - - --storage_args - - a dummy string arg - - --cache - - 'False' - - --cluster_type - - default - - --component_spec - - '{"name": "First component", "description": "This is an example component", - "image": "example_component:latest", "produces": {"images": {"fields": {"data": - {"type": "binary"}}}, "captions": {"fields": {"data": {"type": "string"}}}}, - "args": {"storage_args": {"description": "Storage arguments", "type": "str"}}}' - depends_on: {} - ports: - - 8787:8787 - volumes: [] - image_cropping: - image: fndnt/image_cropping:dev - command: - - --metadata - - '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", "run_id": "testpipeline-20230101000000", - "component_id": "image_cropping", "cache_key": "2"}' - - --output_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json - - --cropping_threshold - - '0' - - --padding - - '0' - - --cache - - 'True' - - --cluster_type - - default - - --component_spec - - '{"name": "Image cropping", "image": "fndnt/image_cropping:dev", "description": - "This component crops out image borders. This is typically useful when working - with graphical \nimages that have single-color borders (e.g. logos, icons, etc.).\n\nThe - component takes an image and calculates which color is most present in the border. - It then \ncrops the image in order to minimize this single-color border. The - `padding` argument will add \nextra border to the image before cropping it, - in order to avoid cutting off parts of the image.\nThe resulting crop will always - be square. If a crop is not possible, the component will return \nthe original - image.\n\n#### Examples\nExamples of image cropping by removing the single-color - border. Left side is original image, \nright side is border-cropped image.\n\n![Example - of image cropping by removing the single-color border. Left side is original, - right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n![Example - of image cropping by removing the single-color border. Left side is original, - right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n", - "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "produces": - {"images": {"fields": {"data": {"type": "binary"}, "width": {"type": "int32"}, - "height": {"type": "int32"}}}}, "args": {"cropping_threshold": {"description": - "Threshold parameter used for detecting borders. A lower (negative) parameter - results in a more performant border detection, but can cause overcropping. Default - is -30", "type": "int", "default": -30}, "padding": {"description": "Padding - for the image cropping. The padding is added to all borders of the image.", - "type": "int", "default": 10}}}' - - --input_manifest_path - - /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - depends_on: - first_component: - condition: service_completed_successfully - ports: - - 8787:8787 - volumes: [] -version: '3.8' diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml deleted file mode 100644 index 989003a61..000000000 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ /dev/null @@ -1,245 +0,0 @@ -components: - comp-first-component: - executorLabel: exec-first-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING - comp-image-cropping: - executorLabel: exec-image-cropping - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - cropping_threshold: - defaultValue: -30.0 - isOptional: true - parameterType: NUMBER_INTEGER - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - padding: - defaultValue: 10.0 - isOptional: true - parameterType: NUMBER_INTEGER -deploymentSpec: - executors: - exec-first-component: - container: - args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - command: - - fondant - - execute - - main - image: example_component:latest - exec-image-cropping: - container: - args: - - --cropping_threshold - - '{{$.inputs.parameters[''cropping_threshold'']}}' - - --padding - - '{{$.inputs.parameters[''padding'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' - command: - - fondant - - execute - - main - image: fndnt/image_cropping:dev -pipelineInfo: - description: description of the test pipeline - name: testpipeline -root: - dag: - tasks: - first-component: - cachingOptions: {} - componentRef: - name: comp-first-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - description: This is an example component - image: example_component:latest - name: First component - produces: - captions: - fields: - data: - type: string - images: - fields: - data: - type: binary - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "first_component", - "cache_key": "1"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: first-component - image-cropping: - cachingOptions: {} - componentRef: - name: comp-image-cropping - dependentTasks: - - first-component - inputs: - parameters: - cache: - runtimeValue: - constant: true - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - cropping_threshold: - default: -30.0 - description: Threshold parameter used for detecting borders. - A lower (negative) parameter results in a more performant - border detection, but can cause overcropping. Default is -30 - type: int - padding: - default: 10.0 - description: Padding for the image cropping. The padding is - added to all borders of the image. - type: int - consumes: - images: - fields: - data: - type: binary - description: "This component crops out image borders. This is typically\ - \ useful when working with graphical \nimages that have single-color\ - \ borders (e.g. logos, icons, etc.).\n\nThe component takes an\ - \ image and calculates which color is most present in the border.\ - \ It then \ncrops the image in order to minimize this single-color\ - \ border. The `padding` argument will add \nextra border to the\ - \ image before cropping it, in order to avoid cutting off parts\ - \ of the image.\nThe resulting crop will always be square. If\ - \ a crop is not possible, the component will return \nthe original\ - \ image.\n\n#### Examples\nExamples of image cropping by removing\ - \ the single-color border. Left side is original image, \nright\ - \ side is border-cropped image.\n\n![Example of image cropping\ - \ by removing the single-color border. Left side is original,\ - \ right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n\ - ![Example of image cropping by removing the single-color border.\ - \ Left side is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n" - image: fndnt/image_cropping:dev - name: Image cropping - produces: - images: - fields: - data: - type: binary - height: - type: int32 - width: - type: int32 - cropping_threshold: - runtimeValue: - constant: 0.0 - input_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "image_cropping", - "cache_key": "2"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json - padding: - runtimeValue: - constant: 0.0 - taskInfo: - name: image-cropping -schemaVersion: 2.1.0 -sdkVersion: kfp-2.3.0 diff --git a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml deleted file mode 100644 index 989003a61..000000000 --- a/tests/example_pipelines/compiled_pipeline/example_2/vertex_pipeline.yml +++ /dev/null @@ -1,245 +0,0 @@ -components: - comp-first-component: - executorLabel: exec-first-component - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - storage_args: - parameterType: STRING - comp-image-cropping: - executorLabel: exec-image-cropping - inputDefinitions: - parameters: - cache: - defaultValue: true - isOptional: true - parameterType: BOOLEAN - client_kwargs: - defaultValue: {} - isOptional: true - parameterType: STRUCT - cluster_type: - defaultValue: default - isOptional: true - parameterType: STRING - component_spec: - parameterType: STRUCT - cropping_threshold: - defaultValue: -30.0 - isOptional: true - parameterType: NUMBER_INTEGER - input_manifest_path: - isOptional: true - parameterType: STRING - input_partition_rows: - isOptional: true - parameterType: NUMBER_INTEGER - metadata: - parameterType: STRING - output_manifest_path: - parameterType: STRING - padding: - defaultValue: 10.0 - isOptional: true - parameterType: NUMBER_INTEGER -deploymentSpec: - executors: - exec-first-component: - container: - args: - - --storage_args - - '{{$.inputs.parameters[''storage_args'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - command: - - fondant - - execute - - main - image: example_component:latest - exec-image-cropping: - container: - args: - - --cropping_threshold - - '{{$.inputs.parameters[''cropping_threshold'']}}' - - --padding - - '{{$.inputs.parameters[''padding'']}}' - - --cache - - '{{$.inputs.parameters[''cache'']}}' - - --cluster_type - - '{{$.inputs.parameters[''cluster_type'']}}' - - --component_spec - - '{{$.inputs.parameters[''component_spec'']}}' - - --output_manifest_path - - '{{$.inputs.parameters[''output_manifest_path'']}}' - - --metadata - - '{{$.inputs.parameters[''metadata'']}}' - - --input_manifest_path - - '{{$.inputs.parameters[''input_manifest_path'']}}' - command: - - fondant - - execute - - main - image: fndnt/image_cropping:dev -pipelineInfo: - description: description of the test pipeline - name: testpipeline -root: - dag: - tasks: - first-component: - cachingOptions: {} - componentRef: - name: comp-first-component - inputs: - parameters: - cache: - runtimeValue: - constant: false - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - storage_args: - description: Storage arguments - type: str - description: This is an example component - image: example_component:latest - name: First component - produces: - captions: - fields: - data: - type: string - images: - fields: - data: - type: binary - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "first_component", - "cache_key": "1"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - storage_args: - runtimeValue: - constant: a dummy string arg - taskInfo: - name: first-component - image-cropping: - cachingOptions: {} - componentRef: - name: comp-image-cropping - dependentTasks: - - first-component - inputs: - parameters: - cache: - runtimeValue: - constant: true - cluster_type: - runtimeValue: - constant: default - component_spec: - runtimeValue: - constant: - args: - cropping_threshold: - default: -30.0 - description: Threshold parameter used for detecting borders. - A lower (negative) parameter results in a more performant - border detection, but can cause overcropping. Default is -30 - type: int - padding: - default: 10.0 - description: Padding for the image cropping. The padding is - added to all borders of the image. - type: int - consumes: - images: - fields: - data: - type: binary - description: "This component crops out image borders. This is typically\ - \ useful when working with graphical \nimages that have single-color\ - \ borders (e.g. logos, icons, etc.).\n\nThe component takes an\ - \ image and calculates which color is most present in the border.\ - \ It then \ncrops the image in order to minimize this single-color\ - \ border. The `padding` argument will add \nextra border to the\ - \ image before cropping it, in order to avoid cutting off parts\ - \ of the image.\nThe resulting crop will always be square. If\ - \ a crop is not possible, the component will return \nthe original\ - \ image.\n\n#### Examples\nExamples of image cropping by removing\ - \ the single-color border. Left side is original image, \nright\ - \ side is border-cropped image.\n\n![Example of image cropping\ - \ by removing the single-color border. Left side is original,\ - \ right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_1.png)\n\ - ![Example of image cropping by removing the single-color border.\ - \ Left side is original, right side is cropped image](../../docs/art/components/image_cropping/component_border_crop_0.png)\n" - image: fndnt/image_cropping:dev - name: Image cropping - produces: - images: - fields: - data: - type: binary - height: - type: int32 - width: - type: int32 - cropping_threshold: - runtimeValue: - constant: 0.0 - input_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/first_component/manifest.json - metadata: - runtimeValue: - constant: '{"base_path": "/foo/bar", "pipeline_name": "testpipeline", - "run_id": "testpipeline-20230101000000", "component_id": "image_cropping", - "cache_key": "2"}' - output_manifest_path: - runtimeValue: - constant: /foo/bar/testpipeline/testpipeline-20230101000000/image_cropping/manifest.json - padding: - runtimeValue: - constant: 0.0 - taskInfo: - name: image-cropping -schemaVersion: 2.1.0 -sdkVersion: kfp-2.3.0 diff --git a/tests/test_compiler.py b/tests/test_compiler.py index abf0e7ead..e99206557 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -5,10 +5,18 @@ from unittest import mock import pytest -import yaml from fondant.core.exceptions import InvalidPipelineDefinition from fondant.pipeline import ComponentOp, Pipeline, Resources -from fondant.pipeline.compiler import DockerCompiler, KubeFlowCompiler, VertexCompiler +from fondant.pipeline.compiler import ( + DockerCompiler, + KubeFlowCompiler, + VertexCompiler, +) +from fondant.testing import ( + DockerPipelineConfigs, + KubeflowPipelineConfigs, + VertexPipelineConfigs, +) COMPONENTS_PATH = Path("./tests/example_pipelines/valid_pipeline") @@ -123,10 +131,33 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) - with open(output_path) as src, open( - VALID_PIPELINE / example_dir / "docker-compose.yml", - ) as truth: - assert yaml.safe_load(src) == yaml.safe_load(truth) + pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + assert pipeline_configs.pipeline_name == pipeline.name + assert pipeline_configs.pipeline_description == pipeline.description + for ( + component_name, + component_configs, + ) in pipeline_configs.component_configs.items(): + # Get exepcted component configs + component = pipeline._graph[component_name] + component_op = component["fondant_component_op"] + + # Check that the component configs are correct + assert component_configs.dependencies == component["dependencies"] + assert component_configs.memory_limit is None + assert component_configs.memory_request is None + assert component_configs.cpu_limit is None + assert component_configs.cpu_request is None + if component_configs.accelerators: + assert ( + component_configs.accelerators.number_of_accelerators + == component_op.accelerators.number_of_accelerators + ) + if component_op.input_partition_rows is not None: + assert ( + int(component_configs.arguments["input_partition_rows"]) + == component_op.input_partition_rows + ) @pytest.mark.usefixtures("_freeze_time") @@ -139,18 +170,18 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): work_dir = f"/{fn.stem}" pipeline.base_path = str(fn) compiler = DockerCompiler() - compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml") - - # read the generated docker-compose file - with open(fn / "docker-compose.yml") as f_spec: - spec = yaml.safe_load(f_spec) - + output_path = str(fn / "docker-compose.yml") + compiler.compile(pipeline=pipeline, output_path=output_path) + pipeline_configs = DockerPipelineConfigs.from_spec(output_path) expected_run_id = "testpipeline-20230101000000" - for name, service in spec["services"].items(): + for ( + component_name, + component_configs, + ) in pipeline_configs.component_configs.items(): # check if volumes are defined correctly - cache_key = cache_dict[name] - assert service["volumes"] == [ + cache_key = cache_dict[component_name] + assert component_configs.volumes == [ { "source": str(fn), "target": work_dir, @@ -159,14 +190,21 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): ] cleaned_pipeline_name = pipeline.name.replace("_", "") # check if commands are patched to use the working dir - commands_with_dir = [ - f"{work_dir}/{cleaned_pipeline_name}/{expected_run_id}/{name}/manifest.json", - f'{{"base_path": "{work_dir}", "pipeline_name": "{cleaned_pipeline_name}",' - f' "run_id": "{expected_run_id}", "component_id": "{name}",' - f' "cache_key": "{cache_key}"}}', - ] - for command in commands_with_dir: - assert command in service["command"] + expected_output_manifest_path = ( + f"{work_dir}/{cleaned_pipeline_name}/{expected_run_id}" + f"/{component_name}/manifest.json" + ) + expected_metadata = ( + f'{{"base_path": "{work_dir}", "pipeline_name": ' + f'"{cleaned_pipeline_name}", "run_id": "{expected_run_id}", ' + f'"component_id": "{component_name}", "cache_key": "{cache_key}"}}' + ) + + assert ( + component_configs.arguments["output_manifest_path"] + == expected_output_manifest_path + ) + assert component_configs.arguments["metadata"] == expected_metadata @pytest.mark.usefixtures("_freeze_time") @@ -177,27 +215,36 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): pipeline.base_path = remote_dir compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: - compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml") - - # read the generated docker-compose file - with open(fn / "docker-compose.yml") as f_spec: - spec = yaml.safe_load(f_spec) - + output_path = str(fn / "docker-compose.yml") + compiler.compile(pipeline=pipeline, output_path=output_path) + pipeline_configs = DockerPipelineConfigs.from_spec(output_path) expected_run_id = "testpipeline-20230101000000" - for name, service in spec["services"].items(): - cache_key = cache_dict[name] + for ( + component_name, + component_configs, + ) in pipeline_configs.component_configs.items(): + cache_key = cache_dict[component_name] # check that no volumes are created - assert service["volumes"] == [] + assert component_configs.volumes == [] # check if commands are patched to use the remote dir cleaned_pipeline_name = pipeline.name.replace("_", "") - commands_with_dir = [ - f"{remote_dir}/{cleaned_pipeline_name}/{expected_run_id}/{name}/manifest.json", - f'{{"base_path": "{remote_dir}", "pipeline_name": "{cleaned_pipeline_name}",' - f' "run_id": "{expected_run_id}", "component_id": "{name}",' - f' "cache_key": "{cache_key}"}}', - ] - for command in commands_with_dir: - assert command in service["command"] + + expected_output_manifest_path = ( + f"{remote_dir}/{cleaned_pipeline_name}/{expected_run_id}" + f"/{component_name}/manifest.json" + ) + + expected_metadata = ( + f'{{"base_path": "{remote_dir}", "pipeline_name": ' + f'"{cleaned_pipeline_name}", "run_id": "{expected_run_id}", ' + f'"component_id": "{component_name}", "cache_key": "{cache_key}"}}' + ) + + assert ( + component_configs.arguments["output_manifest_path"] + == expected_output_manifest_path + ) + assert component_configs.arguments["metadata"] == expected_metadata @pytest.mark.usefixtures("_freeze_time") @@ -210,18 +257,18 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): compiler = DockerCompiler() # define some extra volumes to be mounted extra_volumes = ["hello:there", "general:kenobi"] + output_path = str(fn / "docker-compose.yml") + compiler.compile( pipeline=pipeline, - output_path=fn / "docker-compose.yml", + output_path=output_path, extra_volumes=extra_volumes, ) - # read the generated docker-compose file - with open(fn / "docker-compose.yml") as f_spec: - spec = yaml.safe_load(f_spec) - for _name, service in spec["services"].items(): + pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + for _, service in pipeline_configs.component_configs.items(): assert all( - extra_volume in service["volumes"] for extra_volume in extra_volumes + extra_volume in service.volumes for extra_volume in extra_volumes ) @@ -242,35 +289,20 @@ def test_docker_configuration(tmp_path_factory): ), ) - expected_resources = { - "reservations": { - "devices": [ - { - "capabilities": ["gpu"], - "count": 1, - "driver": "nvidia", - }, - ], - }, - } - pipeline.add_op(component_1) compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yaml") compiler.compile(pipeline=pipeline, output_path=output_path) - # read the generated docker-compose file - with open(output_path) as f_spec: - spec = yaml.safe_load(f_spec) - assert ( - spec["services"]["first_component"]["deploy"]["resources"] - == expected_resources - ) + pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + component_config = pipeline_configs.component_configs["first-component"] + assert component_config.accelerators[0].type == "gpu" + assert component_config.accelerators[0].number == 1 @pytest.mark.usefixtures("_freeze_time") def test_invalid_docker_configuration(tmp_path_factory): - """Test that extra volumes are applied correctly.""" + """Test that a valid error is returned when an unknown accelerator is set.""" pipeline = Pipeline( pipeline_name="test_pipeline", pipeline_description="description of the test pipeline", @@ -299,10 +331,33 @@ def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - with open(output_path) as src, open( - VALID_PIPELINE / example_dir / "kubeflow_pipeline.yml", - ) as truth: - assert yaml.safe_load(src) == yaml.safe_load(truth) + pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) + assert pipeline_configs.pipeline_name == pipeline.name + assert pipeline_configs.pipeline_description == pipeline.description + for ( + component_name, + component_configs, + ) in pipeline_configs.component_configs.items(): + # Get exepcted component configs + component = pipeline._graph[component_name] + component_op = component["fondant_component_op"] + + # Check that the component configs are correct + assert component_configs.dependencies == component["dependencies"] + assert component_configs.memory_limit is None + assert component_configs.memory_request is None + assert component_configs.cpu_limit is None + assert component_configs.cpu_request is None + if component_configs.accelerators: + assert ( + component_configs.accelerators.number_of_accelerators + == component_op.accelerators.number_of_accelerators + ) + if component_op.input_partition_rows is not None: + assert ( + int(component_configs.arguments["input_partition_rows"]) + == component_op.input_partition_rows + ) @pytest.mark.usefixtures("_freeze_time") @@ -331,26 +386,13 @@ def test_kubeflow_configuration(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - with open(output_path) as src: - # Two specs are present and loaded in the yaml file (component spec and k8s specs) - compiled_specs = yaml.load_all(src, Loader=yaml.FullLoader) - for spec in compiled_specs: - if "platforms" in spec: - component_kubernetes_spec = spec["platforms"]["kubernetes"][ - "deploymentSpec" - ]["executors"]["exec-first-component"] - assert component_kubernetes_spec["nodeSelector"]["labels"] == { - node_pool_label: node_pool_name, - } - - elif "deploymentSpec" in spec: - component_resources = spec["deploymentSpec"]["executors"][ - "exec-first-component" - ]["container"]["resources"] - assert component_resources["accelerator"]["count"] == "1" - assert ( - component_resources["accelerator"]["type"] == "nvidia.com/gpu" - ) + pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) + component_configs = pipeline_configs.component_configs["first-component"] + for accelerator in component_configs.accelerators: + assert accelerator.type == "nvidia.com/gpu" + assert accelerator.number == 1 + assert component_configs.node_pool_label == node_pool_label + assert component_configs.node_pool_name == node_pool_name @pytest.mark.usefixtures("_freeze_time") @@ -391,12 +433,35 @@ def test_vertex_compiler(setup_pipeline, tmp_path_factory): example_dir, pipeline, _ = setup_pipeline compiler = VertexCompiler() with tmp_path_factory.mktemp("temp") as fn: - output_path = str(fn / "vertex_pipeline.json") + output_path = str(fn / "kubeflow_pipeline.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - with open(output_path) as src, open( - VALID_PIPELINE / example_dir / "vertex_pipeline.yml", - ) as truth: - assert yaml.safe_load(src) == yaml.safe_load(truth) + pipeline_configs = VertexPipelineConfigs.from_spec(output_path) + assert pipeline_configs.pipeline_name == pipeline.name + assert pipeline_configs.pipeline_description == pipeline.description + for ( + component_name, + component_configs, + ) in pipeline_configs.component_configs.items(): + # Get exepcted component configs + component = pipeline._graph[component_name] + component_op = component["fondant_component_op"] + + # Check that the component configs are correct + assert component_configs.dependencies == component["dependencies"] + assert component_configs.memory_limit is None + assert component_configs.memory_request is None + assert component_configs.cpu_limit is None + assert component_configs.cpu_request is None + if component_configs.accelerators: + assert ( + component_configs.accelerators.number_of_accelerators + == component_op.accelerators.number_of_accelerators + ) + if component_op.input_partition_rows is not None: + assert ( + int(component_configs.arguments["input_partition_rows"]) + == component_op.input_partition_rows + ) @pytest.mark.usefixtures("_freeze_time") @@ -418,17 +483,13 @@ def test_vertex_configuration(tmp_path_factory): pipeline.add_op(component_1) compiler = VertexCompiler() with tmp_path_factory.mktemp("temp") as fn: - output_path = str(fn / "vertex_pipeline.yml") + output_path = str(fn / "kubeflow_pipeline.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - with open(output_path) as src: - # Two specs are present and loaded in the yaml file (component spec and k8s specs) - compiled_specs = yaml.safe_load(src) - - component_resources = compiled_specs["deploymentSpec"]["executors"][ - "exec-first-component" - ]["container"]["resources"] - assert component_resources["accelerator"]["count"] == "1" - assert component_resources["accelerator"]["type"] == "NVIDIA_TESLA_K80" + pipeline_configs = VertexPipelineConfigs.from_spec(output_path) + component_configs = pipeline_configs.component_configs["first-component"] + for accelerator in component_configs.accelerators: + assert accelerator.type == "NVIDIA_TESLA_K80" + assert accelerator.number == "1" @pytest.mark.usefixtures("_freeze_time") @@ -484,13 +545,13 @@ def test_caching_dependency_docker(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) - with open(output_path) as src: - spec = yaml.safe_load(src) - command = spec["services"]["second_component"]["command"] - cache_key = json.loads(command[command.index("--metadata") + 1])[ - "cache_key" - ] - + pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + metadata = json.loads( + pipeline_configs.component_configs["second-component"].arguments[ + "metadata" + ], + ) + cache_key = metadata["cache_key"] second_component_cache_key_dict[arg] = cache_key assert ( @@ -529,14 +590,16 @@ def test_caching_dependency_kfp(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - with open(output_path) as src: - spec = yaml.safe_load(src) - params = spec["root"]["dag"]["tasks"]["second-component"]["inputs"][ - "parameters" - ] - metadata = params["metadata"]["runtimeValue"]["constant"] - cache_key = json.loads(metadata)["cache_key"] + pipeline_configs = KubeflowPipelineConfigs.from_spec(output_path) + + metadata = json.loads( + pipeline_configs.component_configs["second-component"].arguments[ + "metadata" + ], + ) + cache_key = metadata["cache_key"] second_component_cache_key_dict[arg] = cache_key + second_component_cache_key_dict[arg] = cache_key assert ( second_component_cache_key_dict[arg_list[0]] diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 37d421ef6..dfc941034 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -175,13 +175,13 @@ def test_valid_pipeline( pipeline.sort_graph() assert list(pipeline._graph.keys()) == [ - "first_component", - "second_component", - "third_component", + "first-component", + "second-component", + "third-component", ] - assert pipeline._graph["first_component"]["dependencies"] == [] - assert pipeline._graph["second_component"]["dependencies"] == ["first_component"] - assert pipeline._graph["third_component"]["dependencies"] == ["second_component"] + assert pipeline._graph["first-component"]["dependencies"] == [] + assert pipeline._graph["second-component"]["dependencies"] == ["first-component"] + assert pipeline._graph["third-component"]["dependencies"] == ["second-component"] pipeline._validate_pipeline_definition("test_pipeline")