From 8f19071df9898b50878bbd6121fe91ba3157568c Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Mon, 21 Aug 2023 15:02:40 +0200 Subject: [PATCH 1/6] Create seperate class for metadata --- src/fondant/cli.py | 14 +- src/fondant/compiler.py | 113 +++--- src/fondant/executor.py | 9 +- src/fondant/manifest.py | 44 ++- src/fondant/pipeline.py | 17 +- .../example_1/docker-compose.yml | 16 +- .../example_1/kubeflow_pipeline.yml | 329 ++++++++++-------- .../example_2/docker-compose.yml | 6 +- .../example_2/kubeflow_pipeline.yml | 224 +++++++----- .../compiled_pipeline/kubeflow_pipeline.yml | 116 +++--- .../fourth_component/fondant_component.yaml | 2 +- tests/test_compiler.py | 41 ++- tests/test_component.py | 33 +- tests/test_manifest.py | 15 +- tests/test_pipeline.py | 12 +- 15 files changed, 576 insertions(+), 415 deletions(-) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 1afa56f6a..dfc6e8731 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -225,16 +225,15 @@ def register_compile(parent_parser): def compile(args): if args.local: - compiler = DockerCompiler() + compiler = DockerCompiler(args.pipeline) compiler.compile( - pipeline=args.pipeline, extra_volumes=args.extra_volumes, output_path=args.output_path, build_args=args.build_arg, ) elif args.kubeflow: - compiler = KubeFlowCompiler() - compiler.compile(pipeline=args.pipeline, output_path=args.output_path) + compiler = KubeFlowCompiler(args.pipeline) + compiler.compile(output_path=args.output_path) def register_run(parent_parser): @@ -298,9 +297,8 @@ def run(args): logging.info( "Found reference to un-compiled pipeline... compiling to {spec_ref}", ) - compiler = DockerCompiler() + compiler = DockerCompiler(pipeline) compiler.compile( - pipeline=pipeline, extra_volumes=args.extra_volumes, output_path=spec_ref, build_args=args.build_arg, @@ -320,8 +318,8 @@ def run(args): logging.info( "Found reference to un-compiled pipeline... compiling to {spec_ref}", ) - compiler = KubeFlowCompiler() - compiler.compile(pipeline=pipeline, output_path=spec_ref) + compiler = KubeFlowCompiler(pipeline) + compiler.compile(output_path=spec_ref) finally: runner = KubeflowRunner(host=args.host) runner.run(input_spec=spec_ref) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 52924a056..a5859ab21 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -2,23 +2,30 @@ import json import logging import typing as t -from abc import ABC, abstractmethod +from abc import ABC from dataclasses import asdict, dataclass from pathlib import Path import yaml +from fondant.manifest import Metadata from fondant.pipeline import Pipeline logger = logging.getLogger(__name__) class Compiler(ABC): - """Abstract base class for a compiler.""" + """Abstract base class for a pipeline compiler.""" - @abstractmethod - def compile(self, *args, **kwargs) -> None: - """Abstract method to invoke compilation.""" + def __init__( + self, + pipeline: Pipeline, + ): + self.pipeline = pipeline + + def get_run_id(self) -> str: + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + return f"{self.pipeline.name}-{timestamp}" @dataclass @@ -37,26 +44,11 @@ class DockerVolume: target: str -@dataclass -class MetaData: - """Dataclass representing the metadata arguments of a pipeline. - - Args: - run_id: identifier of the current pipeline run - base_path: the base path used to store the artifacts. - """ - - run_id: str - base_path: str - - class DockerCompiler(Compiler): """Compiler that creates a docker-compose spec from a pipeline.""" def compile( self, - pipeline: Pipeline, - *, output_path: str = "docker-compose.yml", extra_volumes: t.Optional[list] = None, build_args: t.Optional[t.List[str]] = None, @@ -64,7 +56,6 @@ def compile( """Compile a pipeline to docker-compose spec and save it to a specified output path. Args: - pipeline: the pipeline to compile output_path: the path where to save the docker-compose spec extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) @@ -74,9 +65,8 @@ def compile( if extra_volumes is None: extra_volumes = [] - logger.info(f"Compiling {pipeline.name} to {output_path}") + logger.info(f"Compiling {self.pipeline.name} to {output_path}") spec = self._generate_spec( - pipeline, extra_volumes=extra_volumes, build_args=build_args or [], ) @@ -90,13 +80,6 @@ def ignore_aliases(self, data): logger.info(f"Successfully compiled to {output_path}") - @staticmethod - def _safe_component_name(component_name: str) -> str: - """Transform a component name to a docker-compose friendly one. - eg: `Component A` -> `component_a`. - """ - return component_name.replace(" ", "_").lower() - @staticmethod def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]: """Helper that checks if the base_path is local or remote, @@ -124,37 +107,40 @@ def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]: def _generate_spec( self, - pipeline: Pipeline, - *, extra_volumes: t.List[str], build_args: t.List[str], ) -> dict: """Generate a docker-compose spec as a python dictionary, loops over the pipeline graph to create services and their dependencies. """ - timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - path, volume = self._patch_path(base_path=pipeline.base_path) - run_id = f"{pipeline.name}-{timestamp}" - metadata = MetaData(run_id=run_id, base_path=path) + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + path, volume = self._patch_path(base_path=self.pipeline.base_path) + run_id = self.get_run_id() services = {} - pipeline.validate(run_id=run_id) + self.pipeline.validate(run_id=run_id) + + for component_name, component in self.pipeline._graph.items(): + metadata = Metadata( + pipeline_name=self.pipeline.name, + run_id=run_id, + base_path=path, + component_id=component_name, + ) - for component_name, component in pipeline._graph.items(): logger.info(f"Compiling service for {component_name}") - safe_component_name = self._safe_component_name(component_name) component_op = component["fondant_component_op"] # add metadata argument to command - command = ["--metadata", json.dumps(asdict(metadata))] + command = ["--metadata", metadata.to_json()] # add in and out manifest paths to command command.extend( [ "--output_manifest_path", - f"{path}/{safe_component_name}/manifest.json", + f"{path}/{component_name}/manifest.json", ], ) @@ -169,15 +155,14 @@ def _generate_spec( depends_on = {} if component["dependencies"]: for dependency in component["dependencies"]: - safe_dependency = self._safe_component_name(dependency) - depends_on[safe_dependency] = { + depends_on[dependency] = { "condition": "service_completed_successfully", } # there is only an input manifest if the component has dependencies command.extend( [ "--input_manifest_path", - f"{path}/{safe_dependency}/manifest.json", + f"{path}/{dependency}/manifest.json", ], ) @@ -187,7 +172,7 @@ def _generate_spec( if extra_volumes: volumes.extend(extra_volumes) - services[safe_component_name] = { + services[component_name] = { "command": command, "depends_on": depends_on, "volumes": volumes, @@ -197,16 +182,14 @@ def _generate_spec( logger.info( f"Found Dockerfile for {component_name}, adding build step.", ) - services[safe_component_name]["build"] = { + services[component_name]["build"] = { "context": str(component_op.component_dir), "args": build_args, } else: - services[safe_component_name][ - "image" - ] = component_op.component_spec.image + services[component_name]["image"] = component_op.component_spec.image return { - "name": pipeline.name, + "name": self.pipeline.name, "version": "3.8", "services": services, } @@ -215,7 +198,8 @@ def _generate_spec( class KubeFlowCompiler(Compiler): """Compiler that creates a Kubeflow pipeline spec from a pipeline.""" - def __init__(self): + def __init__(self, pipeline: Pipeline): + super().__init__(pipeline) self._resolve_imports() def _resolve_imports(self): @@ -233,21 +217,31 @@ def _resolve_imports(self): def compile( self, - pipeline: Pipeline, output_path: str = "kubeflow_pipeline.yml", ) -> None: """Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path. Args: - pipeline: the pipeline to compile output_path: the path where to save the Kubeflow pipeline spec """ + run_id = self.get_run_id() - @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) + @self.kfp.dsl.pipeline( + name=self.pipeline.name, + description=self.pipeline.description, + ) def kfp_pipeline(): previous_component_task = None manifest_path = "" + for component_name, component in self.pipeline._graph.items(): + metadata = Metadata( + pipeline_name=self.pipeline.name, + run_id=run_id, + base_path=self.pipeline.base_path, + component_id=component_name, + ) + logger.info(f"Compiling service for {component_name}") component_op = component["fondant_component_op"] @@ -259,16 +253,10 @@ def kfp_pipeline(): # Execute the Kubeflow component and pass in the output manifest path from # the previous component. component_args = component_op.arguments - metadata = json.dumps( - { - "base_path": self.pipeline.base_path, - "run_id": "{{workflow.name}}", - }, - ) component_task = kubeflow_component_op( input_manifest_path=manifest_path, - metadata=metadata, + metadata=metadata.to_json(), **component_args, ) # Set optional configurations @@ -286,8 +274,7 @@ def kfp_pipeline(): previous_component_task = component_task - self.pipeline = pipeline - self.pipeline.validate(run_id="{{workflow.name}}") + self.pipeline.validate(run_id=run_id) logger.info(f"Compiling {self.pipeline.name} to {output_path}") self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore diff --git a/src/fondant/executor.py b/src/fondant/executor.py index a434f0c85..e265a83f8 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -24,7 +24,7 @@ ) from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type from fondant.data_io import DaskDataLoader, DaskDataWriter -from fondant.manifest import Manifest +from fondant.manifest import Manifest, Metadata from fondant.schema import validate_partition_number logger = logging.getLogger(__name__) @@ -46,7 +46,7 @@ def __init__( self.spec = spec self.input_manifest_path = input_manifest_path self.output_manifest_path = output_manifest_path - self.metadata = metadata + self.metadata = Metadata.from_dict(metadata) self.user_arguments = user_arguments self.input_partition_rows = input_partition_rows @@ -238,8 +238,9 @@ def optional_fondant_arguments() -> t.List[str]: def _load_or_create_manifest(self) -> Manifest: component_id = self.spec.name.lower().replace(" ", "_") return Manifest.create( - base_path=self.metadata["base_path"], - run_id=self.metadata["run_id"], + pipeline_name=self.metadata.pipeline_name, + base_path=self.metadata.base_path, + run_id=self.metadata.run_id, component_id=component_id, ) diff --git a/src/fondant/manifest.py b/src/fondant/manifest.py index 15d3cb584..fe9e62255 100644 --- a/src/fondant/manifest.py +++ b/src/fondant/manifest.py @@ -4,6 +4,7 @@ import pkgutil import types import typing as t +from dataclasses import asdict, dataclass from pathlib import Path import jsonschema.exceptions @@ -70,6 +71,26 @@ def fields(self) -> t.Dict[str, Field]: } +@dataclass +class Metadata: + """Class representing the Metadata of the manifest.""" + + base_path: str + pipeline_name: str + run_id: str + component_id: str + + def to_dict(self): + return asdict(self) + + def to_json(self): + return json.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data_dict): + return cls(**data_dict) + + class Manifest: """ Class representing a Fondant manifest. @@ -112,20 +133,31 @@ def retrieve_from_filesystem(uri: str) -> Resource: raise InvalidManifest.create_from(e) @classmethod - def create(cls, *, base_path: str, run_id: str, component_id: str) -> "Manifest": + def create( + cls, + *, + pipeline_name: str, + base_path: str, + run_id: str, + component_id: str, + ) -> "Manifest": """Create an empty manifest. Args: + pipeline_name: the bane of the pipeline base_path: The base path of the manifest run_id: The id of the current pipeline run component_id: The id of the current component being executed """ + metadata = Metadata( + pipeline_name=pipeline_name, + base_path=base_path, + run_id=run_id, + component_id=component_id, + ) + specification = { - "metadata": { - "base_path": base_path, - "run_id": run_id, - "component_id": component_id, - }, + "metadata": metadata.to_dict(), "index": {"location": f"/index/{run_id}/{component_id}"}, "subsets": {}, } diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index f094188e4..a4fd1798b 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -55,12 +55,13 @@ def __init__( node_pool_name: t.Optional[str] = None, ) -> None: self.component_dir = Path(component_dir) - self.input_partition_rows = input_partition_rows - self.arguments = self._set_arguments(arguments) - self.component_spec = ComponentSpec.from_file( self.component_dir / self.COMPONENT_SPEC_NAME, ) + self.name = self.component_spec.name.replace(" ", "_").lower() + self.input_partition_rows = input_partition_rows + self.arguments = self._set_arguments(arguments) + self.arguments.setdefault("component_spec", self.component_spec.specification) self.number_of_gpus = number_of_gpus @@ -197,11 +198,9 @@ def add_op( msg, ) - dependencies_names = [ - dependency.component_spec.name for dependency in dependencies - ] + dependencies_names = [dependency.name for dependency in dependencies] - self._graph[task.component_spec.name] = { + self._graph[task.name] = { "fondant_component_op": task, "dependencies": dependencies_names, } @@ -242,7 +241,8 @@ def validate(self, run_id: str): """Sort and run validation on the pipeline definition. Args: - run_id (str, optional): run identifier. Defaults to None. + run_id: run identifier + """ self.sort_graph() self._validate_pipeline_definition(run_id) @@ -269,6 +269,7 @@ def _validate_pipeline_definition(self, run_id: str): # Create initial manifest manifest = Manifest.create( + pipeline_name=self.name, base_path=self.base_path, run_id=run_id, component_id=load_component_name, diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index b7d31f3b2..fc39ca80d 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -6,7 +6,8 @@ services: context: tests/example_pipelines/valid_pipeline/example_1/first_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' - --output_manifest_path - /foo/bar/first_component/manifest.json - --storage_args @@ -26,7 +27,8 @@ services: context: tests/example_pipelines/valid_pipeline/example_1/second_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "second_component"}' - --output_manifest_path - /foo/bar/second_component/manifest.json - --storage_args @@ -48,16 +50,15 @@ services: third_component: build: args: [] - context: tests/example_pipelines/valid_pipeline/example_1/fourth_component + context: tests/example_pipelines/valid_pipeline/example_1/third_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "third_component"}' - --output_manifest_path - /foo/bar/third_component/manifest.json - --storage_args - a dummy string arg - - --some_list - - '[1, 2, 3]' - --input_partition_rows - None - --component_spec @@ -67,8 +68,7 @@ services: "embeddings": {"fields": {"data": {"type": "array", "items": {"type": "float32"}}}}}, "produces": {"images": {"fields": {"data": {"type": "binary"}}}, "additionalSubsets": false}, "args": {"storage_args": {"description": "Storage arguments", "type": - "str"}, "some_list": {"description": "Some list", "type": "list", "items": {"type": - "int"}}}}' + "str"}}}' - --input_manifest_path - /foo/bar/second_component/manifest.json depends_on: diff --git a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml index acdb7ac69..5ddb02e38 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/kubeflow_pipeline.yml @@ -1,47 +1,68 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: test-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test - pipeline", "name": "test_pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + pipeline", "name": "test_pipeline"}' + generateName: test-pipeline- + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: test-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, disable, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - disable + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -50,46 +71,64 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"First component\", - \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "disable", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "storage_args": "a dummy string arg"}'} - - name: second-component - container: + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Second component", "produces": {"embeddings": {"fields": {"data": {"items": - {"type": "float32"}, "type": "array"}}}}}', --input_partition_rows, '10', - --storage_args, a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "second_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description": + "This is an example component", "image": "example_component:latest", "name": + "Second component", "produces": {"embeddings": {"fields": {"data": {"items": + {"type": "float32"}, "type": "array"}}}}}' + - --input_partition_rows + - '10' + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: second-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: first-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\": + {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}", + "input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\", + \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", + \"component_id\": \"second_component\"}", "storage_args": "a dummy string + arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -98,89 +137,103 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "Second component", "outputs": [{"description": "Path to the output manifest", "name": - "output_manifest_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "a02b0189397a2d9318982201f020dbbbe3962427ed150fe58cc69ff508cc68bb"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"consumes\": {\"images\": {\"fields\": {\"data\": - {\"type\": \"binary\"}}}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"Second component\", - \"produces\": {\"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": - \"float32\"}, \"type\": \"array\"}}}}}", "input_partition_rows": "10", "metadata": - "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "storage_args": - "a dummy string arg"}'} - - name: test-pipeline - dag: + "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: second-component + outputs: + artifacts: + - name: second-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - - name: second-component + - name: first-component + template: first-component + - arguments: + artifacts: + - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' + name: first-component-output_manifest_path + dependencies: + - first-component + name: second-component template: second-component - dependencies: [first-component] - arguments: + - arguments: artifacts: - - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} - - name: third-component + - from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}' + name: second-component-output_manifest_path + dependencies: + - second-component + name: third-component template: third-component - dependencies: [second-component] - arguments: - artifacts: - - {name: second-component-output_manifest_path, from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}'} - - name: third-component - container: + name: test-pipeline + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"some_list": {"description": "Some list", "items": {"type": "int"}, - "type": "list"}, "storage_args": {"description": "Storage arguments", "type": - "str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, - "embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type": - "array"}}}, "images": {"fields": {"data": {"type": "binary"}}}}, "description": - "This is an example component", "image": "example_component:latest", "name": - "Third component", "produces": {"additionalSubsets": false, "images": {"fields": - {"data": {"type": "binary"}}}}}', --input_partition_rows, None, --storage_args, - a dummy string arg, --some_list, '[1, 2, 3]', --output_manifest_path, /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "third_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "consumes": {"captions": {"fields": {"data": {"type": "string"}}}, "embeddings": + {"fields": {"data": {"items": {"type": "float32"}, "type": "array"}}}, "images": + {"fields": {"data": {"type": "binary"}}}}, "description": "This is an example + component", "image": "example_component:latest", "name": "Third component", + "produces": {"additionalSubsets": false, "images": {"fields": {"data": {"type": + "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - - {name: second-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: third-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: second-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--some_list", {"inputValue": - "some_list"}, "--output_manifest_path", {"outputPath": "output_manifest_path"}], - "image": "example_component:latest"}}, "inputs": [{"description": "Path - to the input manifest", "name": "input_manifest_path", "type": "String"}, - {"description": "Metadata arguments containing the run id and base path", - "name": "metadata", "type": "String"}, {"default": "None", "description": - "The component specification as a dictionary", "name": "component_spec", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"consumes\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, + \"embeddings\": {\"fields\": {\"data\": {\"items\": {\"type\": \"float32\"}, + \"type\": \"array\"}}}, \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, + \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": + "None", "metadata": "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", + \"run_id\": \"test_pipeline-20230101000000\", \"component_id\": \"third_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "698791c6aa2ed14d4b337840116a7a995f403e5be414389b05ccf7942b9e4437"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", + "type": "String"}, {"description": "Metadata arguments containing the run + id and base path", "name": "metadata", "type": "String"}, {"default": "None", + "description": "The component specification as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default": "None", "description": "The number of rows to load per partition. Set to override the automatic partitioning", "name": "input_partition_rows", "type": "String"}, {"description": "Storage - arguments", "name": "storage_args", "type": "String"}, {"description": "Some - list", "name": "some_list", "type": "JsonArray"}], "name": "Third component", + arguments", "name": "storage_args", "type": "String"}], "name": "Third component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "253932349a663809f2ea6fcf63ebd58f963881c6960435269d3fbe3eb17dcf53"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"some_list\": {\"description\": \"Some list\", \"items\": - {\"type\": \"int\"}, \"type\": \"list\"}, \"storage_args\": {\"description\": - \"Storage arguments\", \"type\": \"str\"}}, \"consumes\": {\"captions\": - {\"fields\": {\"data\": {\"type\": \"string\"}}}, \"embeddings\": {\"fields\": - {\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\": - \"This is an example component\", \"image\": \"example_component:latest\", - \"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "some_list": "[1, 2, 3]", "storage_args": "a dummy string arg"}'} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: third-component + outputs: + artifacts: + - name: third-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index 807d281ba..b84fa6d69 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -6,7 +6,8 @@ services: context: tests/example_pipelines/valid_pipeline/example_1/first_component command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' - --output_manifest_path - /foo/bar/first_component/manifest.json - --storage_args @@ -23,7 +24,8 @@ services: image_cropping: command: - --metadata - - '{"run_id": "test_pipeline-20230101000000", "base_path": "/foo/bar"}' + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "image_cropping"}' - --output_manifest_path - /foo/bar/image_cropping/manifest.json - --cropping_threshold diff --git a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml index 67d306139..adcfdf703 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/kubeflow_pipeline.yml @@ -1,47 +1,68 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: test-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test - pipeline", "name": "test_pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + pipeline", "name": "test_pipeline"}' + generateName: test-pipeline- + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: test-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, None, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -50,47 +71,71 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"First component\", - \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "storage_args": "a dummy string arg"}'} - - name: image-cropping - container: + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold - parameter used for detecting borders. A lower (negative) parameter results - in a more performant border detection, but can cause overcropping. Default - is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding - for the image cropping. The padding is added to all borders of the image.", - "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, - "description": "Component that removes single-colored borders around images - and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", - "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": - "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}', - --input_partition_rows, None, --cropping_threshold, '0', --padding, '0', --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "image_cropping"}' + - --component_spec + - '{"args": {"cropping_threshold": {"default": -30, "description": "Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30", "type": "int"}, "padding": {"default": 10, "description": "Padding + for the image cropping. The padding is added to all borders of the image.", + "type": "int"}}, "consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, + "description": "Component that removes single-colored borders around images + and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev", + "name": "Image cropping", "produces": {"images": {"fields": {"data": {"type": + "binary"}, "height": {"type": "int32"}, "width": {"type": "int32"}}}}}' + - --input_partition_rows + - None + - --cropping_threshold + - '0' + - --padding + - '0' + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: ghcr.io/ml6team/image_cropping:dev inputs: artifacts: - - {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data} - outputs: - artifacts: - - {name: image-cropping-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} + - name: first-component-output_manifest_path + path: /tmp/inputs/input_manifest_path/data metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Component - that removes single-colored borders around images and crops them appropriately", - "implementation": {"container": {"command": ["python3", "main.py", "--input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"cropping_threshold\": {\"default\": -30, \"description\": \"Threshold + parameter used for detecting borders. A lower (negative) parameter results + in a more performant border detection, but can cause overcropping. Default + is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": + \"Padding for the image cropping. The padding is added to all borders of + the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": + {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that + removes single-colored borders around images and crops them appropriately\", + \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", + \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, + \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", + "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": + \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": \"test_pipeline-20230101000000\", + \"component_id\": \"image_cropping\"}", "padding": "0"}' + pipelines.kubeflow.org/component_ref: '{"digest": "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}' + pipelines.kubeflow.org/component_spec: '{"description": "Component that removes + single-colored borders around images and crops them appropriately", "implementation": + {"container": {"command": ["python3", "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--cropping_threshold", {"inputValue": @@ -109,31 +154,26 @@ spec: 10, "description": "Padding for the image cropping. The padding is added to all borders of the image.", "name": "padding", "type": "Integer"}], "name": "Image cropping", "outputs": [{"description": "Path to the output manifest", - "name": "output_manifest_path", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "e86f02b6b9cc878b6187e44bb3caf9291c3ce42c1939e19b0a97dacdc78a9d72"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"cropping_threshold\": {\"default\": -30, \"description\": - \"Threshold parameter used for detecting borders. A lower (negative) parameter - results in a more performant border detection, but can cause overcropping. - Default is -30\", \"type\": \"int\"}, \"padding\": {\"default\": 10, \"description\": - \"Padding for the image cropping. The padding is added to all borders of - the image.\", \"type\": \"int\"}}, \"consumes\": {\"images\": {\"fields\": - {\"data\": {\"type\": \"binary\"}}}}, \"description\": \"Component that - removes single-colored borders around images and crops them appropriately\", - \"image\": \"ghcr.io/ml6team/image_cropping:dev\", \"name\": \"Image cropping\", - \"produces\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}, - \"height\": {\"type\": \"int32\"}, \"width\": {\"type\": \"int32\"}}}}}", - "cropping_threshold": "0", "input_partition_rows": "None", "metadata": "{\"base_path\": - \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "padding": "0"}'} - - name: test-pipeline - dag: + "name": "output_manifest_path", "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: image-cropping + outputs: + artifacts: + - name: image-cropping-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - - name: image-cropping - template: image-cropping - dependencies: [first-component] - arguments: + - name: first-component + template: first-component + - arguments: artifacts: - - {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + - from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}' + name: first-component-output_manifest_path + dependencies: + - first-component + name: image-cropping + template: image-cropping + name: test-pipeline diff --git a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml index 8383894d6..8450fe823 100644 --- a/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml +++ b/tests/example_pipelines/compiled_pipeline/kubeflow_pipeline.yml @@ -1,50 +1,71 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - generateName: test-pipeline- - annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00', + annotations: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00' pipelines.kubeflow.org/pipeline_spec: '{"description": "description of the test - pipeline", "name": "test_pipeline"}'} - labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22} + pipeline", "name": "test_pipeline"}' + generateName: test-pipeline- + labels: + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 spec: + arguments: + parameters: [] entrypoint: test-pipeline + serviceAccountName: pipeline-runner templates: - - name: first-component - container: + - container: args: [] - command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data, - --metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec, - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, - "description": "This is an example component", "image": "example_component:latest", - "name": "First component", "produces": {"captions": {"fields": {"data": - {"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}', - --input_partition_rows, None, --storage_args, a dummy string arg, --output_manifest_path, - /tmp/outputs/output_manifest_path/data] + command: + - python3 + - main.py + - --input_manifest_path + - /tmp/inputs/input_manifest_path/data + - --metadata + - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", + "component_id": "first_component"}' + - --component_spec + - '{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}}, + "description": "This is an example component", "image": "example_component:latest", + "name": "First component", "produces": {"captions": {"fields": {"data": {"type": + "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}' + - --input_partition_rows + - None + - --storage_args + - a dummy string arg + - --output_manifest_path + - /tmp/outputs/output_manifest_path/data image: example_component:latest resources: - limits: {nvidia.com/gpu: 1} + limits: + nvidia.com/gpu: 1 inputs: artifacts: - name: input_manifest_path path: /tmp/inputs/input_manifest_path/data - raw: {data: ''} - outputs: - artifacts: - - {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data} - nodeSelector: {node_pool: a_node_pool} + raw: + data: '' metadata: - labels: - pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 - pipelines.kubeflow.org/pipeline-sdk-type: kfp - pipelines.kubeflow.org/enable_caching: "true" - annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This - is an example component", "implementation": {"container": {"command": ["python3", - "main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"}, - "--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue": - "component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"}, - "--storage_args", {"inputValue": "storage_args"}, "--output_manifest_path", - {"outputPath": "output_manifest_path"}], "image": "example_component:latest"}}, - "inputs": [{"description": "Path to the input manifest", "name": "input_manifest_path", + annotations: + pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\": + {\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}}, + \"description\": \"This is an example component\", \"image\": \"example_component:latest\", + \"name\": \"First component\", \"produces\": {\"captions\": {\"fields\": + {\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\": + {\"type\": \"binary\"}}}}}", "input_partition_rows": "None", "metadata": + "{\"base_path\": \"/foo/bar\", \"pipeline_name\": \"test_pipeline\", \"run_id\": + \"test_pipeline-20230101000000\", \"component_id\": \"first_component\"}", + "storage_args": "a dummy string arg"}' + pipelines.kubeflow.org/component_ref: '{"digest": "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}' + pipelines.kubeflow.org/component_spec: '{"description": "This is an example + component", "implementation": {"container": {"command": ["python3", "main.py", + "--input_manifest_path", {"inputPath": "input_manifest_path"}, "--metadata", + {"inputValue": "metadata"}, "--component_spec", {"inputValue": "component_spec"}, + "--input_partition_rows", {"inputValue": "input_partition_rows"}, "--storage_args", + {"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath": + "output_manifest_path"}], "image": "example_component:latest"}}, "inputs": + [{"description": "Path to the input manifest", "name": "input_manifest_path", "type": "String"}, {"description": "Metadata arguments containing the run id and base path", "name": "metadata", "type": "String"}, {"default": "None", "description": "The component specification as a dictionary", "name": "component_spec", @@ -53,19 +74,20 @@ spec: "name": "input_partition_rows", "type": "String"}, {"description": "Storage arguments", "name": "storage_args", "type": "String"}], "name": "First component", "outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path", - "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest": - "2a304ce49a15404ba50dfd8b56ec43fa8ac8c29f80579d1c8fb974d3f1a5c87f"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec": - "{\"args\": {\"storage_args\": {\"description\": \"Storage arguments\", - \"type\": \"str\"}}, \"description\": \"This is an example component\", - \"image\": \"example_component:latest\", \"name\": \"First component\", - \"produces\": {\"captions\": {\"fields\": {\"data\": {\"type\": \"string\"}}}, - \"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows": - "None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", - "storage_args": "a dummy string arg"}'} - - name: test-pipeline - dag: + "type": "String"}]}' + labels: + pipelines.kubeflow.org/enable_caching: 'true' + pipelines.kubeflow.org/kfp_sdk_version: 1.8.22 + pipelines.kubeflow.org/pipeline-sdk-type: kfp + name: first-component + nodeSelector: + node_pool: a_node_pool + outputs: + artifacts: + - name: first-component-output_manifest_path + path: /tmp/outputs/output_manifest_path/data + - dag: tasks: - - {name: first-component, template: first-component} - arguments: - parameters: [] - serviceAccountName: pipeline-runner + - name: first-component + template: first-component + name: test-pipeline diff --git a/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml b/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml index f1d6d0b77..3cda0cc6c 100644 --- a/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml +++ b/tests/example_pipelines/valid_pipeline/example_1/fourth_component/fondant_component.yaml @@ -1,4 +1,4 @@ -name: Third component +name: Fourth component description: This is an example component image: example_component:latest diff --git a/tests/test_compiler.py b/tests/test_compiler.py index 89c863143..fcef44247 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -27,10 +27,9 @@ input_partition_rows="10", ), ComponentOp( - Path(COMPONENTS_PATH / "example_1" / "fourth_component"), + Path(COMPONENTS_PATH / "example_1" / "third_component"), arguments={ "storage_args": "a dummy string arg", - "some_list": [1, 2, 3], }, ), ], @@ -82,17 +81,17 @@ def setup_pipeline(request, tmp_path, monkeypatch): # override the default package_path with temporary path to avoid the creation of artifacts monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz")) - return (example_dir, pipeline) + return example_dir, pipeline @pytest.mark.usefixtures("_freeze_time") def test_docker_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to docker-compose.""" example_dir, pipeline = setup_pipeline - compiler = DockerCompiler() + compiler = DockerCompiler(pipeline) with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") - compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) + compiler.compile(output_path=output_path, build_args=[]) with open(output_path) as src, open( VALID_PIPELINE / example_dir / "docker-compose.yml", ) as truth: @@ -108,8 +107,8 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): _, pipeline = setup_pipeline work_dir = f"/{fn.stem}" pipeline.base_path = str(fn) - compiler = DockerCompiler() - compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml") + compiler = DockerCompiler(pipeline) + compiler.compile(output_path=fn / "docker-compose.yml") # read the generated docker-compose file with open(fn / "docker-compose.yml") as f_spec: @@ -127,7 +126,8 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): # check if commands are patched to use the working dir commands_with_dir = [ f"{work_dir}/{name}/manifest.json", - f'{{"run_id": "test_pipeline-20230101000000", "base_path": "{work_dir}"}}', + f'{{"base_path": "{work_dir}", "pipeline_name": "{pipeline.name}",' + f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] @@ -139,9 +139,9 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): _, pipeline = setup_pipeline remote_dir = "gs://somebucket/artifacts" pipeline.base_path = remote_dir - compiler = DockerCompiler() + compiler = DockerCompiler(pipeline) with tmp_path_factory.mktemp("temp") as fn: - compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml") + compiler.compile(output_path=fn / "docker-compose.yml") # read the generated docker-compose file with open(fn / "docker-compose.yml") as f_spec: @@ -153,7 +153,8 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): # check if commands are patched to use the remote dir commands_with_dir = [ f"{remote_dir}/{name}/manifest.json", - f'{{"run_id": "test_pipeline-20230101000000", "base_path": "{remote_dir}"}}', + f'{{"base_path": "{remote_dir}", "pipeline_name": "{pipeline.name}",' + f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] @@ -166,11 +167,10 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): # this is the directory mounted in the container _, pipeline = setup_pipeline pipeline.base_path = str(fn) - compiler = DockerCompiler() + compiler = DockerCompiler(pipeline) # define some extra volumes to be mounted extra_volumes = ["hello:there", "general:kenobi"] compiler.compile( - pipeline=pipeline, output_path=fn / "docker-compose.yml", extra_volumes=extra_volumes, ) @@ -188,10 +188,10 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to kubeflow.""" example_dir, pipeline = setup_pipeline - compiler = KubeFlowCompiler() + compiler = KubeFlowCompiler(pipeline) with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(output_path=output_path) with open(output_path) as src, open( VALID_PIPELINE / example_dir / "kubeflow_pipeline.yml", ) as truth: @@ -214,10 +214,10 @@ def test_kubeflow_configuration(tmp_path_factory): number_of_gpus=1, ) pipeline.add_op(component_1) - compiler = KubeFlowCompiler() + compiler = KubeFlowCompiler(pipeline) with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(pipeline=pipeline, output_path=output_path) + compiler.compile(output_path=output_path) with open(output_path) as src, open( VALID_PIPELINE / "kubeflow_pipeline.yml", ) as truth: @@ -226,8 +226,13 @@ def test_kubeflow_configuration(tmp_path_factory): def test_kfp_import(): """Test that the kfp import throws the correct error.""" + pipeline = Pipeline( + pipeline_name="test_pipeline", + pipeline_description="description of the test pipeline", + base_path="/foo/bar", + ) with mock.patch.dict(sys.modules): # remove kfp from the modules sys.modules["kfp"] = None with pytest.raises(ImportError): - _ = KubeFlowCompiler() + _ = KubeFlowCompiler(pipeline) diff --git a/tests/test_component.py b/tests/test_component.py index 612e9c7b9..65015d928 100644 --- a/tests/test_component.py +++ b/tests/test_component.py @@ -23,7 +23,7 @@ Executor, PandasTransformExecutor, ) -from fondant.manifest import Manifest +from fondant.manifest import Manifest, Metadata components_path = Path(__file__).parent / "example_specs/components" N_PARTITIONS = 2 @@ -35,6 +35,16 @@ def yaml_file_to_json_string(file_path): return json.dumps(data) +@pytest.fixture() +def metadata(): + return Metadata( + pipeline_name="pipeline", + base_path="/bucket", + component_id="load_component", + run_id="12345", + ) + + @pytest.fixture() def _patched_data_loading(monkeypatch): """Mock data loading so no actual data is loaded.""" @@ -74,14 +84,14 @@ def wrapper(self, *args, **kwargs): return wrapper -def test_component_arguments(): +def test_component_arguments(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "arguments/input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--output_manifest_path", str(components_path / "arguments/output_manifest.json"), "--component_spec", @@ -129,12 +139,13 @@ def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]: @pytest.mark.usefixtures("_patched_data_writing") -def test_load_component(): +def test_load_component(metadata): # Mock CLI arguments load + sys.argv = [ "", "--metadata", - json.dumps({"base_path": "/bucket", "run_id": "12345"}), + metadata.to_json(), "--flag", "success", "--value", @@ -168,14 +179,14 @@ def load(self): @pytest.mark.usefixtures("_patched_data_loading", "_patched_data_writing") -def test_dask_transform_component(): +def test_dask_transform_component(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--flag", "success", "--value", @@ -212,14 +223,14 @@ def transform(self, dataframe): @pytest.mark.usefixtures("_patched_data_loading", "_patched_data_writing") -def test_pandas_transform_component(): +def test_pandas_transform_component(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--flag", "success", "--value", @@ -327,14 +338,14 @@ def transform(dataframe: pd.DataFrame) -> pd.DataFrame: @pytest.mark.usefixtures("_patched_data_loading") -def test_write_component(): +def test_write_component(metadata): # Mock CLI arguments sys.argv = [ "", "--input_manifest_path", str(components_path / "input_manifest.json"), "--metadata", - "{}", + metadata.to_json(), "--flag", "success", "--value", diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 68fb9f364..922d70cfa 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -117,9 +117,11 @@ def test_manifest_creation(): """Test the stepwise creation of a manifest via the Manifest class.""" base_path = "gs://bucket" run_id = "run_id" + pipeline_name = "pipeline_name" component_id = "component_id" manifest = Manifest.create( + pipeline_name=pipeline_name, base_path=base_path, run_id=run_id, component_id=component_id, @@ -129,6 +131,7 @@ def test_manifest_creation(): assert manifest._specification == { "metadata": { + "pipeline_name": pipeline_name, "base_path": base_path, "run_id": run_id, "component_id": component_id, @@ -154,11 +157,17 @@ def test_manifest_creation(): def test_manifest_repr(): - manifest = Manifest.create(base_path="/", run_id="A", component_id="1") + manifest = Manifest.create( + pipeline_name="NAME", + base_path="/", + run_id="A", + component_id="1", + ) assert ( manifest.__repr__() - == "Manifest({'metadata': {'base_path': '/', 'run_id': 'A', 'component_id': '1'}, " - "'index': {'location': '/index/A/1'}, 'subsets': {}})" + == "Manifest({'metadata': {'base_path': '/', 'pipeline_name': 'NAME'," + " 'run_id': 'A', 'component_id': '1'}, 'index': {'location': '/index/A/1'}," + " 'subsets': {}})" ) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 9b80c9121..77ed774b0 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -96,13 +96,13 @@ def test_valid_pipeline( pipeline.sort_graph() assert list(pipeline._graph.keys()) == [ - "First component", - "Second component", - "Third component", + "first_component", + "second_component", + "third_component", ] - assert pipeline._graph["First component"]["dependencies"] == [] - assert pipeline._graph["Second component"]["dependencies"] == ["First component"] - assert pipeline._graph["Third component"]["dependencies"] == ["Second component"] + assert pipeline._graph["first_component"]["dependencies"] == [] + assert pipeline._graph["second_component"]["dependencies"] == ["first_component"] + assert pipeline._graph["third_component"]["dependencies"] == ["second_component"] pipeline._validate_pipeline_definition("test_pipeline") From eab1f0d57de916d1142190ea7a7b8a72c049711e Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Mon, 21 Aug 2023 16:51:23 +0200 Subject: [PATCH 2/6] add pipeline name to manifest schema --- src/fondant/manifest.py | 4 ++++ src/fondant/schemas/manifest.json | 3 +++ tests/example_data/manifest.json | 3 ++- tests/example_specs/components/input_manifest.json | 3 ++- tests/example_specs/manifests/valid_manifest.json | 3 ++- 5 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/fondant/manifest.py b/src/fondant/manifest.py index fe9e62255..2256c0013 100644 --- a/src/fondant/manifest.py +++ b/src/fondant/manifest.py @@ -198,6 +198,10 @@ def run_id(self) -> str: def component_id(self) -> str: return self.metadata["component_id"] + @property + def pipeline_name(self) -> str: + return self.metadata["pipeline_name"] + @property def index(self) -> Index: return Index(self._specification["index"], base_path=self.base_path) diff --git a/src/fondant/schemas/manifest.json b/src/fondant/schemas/manifest.json index f08d82c29..fed5960f9 100644 --- a/src/fondant/schemas/manifest.json +++ b/src/fondant/schemas/manifest.json @@ -9,6 +9,9 @@ "type": "string", "format": "uri" }, + "pipeline_name": { + "type": "string" + }, "run_id": { "type": "string" }, diff --git a/tests/example_data/manifest.json b/tests/example_data/manifest.json index 2d17d6af8..8fe4ef16b 100644 --- a/tests/example_data/manifest.json +++ b/tests/example_data/manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "tests/example_data/subsets_input", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/example_specs/components/input_manifest.json b/tests/example_specs/components/input_manifest.json index 7ff924680..7af13d599 100644 --- a/tests/example_specs/components/input_manifest.json +++ b/tests/example_specs/components/input_manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "/bucket", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/example_specs/manifests/valid_manifest.json b/tests/example_specs/manifests/valid_manifest.json index 8b9a5794d..9bc00c512 100644 --- a/tests/example_specs/manifests/valid_manifest.json +++ b/tests/example_specs/manifests/valid_manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { From a827668d0a29c173788ce914349b00a414be3b2c Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Mon, 21 Aug 2023 16:51:23 +0200 Subject: [PATCH 3/6] add pipeline name to manifest schema --- docs/manifest.md | 3 ++- src/fondant/manifest.py | 4 ++++ src/fondant/schemas/manifest.json | 4 ++++ tests/example_data/manifest.json | 3 ++- tests/example_specs/components/arguments/input_manifest.json | 1 + tests/example_specs/components/input_manifest.json | 3 ++- tests/example_specs/evolution_examples/1/output_manifest.json | 1 + tests/example_specs/evolution_examples/2/output_manifest.json | 1 + tests/example_specs/evolution_examples/3/output_manifest.json | 1 + tests/example_specs/evolution_examples/4/output_manifest.json | 1 + tests/example_specs/evolution_examples/5/output_manifest.json | 1 + tests/example_specs/evolution_examples/6/output_manifest.json | 1 + tests/example_specs/evolution_examples/7/output_manifest.json | 1 + tests/example_specs/evolution_examples/8/output_manifest.json | 1 + tests/example_specs/evolution_examples/input_manifest.json | 1 + tests/example_specs/manifests/valid_manifest.json | 3 ++- 16 files changed, 26 insertions(+), 4 deletions(-) diff --git a/docs/manifest.md b/docs/manifest.md index a967aa73e..d672d534d 100644 --- a/docs/manifest.md +++ b/docs/manifest.md @@ -33,8 +33,9 @@ the data is stored and references to the pipeline and component that were used t ```json { "metadata": { + "pipeline_name": "pipeline_name", "base_path": "gs://bucket", - "run_id": "12345", + "run_id": "pipeline_name_12345", "component_id": "67890" } } diff --git a/src/fondant/manifest.py b/src/fondant/manifest.py index fe9e62255..2256c0013 100644 --- a/src/fondant/manifest.py +++ b/src/fondant/manifest.py @@ -198,6 +198,10 @@ def run_id(self) -> str: def component_id(self) -> str: return self.metadata["component_id"] + @property + def pipeline_name(self) -> str: + return self.metadata["pipeline_name"] + @property def index(self) -> Index: return Index(self._specification["index"], base_path=self.base_path) diff --git a/src/fondant/schemas/manifest.json b/src/fondant/schemas/manifest.json index f08d82c29..00ad6d1cc 100644 --- a/src/fondant/schemas/manifest.json +++ b/src/fondant/schemas/manifest.json @@ -9,6 +9,9 @@ "type": "string", "format": "uri" }, + "pipeline_name": { + "type": "string" + }, "run_id": { "type": "string" }, @@ -18,6 +21,7 @@ }, "required": [ "base_path", + "pipeline_name", "run_id", "component_id" ] diff --git a/tests/example_data/manifest.json b/tests/example_data/manifest.json index 2d17d6af8..8fe4ef16b 100644 --- a/tests/example_data/manifest.json +++ b/tests/example_data/manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "tests/example_data/subsets_input", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/example_specs/components/arguments/input_manifest.json b/tests/example_specs/components/arguments/input_manifest.json index bdaf8976e..004113289 100644 --- a/tests/example_specs/components/arguments/input_manifest.json +++ b/tests/example_specs/components/arguments/input_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "/bucket", "run_id": "12345", "component_id": "67890" diff --git a/tests/example_specs/components/input_manifest.json b/tests/example_specs/components/input_manifest.json index 7ff924680..7af13d599 100644 --- a/tests/example_specs/components/input_manifest.json +++ b/tests/example_specs/components/input_manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "/bucket", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { diff --git a/tests/example_specs/evolution_examples/1/output_manifest.json b/tests/example_specs/evolution_examples/1/output_manifest.json index 9f14f77d9..2694368d7 100644 --- a/tests/example_specs/evolution_examples/1/output_manifest.json +++ b/tests/example_specs/evolution_examples/1/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/2/output_manifest.json b/tests/example_specs/evolution_examples/2/output_manifest.json index 98483d483..a9d3851b2 100644 --- a/tests/example_specs/evolution_examples/2/output_manifest.json +++ b/tests/example_specs/evolution_examples/2/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/3/output_manifest.json b/tests/example_specs/evolution_examples/3/output_manifest.json index b74d4b774..cb510c3cf 100644 --- a/tests/example_specs/evolution_examples/3/output_manifest.json +++ b/tests/example_specs/evolution_examples/3/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/4/output_manifest.json b/tests/example_specs/evolution_examples/4/output_manifest.json index 54fc737a4..bf0c2a295 100644 --- a/tests/example_specs/evolution_examples/4/output_manifest.json +++ b/tests/example_specs/evolution_examples/4/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/5/output_manifest.json b/tests/example_specs/evolution_examples/5/output_manifest.json index d9fffcaff..0ed082ce8 100644 --- a/tests/example_specs/evolution_examples/5/output_manifest.json +++ b/tests/example_specs/evolution_examples/5/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/6/output_manifest.json b/tests/example_specs/evolution_examples/6/output_manifest.json index f03e1647e..9f8e814fa 100644 --- a/tests/example_specs/evolution_examples/6/output_manifest.json +++ b/tests/example_specs/evolution_examples/6/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/7/output_manifest.json b/tests/example_specs/evolution_examples/7/output_manifest.json index 1879dc0be..dec03420a 100644 --- a/tests/example_specs/evolution_examples/7/output_manifest.json +++ b/tests/example_specs/evolution_examples/7/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/8/output_manifest.json b/tests/example_specs/evolution_examples/8/output_manifest.json index 3f1de1fa7..657aeaadb 100644 --- a/tests/example_specs/evolution_examples/8/output_manifest.json +++ b/tests/example_specs/evolution_examples/8/output_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "example_component" diff --git a/tests/example_specs/evolution_examples/input_manifest.json b/tests/example_specs/evolution_examples/input_manifest.json index 2c12b2a1d..2d9910981 100644 --- a/tests/example_specs/evolution_examples/input_manifest.json +++ b/tests/example_specs/evolution_examples/input_manifest.json @@ -1,5 +1,6 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", "run_id": "12345", "component_id": "67890" diff --git a/tests/example_specs/manifests/valid_manifest.json b/tests/example_specs/manifests/valid_manifest.json index 8b9a5794d..9bc00c512 100644 --- a/tests/example_specs/manifests/valid_manifest.json +++ b/tests/example_specs/manifests/valid_manifest.json @@ -1,7 +1,8 @@ { "metadata": { + "pipeline_name": "test_pipeline", "base_path": "gs://bucket", - "run_id": "12345", + "run_id": "test_pipeline_12345", "component_id": "67890" }, "index": { From 1a8762edadd084705b3aaf9e89840306ec36e3ce Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Mon, 21 Aug 2023 18:05:22 +0200 Subject: [PATCH 4/6] reorder base path directory --- src/fondant/compiler.py | 7 +- src/fondant/executor.py | 4 +- src/fondant/manifest.py | 8 +- .../example_1/docker-compose.yml | 10 +-- .../example_2/docker-compose.yml | 6 +- .../evolution_examples/1/output_manifest.json | 88 +++++++++---------- .../evolution_examples/2/output_manifest.json | 72 +++++++-------- .../evolution_examples/3/output_manifest.json | 60 ++++++------- .../evolution_examples/4/output_manifest.json | 72 +++++++-------- .../evolution_examples/5/output_manifest.json | 54 ++++++------ .../evolution_examples/6/output_manifest.json | 38 ++++---- .../evolution_examples/7/output_manifest.json | 38 ++++---- .../evolution_examples/8/output_manifest.json | 6 +- .../evolution_examples/input_manifest.json | 64 +++++++------- tests/test_compiler.py | 10 ++- tests/test_manifest.py | 9 +- tests/test_manifest_evolution.py | 2 +- 17 files changed, 275 insertions(+), 273 deletions(-) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index a5859ab21..11d47bcbd 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -113,7 +113,6 @@ def _generate_spec( """Generate a docker-compose spec as a python dictionary, loops over the pipeline graph to create services and their dependencies. """ - datetime.datetime.now().strftime("%Y%m%d%H%M%S") path, volume = self._patch_path(base_path=self.pipeline.base_path) run_id = self.get_run_id() @@ -140,7 +139,8 @@ def _generate_spec( command.extend( [ "--output_manifest_path", - f"{path}/{component_name}/manifest.json", + f"{path}/{metadata.pipeline_name}/{metadata.run_id}/" + f"{component_name}/manifest.json", ], ) @@ -162,7 +162,8 @@ def _generate_spec( command.extend( [ "--input_manifest_path", - f"{path}/{dependency}/manifest.json", + f"{path}/{metadata.pipeline_name}/{metadata.run_id}/" + f"{dependency}/manifest.json", ], ) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index e265a83f8..de9d54895 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -211,9 +211,9 @@ def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]): if is_kubeflow_output: # Save to the expected base path directory - safe_component_name = self.spec.name.replace(" ", "_").lower() save_path_base_path = ( - f"{manifest.base_path}/{safe_component_name}/manifest.json" + f"{manifest.pipeline_name}/{manifest.run_id}/" + f"{manifest.component_id}/manifest.json" ) Path(save_path_base_path).parent.mkdir(parents=True, exist_ok=True) manifest.to_file(save_path_base_path) diff --git a/src/fondant/manifest.py b/src/fondant/manifest.py index 2256c0013..148cfc9cd 100644 --- a/src/fondant/manifest.py +++ b/src/fondant/manifest.py @@ -158,7 +158,7 @@ def create( specification = { "metadata": metadata.to_dict(), - "index": {"location": f"/index/{run_id}/{component_id}"}, + "index": {"location": f"/{pipeline_name}/{run_id}/{component_id}/index"}, "subsets": {}, } return cls(specification) @@ -226,7 +226,7 @@ def add_subset( raise ValueError(msg) self._specification["subsets"][name] = { - "location": f"/{name}/{self.run_id}/{self.component_id}", + "location": f"/{self.pipeline_name}/{self.run_id}/{self.component_id}/{name}", "fields": {name: type_.to_json() for name, type_ in fields}, } @@ -254,7 +254,7 @@ def evolve( # noqa : PLR0912 (too many branches) # Update index location as this is currently always rewritten evolved_manifest.index._specification[ "location" - ] = f"/index/{self.run_id}/{component_id}" + ] = f"/{self.pipeline_name}/{self.run_id}/{component_id}/index" # If additionalSubsets is False in consumes, # Remove all subsets from the manifest that are not listed @@ -305,7 +305,7 @@ def evolve( # noqa : PLR0912 (too many branches) # Update subset location as this is currently always rewritten evolved_manifest.subsets[subset_name]._specification[ "location" - ] = f"/{subset_name}/{self.run_id}/{component_id}" + ] = f"{self.pipeline_name}/{self.run_id}/{component_id}/{subset_name}" # Subset is not yet in manifest, add it else: diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index fc39ca80d..3146e9d67 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -9,7 +9,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "first_component"}' - --output_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -30,7 +30,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "second_component"}' - --output_manifest_path - - /foo/bar/second_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -42,7 +42,7 @@ services: "array", "items": {"type": "float32"}}}}}, "args": {"storage_args": {"description": "Storage arguments", "type": "str"}}}' - --input_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json depends_on: first_component: condition: service_completed_successfully @@ -56,7 +56,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "third_component"}' - --output_manifest_path - - /foo/bar/third_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/third_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -70,7 +70,7 @@ services: false}, "args": {"storage_args": {"description": "Storage arguments", "type": "str"}}}' - --input_manifest_path - - /foo/bar/second_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest.json depends_on: second_component: condition: service_completed_successfully diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index b84fa6d69..f3178ebe2 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -9,7 +9,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "first_component"}' - --output_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -27,7 +27,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "image_cropping"}' - --output_manifest_path - - /foo/bar/image_cropping/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/image_cropping/manifest.json - --cropping_threshold - '0' - --padding @@ -46,7 +46,7 @@ services: for the image cropping. The padding is added to all borders of the image.", "type": "int", "default": 10}}}' - --input_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json depends_on: first_component: condition: service_completed_successfully diff --git a/tests/example_specs/evolution_examples/1/output_manifest.json b/tests/example_specs/evolution_examples/1/output_manifest.json index 2694368d7..6981a9643 100644 --- a/tests/example_specs/evolution_examples/1/output_manifest.json +++ b/tests/example_specs/evolution_examples/1/output_manifest.json @@ -1,46 +1,46 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } + }, + "embeddings":{ + "location":"/test_pipeline/12345/example_component/embeddings", + "fields":{ + "data":{ + "type":"array", + "items":{ + "type":"float32" + } + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - }, - "embeddings": { - "location": "/embeddings/12345/example_component", - "fields": { - "data": { - "type": "array", - "items": { - "type": "float32" - } - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/2/output_manifest.json b/tests/example_specs/evolution_examples/2/output_manifest.json index a9d3851b2..fbd5f13f0 100644 --- a/tests/example_specs/evolution_examples/2/output_manifest.json +++ b/tests/example_specs/evolution_examples/2/output_manifest.json @@ -1,38 +1,38 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + } + } + }, + "embeddings":{ + "location":"/test_pipeline/12345/example_component/embeddings", + "fields":{ + "data":{ + "type":"array", + "items":{ + "type":"float32" + } + } + } } - }, - "embeddings": { - "location": "/embeddings/12345/example_component", - "fields": { - "data": { - "type": "array", - "items": { - "type": "float32" - } - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/3/output_manifest.json b/tests/example_specs/evolution_examples/3/output_manifest.json index cb510c3cf..f5ddfbfd5 100644 --- a/tests/example_specs/evolution_examples/3/output_manifest.json +++ b/tests/example_specs/evolution_examples/3/output_manifest.json @@ -1,32 +1,32 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "data":{ + "type":"binary" + } + } + }, + "embeddings":{ + "location":"/test_pipeline/12345/example_component/embeddings", + "fields":{ + "data":{ + "type":"array", + "items":{ + "type":"float32" + } + } + } } - }, - "embeddings": { - "location": "/embeddings/12345/example_component", - "fields": { - "data": { - "type": "array", - "items": { - "type": "float32" - } - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/4/output_manifest.json b/tests/example_specs/evolution_examples/4/output_manifest.json index bf0c2a295..4b824a0fb 100644 --- a/tests/example_specs/evolution_examples/4/output_manifest.json +++ b/tests/example_specs/evolution_examples/4/output_manifest.json @@ -1,38 +1,38 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - }, - "encoding": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + }, + "encoding":{ + "type":"string" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/5/output_manifest.json b/tests/example_specs/evolution_examples/5/output_manifest.json index 0ed082ce8..4f9ee5604 100644 --- a/tests/example_specs/evolution_examples/5/output_manifest.json +++ b/tests/example_specs/evolution_examples/5/output_manifest.json @@ -1,29 +1,29 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "encoding": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "encoding":{ + "type":"string" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/6/output_manifest.json b/tests/example_specs/evolution_examples/6/output_manifest.json index 9f8e814fa..57cb50734 100644 --- a/tests/example_specs/evolution_examples/6/output_manifest.json +++ b/tests/example_specs/evolution_examples/6/output_manifest.json @@ -1,21 +1,21 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "encoding": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "encoding":{ + "type":"string" + } + } } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/7/output_manifest.json b/tests/example_specs/evolution_examples/7/output_manifest.json index dec03420a..2ec76c1ae 100644 --- a/tests/example_specs/evolution_examples/7/output_manifest.json +++ b/tests/example_specs/evolution_examples/7/output_manifest.json @@ -1,21 +1,21 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "data": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "data":{ + "type":"string" + } + } } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/8/output_manifest.json b/tests/example_specs/evolution_examples/8/output_manifest.json index 657aeaadb..33e7bf1c0 100644 --- a/tests/example_specs/evolution_examples/8/output_manifest.json +++ b/tests/example_specs/evolution_examples/8/output_manifest.json @@ -6,11 +6,11 @@ "component_id": "example_component" }, "index": { - "location": "/index/12345/example_component" + "location": "/test_pipeline/12345/example_component/index" }, "subsets": { "images": { - "location": "/images/12345/example_component", + "location": "/test_pipeline/12345/example_component/images", "fields": { "width": { "type": "int32" @@ -24,7 +24,7 @@ } }, "captions": { - "location": "/captions/12345/example_component", + "location": "/test_pipeline/12345/example_component/captions", "fields": { "data": { "type": "binary" diff --git a/tests/example_specs/evolution_examples/input_manifest.json b/tests/example_specs/evolution_examples/input_manifest.json index 2d9910981..2ecf37243 100644 --- a/tests/example_specs/evolution_examples/input_manifest.json +++ b/tests/example_specs/evolution_examples/input_manifest.json @@ -1,35 +1,35 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "67890" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - } - } + } } \ No newline at end of file diff --git a/tests/test_compiler.py b/tests/test_compiler.py index fcef44247..d0aed1ee6 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -114,6 +114,7 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): with open(fn / "docker-compose.yml") as f_spec: spec = yaml.safe_load(f_spec) + expected_run_id = "test_pipeline-20230101000000" for name, service in spec["services"].items(): # check if volumes are defined correctly assert service["volumes"] == [ @@ -125,9 +126,9 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): ] # check if commands are patched to use the working dir commands_with_dir = [ - f"{work_dir}/{name}/manifest.json", + f"{work_dir}/{pipeline.name}/{expected_run_id}/{name}/manifest.json", f'{{"base_path": "{work_dir}", "pipeline_name": "{pipeline.name}",' - f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', + f' "run_id": "{expected_run_id}", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] @@ -147,14 +148,15 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): with open(fn / "docker-compose.yml") as f_spec: spec = yaml.safe_load(f_spec) + expected_run_id = "test_pipeline-20230101000000" for name, service in spec["services"].items(): # check that no volumes are created assert service["volumes"] == [] # check if commands are patched to use the remote dir commands_with_dir = [ - f"{remote_dir}/{name}/manifest.json", + f"{remote_dir}/{pipeline.name}/{expected_run_id}/{name}/manifest.json", f'{{"base_path": "{remote_dir}", "pipeline_name": "{pipeline.name}",' - f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', + f' "run_id": "{expected_run_id}", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 922d70cfa..9abb979cf 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -136,10 +136,10 @@ def test_manifest_creation(): "run_id": run_id, "component_id": component_id, }, - "index": {"location": f"/index/{run_id}/{component_id}"}, + "index": {"location": f"/{pipeline_name}/{run_id}/{component_id}/index"}, "subsets": { "images": { - "location": f"/images/{run_id}/{component_id}", + "location": f"/{pipeline_name}/{run_id}/{component_id}/images", "fields": { "width": { "type": "int32", @@ -165,9 +165,8 @@ def test_manifest_repr(): ) assert ( manifest.__repr__() - == "Manifest({'metadata': {'base_path': '/', 'pipeline_name': 'NAME'," - " 'run_id': 'A', 'component_id': '1'}, 'index': {'location': '/index/A/1'}," - " 'subsets': {}})" + == "Manifest({'metadata': {'base_path': '/', 'pipeline_name': 'NAME', 'run_id': 'A'," + " 'component_id': '1'}, 'index': {'location': '/NAME/A/1/index'}, 'subsets': {}})" ) diff --git a/tests/test_manifest_evolution.py b/tests/test_manifest_evolution.py index 29afabe87..f07e5d498 100644 --- a/tests/test_manifest_evolution.py +++ b/tests/test_manifest_evolution.py @@ -46,5 +46,5 @@ def test_component_spec_location_update(): assert ( evolved_manifest._specification["subsets"]["images"]["location"] - == "/images/12345/example_component" + == "test_pipeline/12345/example_component/images" ) From 986e3a8183bb3492fd63b97a4e67e26e194656e7 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Tue, 22 Aug 2023 14:44:52 +0200 Subject: [PATCH 5/6] revert back changes to compiler --- src/fondant/cli.py | 14 +++++----- src/fondant/compiler.py | 57 +++++++++++++++++++---------------------- src/fondant/pipeline.py | 6 +++++ tests/test_compiler.py | 30 ++++++++++------------ 4 files changed, 54 insertions(+), 53 deletions(-) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index bc2e407e7..70c203b69 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -233,15 +233,16 @@ def register_compile(parent_parser): def compile(args): if args.local: - compiler = DockerCompiler(args.pipeline) + compiler = DockerCompiler() compiler.compile( + pipeline=args.pipeline, extra_volumes=args.extra_volumes, output_path=args.output_path, build_args=args.build_arg, ) elif args.kubeflow: - compiler = KubeFlowCompiler(args.pipeline) - compiler.compile(output_path=args.output_path) + compiler = KubeFlowCompiler() + compiler.compile(pipeline=args.pipeline, output_path=args.output_path) def register_run(parent_parser): @@ -305,8 +306,9 @@ def run(args): logging.info( "Found reference to un-compiled pipeline... compiling to {spec_ref}", ) - compiler = DockerCompiler(pipeline) + compiler = DockerCompiler() compiler.compile( + pipeline=pipeline, extra_volumes=args.extra_volumes, output_path=spec_ref, build_args=args.build_arg, @@ -326,8 +328,8 @@ def run(args): logging.info( "Found reference to un-compiled pipeline... compiling to {spec_ref}", ) - compiler = KubeFlowCompiler(pipeline) - compiler.compile(output_path=spec_ref) + compiler = KubeFlowCompiler() + compiler.compile(pipeline=pipeline, output_path=spec_ref) finally: runner = KubeflowRunner(host=args.host) runner.run(input_spec=spec_ref) diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index a5859ab21..5eaed0866 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -1,8 +1,7 @@ -import datetime import json import logging import typing as t -from abc import ABC +from abc import ABC, abstractmethod from dataclasses import asdict, dataclass from pathlib import Path @@ -15,17 +14,11 @@ class Compiler(ABC): - """Abstract base class for a pipeline compiler.""" + """Abstract base class for a compiler.""" - def __init__( - self, - pipeline: Pipeline, - ): - self.pipeline = pipeline - - def get_run_id(self) -> str: - timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - return f"{self.pipeline.name}-{timestamp}" + @abstractmethod + def compile(self, *args, **kwargs) -> None: + """Abstract method to invoke compilation.""" @dataclass @@ -49,6 +42,8 @@ class DockerCompiler(Compiler): def compile( self, + pipeline: Pipeline, + *, output_path: str = "docker-compose.yml", extra_volumes: t.Optional[list] = None, build_args: t.Optional[t.List[str]] = None, @@ -56,6 +51,7 @@ def compile( """Compile a pipeline to docker-compose spec and save it to a specified output path. Args: + pipeline: the pipeline to compile output_path: the path where to save the docker-compose spec extra_volumes: a list of extra volumes (using the Short syntax: https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5) @@ -65,8 +61,9 @@ def compile( if extra_volumes is None: extra_volumes = [] - logger.info(f"Compiling {self.pipeline.name} to {output_path}") + logger.info(f"Compiling {pipeline.name} to {output_path}") spec = self._generate_spec( + pipeline, extra_volumes=extra_volumes, build_args=build_args or [], ) @@ -107,23 +104,24 @@ def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]: def _generate_spec( self, + pipeline: Pipeline, + *, extra_volumes: t.List[str], build_args: t.List[str], ) -> dict: """Generate a docker-compose spec as a python dictionary, loops over the pipeline graph to create services and their dependencies. """ - datetime.datetime.now().strftime("%Y%m%d%H%M%S") - path, volume = self._patch_path(base_path=self.pipeline.base_path) - run_id = self.get_run_id() + path, volume = self._patch_path(base_path=pipeline.base_path) + run_id = pipeline.get_run_id() services = {} - self.pipeline.validate(run_id=run_id) + pipeline.validate(run_id=run_id) - for component_name, component in self.pipeline._graph.items(): + for component_name, component in pipeline._graph.items(): metadata = Metadata( - pipeline_name=self.pipeline.name, + pipeline_name=pipeline.name, run_id=run_id, base_path=path, component_id=component_name, @@ -189,7 +187,7 @@ def _generate_spec( else: services[component_name]["image"] = component_op.component_spec.image return { - "name": self.pipeline.name, + "name": pipeline.name, "version": "3.8", "services": services, } @@ -198,8 +196,7 @@ def _generate_spec( class KubeFlowCompiler(Compiler): """Compiler that creates a Kubeflow pipeline spec from a pipeline.""" - def __init__(self, pipeline: Pipeline): - super().__init__(pipeline) + def __init__(self): self._resolve_imports() def _resolve_imports(self): @@ -217,28 +214,27 @@ def _resolve_imports(self): def compile( self, + pipeline: Pipeline, output_path: str = "kubeflow_pipeline.yml", ) -> None: """Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path. Args: + pipeline: the pipeline to compile output_path: the path where to save the Kubeflow pipeline spec """ - run_id = self.get_run_id() + run_id = pipeline.get_run_id() - @self.kfp.dsl.pipeline( - name=self.pipeline.name, - description=self.pipeline.description, - ) + @self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) def kfp_pipeline(): previous_component_task = None manifest_path = "" - for component_name, component in self.pipeline._graph.items(): + for component_name, component in pipeline._graph.items(): metadata = Metadata( - pipeline_name=self.pipeline.name, + pipeline_name=pipeline.name, run_id=run_id, - base_path=self.pipeline.base_path, + base_path=pipeline.base_path, component_id=component_name, ) @@ -274,6 +270,7 @@ def kfp_pipeline(): previous_component_task = component_task + self.pipeline = pipeline self.pipeline.validate(run_id=run_id) logger.info(f"Compiling {self.pipeline.name} to {output_path}") diff --git a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 94ca8fbc0..a82bb09ea 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -1,4 +1,5 @@ """This module defines classes to represent a Fondant Pipeline.""" +import datetime import hashlib import json import logging @@ -278,6 +279,11 @@ def _validate_pipeline_name(pipeline_name: str) -> str: raise InvalidPipelineDefinition(msg) return pipeline_name + def get_run_id(self) -> str: + """Get a unique run ID for the pipeline.""" + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + return f"{self.name}-{timestamp}" + def validate(self, run_id: str): """Sort and run validation on the pipeline definition. diff --git a/tests/test_compiler.py b/tests/test_compiler.py index fcef44247..af15e58f8 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -88,10 +88,10 @@ def setup_pipeline(request, tmp_path, monkeypatch): def test_docker_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to docker-compose.""" example_dir, pipeline = setup_pipeline - compiler = DockerCompiler(pipeline) + compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") - compiler.compile(output_path=output_path, build_args=[]) + compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) with open(output_path) as src, open( VALID_PIPELINE / example_dir / "docker-compose.yml", ) as truth: @@ -107,8 +107,8 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): _, pipeline = setup_pipeline work_dir = f"/{fn.stem}" pipeline.base_path = str(fn) - compiler = DockerCompiler(pipeline) - compiler.compile(output_path=fn / "docker-compose.yml") + compiler = DockerCompiler() + compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml") # read the generated docker-compose file with open(fn / "docker-compose.yml") as f_spec: @@ -139,9 +139,9 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): _, pipeline = setup_pipeline remote_dir = "gs://somebucket/artifacts" pipeline.base_path = remote_dir - compiler = DockerCompiler(pipeline) + compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: - compiler.compile(output_path=fn / "docker-compose.yml") + compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml") # read the generated docker-compose file with open(fn / "docker-compose.yml") as f_spec: @@ -167,10 +167,11 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): # this is the directory mounted in the container _, pipeline = setup_pipeline pipeline.base_path = str(fn) - compiler = DockerCompiler(pipeline) + compiler = DockerCompiler() # define some extra volumes to be mounted extra_volumes = ["hello:there", "general:kenobi"] compiler.compile( + pipeline=pipeline, output_path=fn / "docker-compose.yml", extra_volumes=extra_volumes, ) @@ -188,10 +189,10 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): def test_kubeflow_compiler(setup_pipeline, tmp_path_factory): """Test compiling a pipeline to kubeflow.""" example_dir, pipeline = setup_pipeline - compiler = KubeFlowCompiler(pipeline) + compiler = KubeFlowCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(output_path=output_path) + compiler.compile(pipeline=pipeline, output_path=output_path) with open(output_path) as src, open( VALID_PIPELINE / example_dir / "kubeflow_pipeline.yml", ) as truth: @@ -214,10 +215,10 @@ def test_kubeflow_configuration(tmp_path_factory): number_of_gpus=1, ) pipeline.add_op(component_1) - compiler = KubeFlowCompiler(pipeline) + compiler = KubeFlowCompiler() with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "kubeflow_pipeline.yml") - compiler.compile(output_path=output_path) + compiler.compile(pipeline=pipeline, output_path=output_path) with open(output_path) as src, open( VALID_PIPELINE / "kubeflow_pipeline.yml", ) as truth: @@ -226,13 +227,8 @@ def test_kubeflow_configuration(tmp_path_factory): def test_kfp_import(): """Test that the kfp import throws the correct error.""" - pipeline = Pipeline( - pipeline_name="test_pipeline", - pipeline_description="description of the test pipeline", - base_path="/foo/bar", - ) with mock.patch.dict(sys.modules): # remove kfp from the modules sys.modules["kfp"] = None with pytest.raises(ImportError): - _ = KubeFlowCompiler(pipeline) + _ = KubeFlowCompiler() From f75838a005414daec97881443a46a2c55e7e6255 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Tue, 22 Aug 2023 15:13:14 +0200 Subject: [PATCH 6/6] fix manifest save path --- src/fondant/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index e89b7d134..1725cb9ba 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -213,7 +213,7 @@ def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]): if is_kubeflow_output: # Save to the expected base path directory save_path_base_path = ( - f"{manifest.pipeline_name}/{manifest.run_id}/" + f"{manifest.base_path}/{manifest.pipeline_name}/{manifest.run_id}/" f"{manifest.component_id}/manifest.json" ) Path(save_path_base_path).parent.mkdir(parents=True, exist_ok=True)