Feature/no artifacts #444

Merged (2 commits, Sep 21, 2023)
87 changes: 48 additions & 39 deletions src/fondant/compiler.py
@@ -277,21 +277,10 @@ def compile(
@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = ""

for component_name, component in pipeline._graph.items():
component_op = component["fondant_component_op"]

metadata = Metadata(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=pipeline.base_path,
component_id=component_name,
cache_key=component_op.get_component_cache_key(),
)

logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component_from_text(
text=component_op.component_spec.kubeflow_specification.to_string(),
@@ -302,33 +291,47 @@ def kfp_pipeline():
k: v for k, v in component_op.arguments.items() if v is not None
}

# # Set image pull policy to always
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
metadata = Metadata(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=pipeline.base_path,
component_id=component_name,
cache_key=component_op.get_component_cache_key(),
)

if previous_component_task is not None:
component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)
output_manifest_path = (
f"{pipeline.base_path}/{pipeline.name}/"
f"{run_id}/{component_name}/manifest.json"
)
# Set the execution order of the component task to be after the previous
# component task.
if component["dependencies"]:
for dependency in component["dependencies"]:
input_manifest_path = (
f"{pipeline.base_path}/{pipeline.name}/"
f"{run_id}/{dependency}/manifest.json"
)
component_task = kubeflow_component_op(
input_manifest_path=input_manifest_path,
output_manifest_path=output_manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)

else:
component_task = kubeflow_component_op(
metadata=metadata.to_json(),
output_manifest_path=output_manifest_path,
**component_args,
)

# Set optional configurations
# Set optional arguments
Review comment (Contributor): I think this should still be configurations

component_task = self._set_configuration(
component_task,
component_op,
)

# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]

previous_component_task = component_task

logger.info(f"Compiling {pipeline.name} to {output_path}")
@@ -404,7 +407,6 @@ def compile(
@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = None
for component_name, component in pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

@@ -426,21 +428,31 @@ def kfp_pipeline():
component_id=component_name,
cache_key=component_op.get_component_cache_key(),
)

output_manifest_path = (
f"{pipeline.base_path}/{pipeline.name}/"
f"{run_id}/{component_name}/manifest.json"
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)
if component["dependencies"]:
for dependency in component["dependencies"]:
input_manifest_path = (
f"{pipeline.base_path}/{pipeline.name}/"
f"{run_id}/{dependency}/manifest.json"
)
component_task = kubeflow_component_op(
input_manifest_path=input_manifest_path,
output_manifest_path=output_manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)

else:
component_task = kubeflow_component_op(
metadata=metadata.to_json(),
output_manifest_path=output_manifest_path,
**component_args,
)

@@ -450,9 +462,6 @@
component_op,
)

# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]

previous_component_task = component_task

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
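
For reference, a minimal standalone sketch (not part of the diff) of the wiring this PR introduces: input and output manifest locations are derived deterministically from the pipeline base path, run id, and component name, and dependencies are resolved from the pipeline graph, rather than passing the previous task's output_manifest_path as a Kubeflow artifact. All names and values below are hypothetical and only illustrate the path scheme.

def manifest_path(base_path: str, pipeline_name: str, run_id: str, component_id: str) -> str:
    # Each component reads and writes its manifest at a predictable location,
    # so no artifact outputs are needed to pass paths between tasks.
    return f"{base_path}/{pipeline_name}/{run_id}/{component_id}/manifest.json"


# Hypothetical two-component graph: "embed_text" depends on "load_data".
graph = {
    "load_data": {"dependencies": []},
    "embed_text": {"dependencies": ["load_data"]},
}

base_path = "gs://my-bucket"          # illustrative values, not from the PR
pipeline_name = "demo_pipeline"
run_id = "demo_pipeline-20230921"

for component_id, component in graph.items():
    output_manifest = manifest_path(base_path, pipeline_name, run_id, component_id)
    if component["dependencies"]:
        for dependency in component["dependencies"]:
            input_manifest = manifest_path(base_path, pipeline_name, run_id, dependency)
            print(f"{component_id}: input={input_manifest} output={output_manifest}")
    else:
        # First component in the graph: nothing to read yet.
        print(f"{component_id}: input=<none> output={output_manifest}")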