Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor compile method for kfp and vertex #522

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 8 additions & 89 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class Compiler(ABC):
def compile(self, *args, **kwargs) -> None:
"""Abstract method to invoke compilation."""

@abstractmethod
def _set_configuration(self, *args, **kwargs) -> None:
"""Abstract method to set pipeline configuration."""


@dataclass
class DockerVolume:
Expand Down Expand Up @@ -277,7 +281,7 @@ def _resolve_imports(self):
def compile(
self,
pipeline: Pipeline,
output_path: str = "kubeflow_pipeline.yml",
output_path: str,
) -> None:
"""Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path.

Expand Down Expand Up @@ -398,8 +402,9 @@ def _set_configuration(self, task, fondant_component_operation):
return task


class VertexCompiler(Compiler):
class VertexCompiler(KubeFlowCompiler):
def __init__(self):
super().__init__()
self.resolve_imports()

def resolve_imports(self):
Expand All @@ -416,93 +421,7 @@ def resolve_imports(self):
msg,
)

def compile(
self,
pipeline: Pipeline,
output_path: str = "vertex_pipeline.yml",
) -> None:
"""Compile a pipeline to vertex pipeline spec and save it to a specified output path.

Args:
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
run_id = pipeline.get_run_id()
pipeline.validate(run_id=run_id)
logger.info(f"Compiling {pipeline.name} to {output_path}")

@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
component_cache_key = None

for component_name, component in pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component_from_text(
text=component_op.component_spec.kubeflow_specification.to_string(),
)

# Remove None values from arguments
component_args = {
k: v for k, v in component_op.arguments.items() if v is not None
}
component_cache_key = component_op.get_component_cache_key(
previous_component_cache=component_cache_key,
)
metadata = Metadata(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=pipeline.base_path,
component_id=component_name,
cache_key=component_cache_key,
)

output_manifest_path = (
f"{pipeline.base_path}/{pipeline.name}/"
f"{run_id}/{component_name}/manifest.json"
)
# Set the execution order of the component task to be after the previous
# component task.
if component["dependencies"]:
for dependency in component["dependencies"]:
input_manifest_path = (
f"{pipeline.base_path}/{pipeline.name}/"
f"{run_id}/{dependency}/manifest.json"
)
component_task = kubeflow_component_op(
input_manifest_path=input_manifest_path,
output_manifest_path=output_manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)

else:
component_task = kubeflow_component_op(
metadata=metadata.to_json(),
output_manifest_path=output_manifest_path,
**component_args,
)

# Set optional arguments
component_task = self._set_configuration(
component_task,
component_op,
)

# Disable caching
component_task.set_caching_options(enable_caching=False)

previous_component_task = component_task

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

@staticmethod
def _set_configuration(task, fondant_component_operation):
def _set_configuration(self, task, fondant_component_operation):
# Unpack optional specifications
number_of_accelerators = fondant_component_operation.number_of_accelerators
accelerator_name = fondant_component_operation.accelerator_name
Expand Down