Skip to content

Commit

Permalink
Add kfp compiler (#291)
Browse files Browse the repository at this point in the history
Move all kfp related stuff from the Pipeline to a separate kfp compiler
  • Loading branch information
GeorgesLorre authored Aug 10, 2023
1 parent 8eb6d4c commit 5e5957d
Show file tree
Hide file tree
Showing 11 changed files with 688 additions and 280 deletions.
103 changes: 101 additions & 2 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Compiler(ABC):
"""Abstract base class for a compiler."""

@abstractmethod
def compile(self, *args, **kwargs):
def compile(self, *args, **kwargs) -> None:
"""Abstract method to invoke compilation."""


Expand Down Expand Up @@ -134,10 +134,13 @@ def _generate_spec(
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
path, volume = self._patch_path(base_path=pipeline.base_path)
metadata = MetaData(run_id=f"{pipeline.name}-{timestamp}", base_path=path)
run_id = f"{pipeline.name}-{timestamp}"
metadata = MetaData(run_id=run_id, base_path=path)

services = {}

pipeline.validate(run_id=run_id)

for component_name, component in pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")
safe_component_name = self._safe_component_name(component_name)
Expand Down Expand Up @@ -207,3 +210,99 @@ def _generate_spec(
"version": "3.8",
"services": services,
}


class KubeFlowCompiler(Compiler):
"""Compiler that creates a Kubeflow pipeline spec from a pipeline."""

def __init__(self):
self._resolve_imports()

def _resolve_imports(self):
"""Resolve imports for the Kubeflow compiler."""
try:
import kfp

self.kfp = kfp
except ImportError:
msg = """You need to install kfp to use the Kubeflow compiler,\n
you can install it with `pip install --extras pipelines`"""
raise ImportError(
msg,
)

def compile(
self,
pipeline: Pipeline,
output_path: str = "kubeflow_pipeline.yml",
) -> None:
"""Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path.
Args:
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")
wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kfp_pipeline) # type: ignore
self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

def kfp_pipeline(self):
previous_component_task = None
manifest_path = ""
for component_name, component in self.pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
text=component_op.component_spec.kubeflow_specification.to_string(),
)

# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
metadata = json.dumps(
{"base_path": self.pipeline.base_path, "run_id": "{{workflow.name}}"},
)

component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata,
**component_args,
)
# Set optional configurations
component_task = self._set_configuration(
component_task,
component_op,
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
component_task.after(previous_component_task)

# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]

previous_component_task = component_task

def _set_configuration(self, task, fondant_component_operation):
# Unpack optional specifications
number_of_gpus = fondant_component_operation.number_of_gpus
node_pool_name = fondant_component_operation.node_pool_name
p_volumes = fondant_component_operation.p_volumes
ephemeral_storage_size = fondant_component_operation.ephemeral_storage_size

# Assign optional specification
if number_of_gpus is not None:
task.set_gpu_limit(number_of_gpus)
if node_pool_name is not None:
task.add_node_selector_constraint("node_pool", node_pool_name)
if p_volumes is not None:
task.add_pvolumes(p_volumes)
if ephemeral_storage_size is not None:
task.set_ephemeral_storage_request(ephemeral_storage_size)

return task
2 changes: 1 addition & 1 deletion src/fondant/import_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ def is_package_available(package_name: str, import_error_msg: str) -> bool:


def is_kfp_available():
"""Check if 'pandas' is available."""
"""Check if 'kfp' is available."""
return is_package_available("kfp", KFP_IMPORT_ERROR)
Loading

0 comments on commit 5e5957d

Please sign in to comment.