diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 5eaed0866..50b2409b2 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -138,7 +138,8 @@ def _generate_spec( command.extend( [ "--output_manifest_path", - f"{path}/{component_name}/manifest.json", + f"{path}/{metadata.pipeline_name}/{metadata.run_id}/" + f"{component_name}/manifest.json", ], ) @@ -160,7 +161,8 @@ def _generate_spec( command.extend( [ "--input_manifest_path", - f"{path}/{dependency}/manifest.json", + f"{path}/{metadata.pipeline_name}/{metadata.run_id}/" + f"{dependency}/manifest.json", ], ) diff --git a/src/fondant/executor.py b/src/fondant/executor.py index 9b6bc4f34..1725cb9ba 100644 --- a/src/fondant/executor.py +++ b/src/fondant/executor.py @@ -212,9 +212,9 @@ def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]): if is_kubeflow_output: # Save to the expected base path directory - safe_component_name = self.spec.name.replace(" ", "_").lower() save_path_base_path = ( - f"{manifest.base_path}/{safe_component_name}/manifest.json" + f"{manifest.base_path}/{manifest.pipeline_name}/{manifest.run_id}/" + f"{manifest.component_id}/manifest.json" ) Path(save_path_base_path).parent.mkdir(parents=True, exist_ok=True) manifest.to_file(save_path_base_path) diff --git a/src/fondant/manifest.py b/src/fondant/manifest.py index 448c21865..c0ba456dd 100644 --- a/src/fondant/manifest.py +++ b/src/fondant/manifest.py @@ -158,7 +158,7 @@ def create( specification = { "metadata": metadata.to_dict(), - "index": {"location": f"/index/{run_id}/{component_id}"}, + "index": {"location": f"/{pipeline_name}/{run_id}/{component_id}/index"}, "subsets": {}, } return cls(specification) @@ -226,7 +226,7 @@ def add_subset( raise ValueError(msg) self._specification["subsets"][name] = { - "location": f"/{name}/{self.run_id}/{self.component_id}", + "location": f"/{self.pipeline_name}/{self.run_id}/{self.component_id}/{name}", "fields": {name: type_.to_json() for name, type_ in fields}, } @@ -254,7 +254,7 @@ def evolve( # noqa : PLR0912 (too many branches) # Update index location as this is currently always rewritten evolved_manifest.index._specification[ "location" - ] = f"/index/{self.run_id}/{component_id}" + ] = f"/{self.pipeline_name}/{self.run_id}/{component_id}/index" # If additionalSubsets is False in consumes, # Remove all subsets from the manifest that are not listed @@ -305,7 +305,7 @@ def evolve( # noqa : PLR0912 (too many branches) # Update subset location as this is currently always rewritten evolved_manifest.subsets[subset_name]._specification[ "location" - ] = f"/{subset_name}/{self.run_id}/{component_id}" + ] = f"{self.pipeline_name}/{self.run_id}/{component_id}/{subset_name}" # Subset is not yet in manifest, add it else: diff --git a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml index fc39ca80d..3146e9d67 100644 --- a/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml @@ -9,7 +9,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "first_component"}' - --output_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -30,7 +30,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "second_component"}' - --output_manifest_path - - /foo/bar/second_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -42,7 +42,7 @@ services: "array", "items": {"type": "float32"}}}}}, "args": {"storage_args": {"description": "Storage arguments", "type": "str"}}}' - --input_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json depends_on: first_component: condition: service_completed_successfully @@ -56,7 +56,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "third_component"}' - --output_manifest_path - - /foo/bar/third_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/third_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -70,7 +70,7 @@ services: false}, "args": {"storage_args": {"description": "Storage arguments", "type": "str"}}}' - --input_manifest_path - - /foo/bar/second_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest.json depends_on: second_component: condition: service_completed_successfully diff --git a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml index b84fa6d69..f3178ebe2 100644 --- a/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml +++ b/tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml @@ -9,7 +9,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "first_component"}' - --output_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json - --storage_args - a dummy string arg - --input_partition_rows @@ -27,7 +27,7 @@ services: - '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000", "component_id": "image_cropping"}' - --output_manifest_path - - /foo/bar/image_cropping/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/image_cropping/manifest.json - --cropping_threshold - '0' - --padding @@ -46,7 +46,7 @@ services: for the image cropping. The padding is added to all borders of the image.", "type": "int", "default": 10}}}' - --input_manifest_path - - /foo/bar/first_component/manifest.json + - /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json depends_on: first_component: condition: service_completed_successfully diff --git a/tests/example_specs/evolution_examples/1/output_manifest.json b/tests/example_specs/evolution_examples/1/output_manifest.json index 2694368d7..6981a9643 100644 --- a/tests/example_specs/evolution_examples/1/output_manifest.json +++ b/tests/example_specs/evolution_examples/1/output_manifest.json @@ -1,46 +1,46 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } + }, + "embeddings":{ + "location":"/test_pipeline/12345/example_component/embeddings", + "fields":{ + "data":{ + "type":"array", + "items":{ + "type":"float32" + } + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - }, - "embeddings": { - "location": "/embeddings/12345/example_component", - "fields": { - "data": { - "type": "array", - "items": { - "type": "float32" - } - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/2/output_manifest.json b/tests/example_specs/evolution_examples/2/output_manifest.json index a9d3851b2..fbd5f13f0 100644 --- a/tests/example_specs/evolution_examples/2/output_manifest.json +++ b/tests/example_specs/evolution_examples/2/output_manifest.json @@ -1,38 +1,38 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + } + } + }, + "embeddings":{ + "location":"/test_pipeline/12345/example_component/embeddings", + "fields":{ + "data":{ + "type":"array", + "items":{ + "type":"float32" + } + } + } } - }, - "embeddings": { - "location": "/embeddings/12345/example_component", - "fields": { - "data": { - "type": "array", - "items": { - "type": "float32" - } - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/3/output_manifest.json b/tests/example_specs/evolution_examples/3/output_manifest.json index cb510c3cf..f5ddfbfd5 100644 --- a/tests/example_specs/evolution_examples/3/output_manifest.json +++ b/tests/example_specs/evolution_examples/3/output_manifest.json @@ -1,32 +1,32 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "data":{ + "type":"binary" + } + } + }, + "embeddings":{ + "location":"/test_pipeline/12345/example_component/embeddings", + "fields":{ + "data":{ + "type":"array", + "items":{ + "type":"float32" + } + } + } } - }, - "embeddings": { - "location": "/embeddings/12345/example_component", - "fields": { - "data": { - "type": "array", - "items": { - "type": "float32" - } - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/4/output_manifest.json b/tests/example_specs/evolution_examples/4/output_manifest.json index bf0c2a295..4b824a0fb 100644 --- a/tests/example_specs/evolution_examples/4/output_manifest.json +++ b/tests/example_specs/evolution_examples/4/output_manifest.json @@ -1,38 +1,38 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - }, - "encoding": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + }, + "encoding":{ + "type":"string" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/5/output_manifest.json b/tests/example_specs/evolution_examples/5/output_manifest.json index 0ed082ce8..4f9ee5604 100644 --- a/tests/example_specs/evolution_examples/5/output_manifest.json +++ b/tests/example_specs/evolution_examples/5/output_manifest.json @@ -1,29 +1,29 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "encoding": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "encoding":{ + "type":"string" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/6/output_manifest.json b/tests/example_specs/evolution_examples/6/output_manifest.json index 9f8e814fa..57cb50734 100644 --- a/tests/example_specs/evolution_examples/6/output_manifest.json +++ b/tests/example_specs/evolution_examples/6/output_manifest.json @@ -1,21 +1,21 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "encoding": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "encoding":{ + "type":"string" + } + } } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/7/output_manifest.json b/tests/example_specs/evolution_examples/7/output_manifest.json index dec03420a..2ec76c1ae 100644 --- a/tests/example_specs/evolution_examples/7/output_manifest.json +++ b/tests/example_specs/evolution_examples/7/output_manifest.json @@ -1,21 +1,21 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "example_component" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "data": { - "type": "string" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"test_pipeline/12345/example_component/images", + "fields":{ + "data":{ + "type":"string" + } + } } - } - } -} + } +} \ No newline at end of file diff --git a/tests/example_specs/evolution_examples/8/output_manifest.json b/tests/example_specs/evolution_examples/8/output_manifest.json index 657aeaadb..33e7bf1c0 100644 --- a/tests/example_specs/evolution_examples/8/output_manifest.json +++ b/tests/example_specs/evolution_examples/8/output_manifest.json @@ -6,11 +6,11 @@ "component_id": "example_component" }, "index": { - "location": "/index/12345/example_component" + "location": "/test_pipeline/12345/example_component/index" }, "subsets": { "images": { - "location": "/images/12345/example_component", + "location": "/test_pipeline/12345/example_component/images", "fields": { "width": { "type": "int32" @@ -24,7 +24,7 @@ } }, "captions": { - "location": "/captions/12345/example_component", + "location": "/test_pipeline/12345/example_component/captions", "fields": { "data": { "type": "binary" diff --git a/tests/example_specs/evolution_examples/input_manifest.json b/tests/example_specs/evolution_examples/input_manifest.json index 2d9910981..2ecf37243 100644 --- a/tests/example_specs/evolution_examples/input_manifest.json +++ b/tests/example_specs/evolution_examples/input_manifest.json @@ -1,35 +1,35 @@ { - "metadata": { - "pipeline_name": "test_pipeline", - "base_path": "gs://bucket", - "run_id": "12345", - "component_id": "67890" - }, - "index": { - "location": "/index/12345/example_component" - }, - "subsets": { - "images": { - "location": "/images/12345/example_component", - "fields": { - "width": { - "type": "int32" - }, - "height": { - "type": "int32" - }, - "data": { - "type": "binary" - } + "metadata":{ + "pipeline_name":"test_pipeline", + "base_path":"gs://bucket", + "run_id":"12345", + "component_id":"example_component" + }, + "index":{ + "location":"/test_pipeline/12345/example_component/index" + }, + "subsets":{ + "images":{ + "location":"/test_pipeline/12345/example_component/images", + "fields":{ + "width":{ + "type":"int32" + }, + "height":{ + "type":"int32" + }, + "data":{ + "type":"binary" + } + } + }, + "captions":{ + "location":"/test_pipeline/12345/example_component/captions", + "fields":{ + "data":{ + "type":"binary" + } + } } - }, - "captions": { - "location": "/captions/12345/example_component", - "fields": { - "data": { - "type": "binary" - } - } - } - } + } } \ No newline at end of file diff --git a/tests/test_compiler.py b/tests/test_compiler.py index af15e58f8..a4ab23fd0 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -114,6 +114,7 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): with open(fn / "docker-compose.yml") as f_spec: spec = yaml.safe_load(f_spec) + expected_run_id = "test_pipeline-20230101000000" for name, service in spec["services"].items(): # check if volumes are defined correctly assert service["volumes"] == [ @@ -125,9 +126,9 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): ] # check if commands are patched to use the working dir commands_with_dir = [ - f"{work_dir}/{name}/manifest.json", + f"{work_dir}/{pipeline.name}/{expected_run_id}/{name}/manifest.json", f'{{"base_path": "{work_dir}", "pipeline_name": "{pipeline.name}",' - f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', + f' "run_id": "{expected_run_id}", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] @@ -147,14 +148,15 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): with open(fn / "docker-compose.yml") as f_spec: spec = yaml.safe_load(f_spec) + expected_run_id = "test_pipeline-20230101000000" for name, service in spec["services"].items(): # check that no volumes are created assert service["volumes"] == [] # check if commands are patched to use the remote dir commands_with_dir = [ - f"{remote_dir}/{name}/manifest.json", + f"{remote_dir}/{pipeline.name}/{expected_run_id}/{name}/manifest.json", f'{{"base_path": "{remote_dir}", "pipeline_name": "{pipeline.name}",' - f' "run_id": "test_pipeline-20230101000000", "component_id": "{name}"}}', + f' "run_id": "{expected_run_id}", "component_id": "{name}"}}', ] for command in commands_with_dir: assert command in service["command"] diff --git a/tests/test_manifest.py b/tests/test_manifest.py index 922d70cfa..9abb979cf 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -136,10 +136,10 @@ def test_manifest_creation(): "run_id": run_id, "component_id": component_id, }, - "index": {"location": f"/index/{run_id}/{component_id}"}, + "index": {"location": f"/{pipeline_name}/{run_id}/{component_id}/index"}, "subsets": { "images": { - "location": f"/images/{run_id}/{component_id}", + "location": f"/{pipeline_name}/{run_id}/{component_id}/images", "fields": { "width": { "type": "int32", @@ -165,9 +165,8 @@ def test_manifest_repr(): ) assert ( manifest.__repr__() - == "Manifest({'metadata': {'base_path': '/', 'pipeline_name': 'NAME'," - " 'run_id': 'A', 'component_id': '1'}, 'index': {'location': '/index/A/1'}," - " 'subsets': {}})" + == "Manifest({'metadata': {'base_path': '/', 'pipeline_name': 'NAME', 'run_id': 'A'," + " 'component_id': '1'}, 'index': {'location': '/NAME/A/1/index'}, 'subsets': {}})" ) diff --git a/tests/test_manifest_evolution.py b/tests/test_manifest_evolution.py index 29afabe87..f07e5d498 100644 --- a/tests/test_manifest_evolution.py +++ b/tests/test_manifest_evolution.py @@ -46,5 +46,5 @@ def test_component_spec_location_update(): assert ( evolved_manifest._specification["subsets"]["images"]["location"] - == "/images/12345/example_component" + == "test_pipeline/12345/example_component/images" )