-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/vertex compiler #411
Changes from 8 commits
96b9883
1fbead5
c531a2e
08aea04
4bfbe60
9ff9cb5
8affd1a
4aaf2ed
a97727c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -243,6 +243,7 @@ def compile( | |
output_path: the path where to save the Kubeflow pipeline spec | ||
""" | ||
run_id = pipeline.get_run_id() | ||
pipeline.validate(run_id=run_id) | ||
|
||
@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) | ||
def kfp_pipeline(): | ||
|
@@ -263,28 +264,32 @@ def kfp_pipeline(): | |
logger.info(f"Compiling service for {component_name}") | ||
|
||
# convert ComponentOp to Kubeflow component | ||
kubeflow_component_op = self.kfp.components.load_component( | ||
kubeflow_component_op = self.kfp.components.load_component_from_text( | ||
text=component_op.component_spec.kubeflow_specification.to_string(), | ||
) | ||
|
||
# # Set image pull policy to always | ||
# Execute the Kubeflow component and pass in the output manifest path from | ||
# the previous component. | ||
component_args = component_op.arguments | ||
|
||
component_task = kubeflow_component_op( | ||
input_manifest_path=manifest_path, | ||
metadata=metadata.to_json(), | ||
**component_args, | ||
) | ||
# Set optional configurations | ||
component_task = self._set_configuration( | ||
component_task, | ||
component_op, | ||
) | ||
|
||
# Set image pull policy to always | ||
component_task.container.set_image_pull_policy("Always") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be nice to keep the image pull policy (unless it's another function in v2 in which case we can tackle it separately or create a task for it) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added as a task: #393 |
||
if previous_component_task is not None: | ||
component_task = kubeflow_component_op( | ||
input_manifest_path=manifest_path, | ||
metadata=metadata.to_json(), | ||
**component_args, | ||
) | ||
component_task.after(previous_component_task) | ||
|
||
else: | ||
component_task = kubeflow_component_op( | ||
metadata=metadata.to_json(), | ||
**component_args, | ||
) | ||
component_task | ||
# Set optional configurations | ||
# component_task, | ||
# component_op, | ||
# Set the execution order of the component task to be after the previous | ||
# component task. | ||
if previous_component_task is not None: | ||
|
@@ -295,9 +300,7 @@ def kfp_pipeline(): | |
|
||
previous_component_task = component_task | ||
|
||
self.pipeline = pipeline | ||
self.pipeline.validate(run_id=run_id) | ||
logger.info(f"Compiling {self.pipeline.name} to {output_path}") | ||
logger.info(f"Compiling {pipeline.name} to {output_path}") | ||
|
||
self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore | ||
logger.info("Pipeline compiled successfully") | ||
|
@@ -315,3 +318,84 @@ def _set_configuration(self, task, fondant_component_operation): | |
task.add_node_selector_constraint(node_pool_label, node_pool_name) | ||
|
||
return task | ||
|
||
|
||
class VertexCompiler(Compiler): | ||
def __init__(self): | ||
self.resolve_imports() | ||
|
||
def resolve_imports(self): | ||
"""Resolve imports for the Vertex compiler.""" | ||
try: | ||
import kfp | ||
|
||
self.kfp = kfp | ||
|
||
except ImportError: | ||
msg = """You need to install kfp to use the Vertex compiler,\n | ||
you can install it with `pip install fondant[vertex]`""" | ||
raise ImportError( | ||
msg, | ||
) | ||
|
||
def compile( | ||
self, | ||
pipeline: Pipeline, | ||
output_path: str = "vertex_pipeline.yml", | ||
) -> None: | ||
"""Compile a pipeline to vertex pipeline spec and save it to a specified output path. | ||
|
||
Args: | ||
pipeline: the pipeline to compile | ||
output_path: the path where to save the Kubeflow pipeline spec | ||
""" | ||
run_id = pipeline.get_run_id() | ||
pipeline.validate(run_id=run_id) | ||
logger.info(f"Compiling {pipeline.name} to {output_path}") | ||
|
||
@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description) | ||
def kfp_pipeline(): | ||
previous_component_task = None | ||
manifest_path = None | ||
for component_name, component in pipeline._graph.items(): | ||
logger.info(f"Compiling service for {component_name}") | ||
|
||
component_op = component["fondant_component_op"] | ||
# convert ComponentOp to Kubeflow component | ||
kubeflow_component_op = self.kfp.components.load_component_from_text( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like this is the only difference between the two compiler (the SDK changed for this function). Can we perhaps reuse the same function instead of defining it twice. Seems like there is quite a bit of overlap |
||
text=component_op.component_spec.kubeflow_specification.to_string(), | ||
) | ||
|
||
# Execute the Kubeflow component and pass in the output manifest path from | ||
# the previous component. | ||
|
||
component_args = component_op.arguments | ||
metadata = Metadata( | ||
pipeline_name=pipeline.name, | ||
run_id=run_id, | ||
base_path=pipeline.base_path, | ||
component_id=component_name, | ||
cache_key=component_op.get_component_cache_key(), | ||
) | ||
# Set the execution order of the component task to be after the previous | ||
# component task. | ||
if previous_component_task is not None: | ||
component_task = kubeflow_component_op( | ||
input_manifest_path=manifest_path, | ||
metadata=metadata.to_json(), | ||
**component_args, | ||
) | ||
component_task.after(previous_component_task) | ||
|
||
else: | ||
component_task = kubeflow_component_op( | ||
metadata=metadata.to_json(), | ||
**component_args, | ||
) | ||
# Update the manifest path to be the output path of the current component task. | ||
manifest_path = component_task.outputs["output_manifest_path"] | ||
|
||
previous_component_task = component_task | ||
|
||
self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore | ||
logger.info("Pipeline compiled successfully") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kfp(v2) only supports up to python 3.12