Unify manifest save path (#322)
Related to #313 


The local and remote runners store the manifest in different locations:

* For the local runner, the manifest save path is set at compile time to
a fixed path that we specify:
`{base_path}/{component_name}/manifest.json`

* For the remote runner (kfp), the `output_manifest_path` is declared as
an output artifact. This is needed for chaining components together.
In this case the manifest is saved inside the VM at
`/tmp/outputs/output_manifest_path/data` and mapped to minio storage
after the component run (which in turn is mapped to cloud storage). The
mapping saves the artifact under the specified base path, followed by a
fixed file structure that cannot be changed. The artifact is stored as a
compressed archive (a `.tgz` in the example below) that contains the
written text file (the manifest).

Example:
```
minio://soy-audio-379412_kfp-artifacts/artifacts/datacomp-filtering-pipeline-wvglp/2023/07/26/datacomp-filtering-pipeline-wvglp-3788902962/load-from-hub-output_manifest_path.tgz
```

where `/soy-audio-379412_kfp-artifacts` is the artifact bucket specified
when deploying KFP.
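
To illustrate why the archived artifact is awkward to consume directly, here is a minimal sketch of reading a manifest back out of such a `.tgz`. It is not part of this PR. The member name `data` is an assumption that mirrors the last segment of `/tmp/outputs/output_manifest_path/data`, the helper names `pack_manifest` and `read_manifest` are hypothetical, and the archive is built in memory so the sketch runs without a KFP deployment.

```python
import io
import json
import tarfile

# Assumed member name inside the archive (mirrors .../output_manifest_path/data).
KFP_ARTIFACT_MEMBER = "data"


def pack_manifest(manifest: dict) -> bytes:
    """Build an in-memory .tgz that holds the manifest as a single text file."""
    payload = json.dumps(manifest).encode("utf-8")
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        info = tarfile.TarInfo(name=KFP_ARTIFACT_MEMBER)
        info.size = len(payload)
        archive.addfile(info, io.BytesIO(payload))
    return buffer.getvalue()


def read_manifest(archive_bytes: bytes) -> dict:
    """Extract the manifest text from the archive and parse it as JSON."""
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as archive:
        member = archive.extractfile(KFP_ARTIFACT_MEMBER)
        if member is None:
            raise ValueError(f"{KFP_ARTIFACT_MEMBER!r} is not a regular file")
        return json.loads(member.read().decode("utf-8"))
```

Any consumer outside the pipeline (for example a caching check) would need this extra unpacking step plus knowledge of the generated run path, which is the motivation for also writing a plain `manifest.json` to a predictable location.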

This PR unifies the manifest save path for both the local and remote
runners. It checks whether the `save_path` matches the expected kubeflow
path and, if so, saves the manifest both to the expected kfp artifact
path (needed for chaining components) and to the custom path that we
require for caching.

It's not an optimal solution since we're writing the file twice,
but I don't see any other clear-cut solution.
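
The decision logic can be sketched in isolation as follows. This is a simplified illustration rather than the code in `src/fondant/executor.py`: the helpers `manifest_targets` and `write_manifest` are hypothetical, the manifest is modelled as a plain dict written as JSON instead of a `Manifest` object, and, unlike the PR, the sketch creates parent directories for every target.

```python
import json
from pathlib import Path

# Path at which kfp expects the output artifact (taken from the description above).
KFP_OUTPUT_PATH = "/tmp/outputs/output_manifest_path/data"


def manifest_targets(save_path, base_path, component_name,
                     kfp_output_path=KFP_OUTPUT_PATH):
    """Return every location the manifest should be written to."""
    if str(save_path) != str(kfp_output_path):
        # Local runner: the compiled save path is already the final location.
        return [Path(save_path)]
    # Remote runner: write to the predictable base path (for caching)
    # and to the kfp artifact path (for chaining components).
    safe_name = component_name.replace(" ", "_").lower()
    return [Path(base_path) / safe_name / "manifest.json", Path(save_path)]


def write_manifest(manifest: dict, targets) -> None:
    """Write the same manifest to each target, creating parent directories."""
    for target in targets:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(json.dumps(manifest))
```

Separating the choice of targets from the write keeps the double write explicit and makes the path logic easy to test without touching the filesystem.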

@ChristiaensBert I think this might also fix some issues with the data
explorer.
PhilippeMoussalli authored Jul 27, 2023
1 parent c7e8da0 commit f6256c8
Showing 1 changed file with 31 additions and 2 deletions.
33 changes: 31 additions & 2 deletions src/fondant/executor.py
@@ -205,8 +205,37 @@ def execute(self, component_cls: t.Type[Component]) -> None:
         self.upload_manifest(output_manifest, save_path=self.output_manifest_path)
 
     def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]):
-        Path(save_path).parent.mkdir(parents=True, exist_ok=True)
-        manifest.to_file(save_path)
+        """
+        Uploads the manifest to the specified destination.
+        If the save_path points to the kubeflow output artifact temporary path,
+        it will be saved both in a specific base path and the native kfp artifact path.
+        Args:
+            manifest: The Manifest object to be uploaded.
+            save_path: The path where the Manifest object will be saved.
+        """
+        is_kubeflow_output = (
+            str(save_path) == "/tmp/outputs/output_manifest_path/data"  # nosec
+        )
+
+        if is_kubeflow_output:
+            # Save to the expected base path directory
+            safe_component_name = self.spec.name.replace(" ", "_").lower()
+            base_path = self.metadata["base_path"]
+            save_path_base_path = f"{base_path}/{safe_component_name}/manifest.json"
+            Path(save_path_base_path).parent.mkdir(parents=True, exist_ok=True)
+            manifest.to_file(save_path_base_path)
+            logger.info(f"Saving output manifest to {save_path_base_path}")
+            # Write manifest to the native kfp artifact path that will be passed as an artifact
+            # and read by the next component
+            manifest.to_file(save_path)
+        else:
+            # Local runner
+            Path(save_path).parent.mkdir(parents=True, exist_ok=True)
+            manifest.to_file(save_path)
+            logger.info(f"Saving output manifest to {save_path}")
 
 
 class DaskLoadExecutor(Executor[DaskLoadComponent]):