From 72f995889d1c830d804f436d1eb0e4b5f33269bc Mon Sep 17 00:00:00 2001
From: Philippe Moussalli <philippe.moussalli95@gmail.com>
Date: Thu, 27 Jul 2023 13:36:07 +0200
Subject: [PATCH] Unify manifest save path (#322)

Related to #313


The local and remote runner store the manifest in different locations:

* For the local runner, the manifest save path is set at compile time to
a fixed path that we specify
`{base_path}/{component_name}/manifest.json"`

* For the remote runner (kfp), the `output_manifest_path` is set as an
output artifact type. This is needed for chaining component together.
The save path in this case is save internally within the VM
`/tmp/outputs/output_manifest_path/data` and then mapped to minio
storage after the component run (which then gets mapped to a cloud
storage). The mapping saves the artifact to the specified base path
followed by a fixed file structure that cannot be changed and also
stored. It is also stored as zip file which contains the written text
file (manifest)

Example:
```
minio://soy-audio-379412_kfp-artifacts/artifacts/datacomp-filtering-pipeline-wvglp/2023/07/26/datacomp-filtering-pipeline-wvglp-3788902962/load-from-hub-output_manifest_path.tgz
```

where `/soy-audio-379412_kfp-artifacts` is the artifact bucket specified
when deploying KFP.

This PR unifies the manifest save path for both local and remote runner.
It checks whether the `save_path` matches the expected kubeflow path and
if so, save it both to the expected kfp artifact path (needed for
chaining component) and the custom path that we require for caching.

It's not the most optimal solution since we're writing the file twice
but I don't see any other clear cut solution.

@ChristiaensBert I think this might also fix some issues with the data
explorer.
---
 src/fondant/executor.py | 33 +++++++++++++++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/src/fondant/executor.py b/src/fondant/executor.py
index a2afd78c9..778fd7011 100644
--- a/src/fondant/executor.py
+++ b/src/fondant/executor.py
@@ -205,8 +205,37 @@ def execute(self, component_cls: t.Type[Component]) -> None:
         self.upload_manifest(output_manifest, save_path=self.output_manifest_path)
 
     def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]):
-        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
-        manifest.to_file(save_path)
+        """
+        Uploads the manifest to the specified destination.
+
+        If the save_path points to the kubeflow output artifact temporary path,
+        it will be saved both in a specific base path and the native kfp artifact path.
+
+        Args:
+            manifest: The Manifest object to be uploaded.
+            save_path: The path where the Manifest object will be saved.
+
+        """
+        is_kubeflow_output = (
+            str(save_path) == "/tmp/outputs/output_manifest_path/data"  # nosec
+        )
+
+        if is_kubeflow_output:
+            # Save to the expected base path directory
+            safe_component_name = self.spec.name.replace(" ", "_").lower()
+            base_path = self.metadata["base_path"]
+            save_path_base_path = f"{base_path}/{safe_component_name}/manifest.json"
+            Path(save_path_base_path).parent.mkdir(parents=True, exist_ok=True)
+            manifest.to_file(save_path_base_path)
+            logger.info(f"Saving output manifest to {save_path_base_path}")
+            # Write manifest to the native kfp artifact path that will be passed as an artifact
+            # and read by the next component
+            manifest.to_file(save_path)
+        else:
+            # Local runner
+            Path(save_path).parent.mkdir(parents=True, exist_ok=True)
+            manifest.to_file(save_path)
+            logger.info(f"Saving output manifest to {save_path}")
 
 
 class DaskLoadExecutor(Executor[DaskLoadComponent]):