Feature/vertex compiler (#411)
GeorgesLorre committed Oct 2, 2023
1 parent 7d3b881 commit a39cdbc
Showing 17 changed files with 3,068 additions and 901 deletions.
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -41,7 +41,7 @@ classifiers = [
]

[tool.poetry.dependencies]
python = ">= 3.8"
python = ">= 3.8 < 3.12"
dask = {extras = ["dataframe", "distributed", "diagnostics"], version = ">= 2023.4.1"}
importlib-resources = { version = ">= 1.3", python = "<3.9" }
jsonschema = ">= 4.18"
@@ -51,14 +51,15 @@ fsspec = { version = ">= 2023.4.0", optional = true}
gcsfs = { version = ">= 2023.4.0", optional = true }
s3fs = { version = ">= 2023.4.0", optional = true }
adlfs = { version = ">= 2023.4.0", optional = true }
kfp = { version = ">= 1.8.19, < 2", optional = true }
kfp = { version = "2.0.1", optional = true }
pandas = { version = ">= 1.3.5", optional = true }

[tool.poetry.extras]
aws = ["fsspec", "s3fs"]
azure = ["fsspec", "adlfs"]
gcp = ["fsspec", "gcsfs"]
kfp = ["kfp"]
vertex = ["kfp"]

[tool.poetry.group.test.dependencies]
pre-commit = "^3.1.1"
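Both the existing `kfp` extra and the new `vertex` extra resolve to the same pinned `kfp = "2.0.1"` dependency introduced above. As a minimal sketch (not part of this commit), a runtime sanity check could confirm that the 2.x pin is what actually got installed, e.g. after `pip install fondant[vertex]`; only the package name and version bound are taken from the lines above:

```python
# Illustrative sketch, not from this diff: verify the kfp 2.x pin at runtime.
import importlib.metadata

# Raises PackageNotFoundError if fondant was installed without the kfp/vertex extra.
kfp_version = importlib.metadata.version("kfp")
assert kfp_version.startswith("2."), f"Vertex compilation expects kfp 2.x, found {kfp_version}"
```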
118 changes: 101 additions & 17 deletions src/fondant/compiler.py
@@ -253,6 +253,7 @@ def compile(
output_path: the path where to save the Kubeflow pipeline spec
"""
run_id = pipeline.get_run_id()
pipeline.validate(run_id=run_id)

@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
@@ -273,28 +274,32 @@ def kfp_pipeline():
logger.info(f"Compiling service for {component_name}")

# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
kubeflow_component_op = self.kfp.components.load_component_from_text(
text=component_op.component_spec.kubeflow_specification.to_string(),
)

# # Set image pull policy to always
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments

component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata.to_json(),
**component_args,
)
# Set optional configurations
component_task = self._set_configuration(
component_task,
component_op,
)

# Set image pull policy to always
component_task.container.set_image_pull_policy("Always")
if previous_component_task is not None:
component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)

else:
component_task = kubeflow_component_op(
metadata=metadata.to_json(),
**component_args,
)
component_task
# Set optional configurations
# component_task,
# component_op,
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
@@ -305,9 +310,7 @@ def kfp_pipeline():

previous_component_task = component_task

self.pipeline = pipeline
self.pipeline.validate(run_id=run_id)
logger.info(f"Compiling {self.pipeline.name} to {output_path}")
logger.info(f"Compiling {pipeline.name} to {output_path}")

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")
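For context, a minimal sketch of how the `compile` method above might be driven. This is not part of the diff: the import path for `Pipeline` and its constructor arguments are assumptions, and `KubeFlowCompiler()` is assumed to take no constructor arguments; only `fondant.compiler` and the `compile(pipeline, output_path)` signature come from the file shown here.

```python
# Illustrative sketch, not part of this commit.
from fondant.compiler import KubeFlowCompiler
from fondant.pipeline import Pipeline  # assumed import path

# Hypothetical constructor arguments; the real Pipeline API is not shown in this diff.
pipeline = Pipeline(pipeline_name="demo-pipeline", base_path="gs://my-bucket/fondant")

# Compile the Fondant pipeline to a Kubeflow pipeline spec on disk.
KubeFlowCompiler().compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml")
```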
@@ -334,3 +337,84 @@ def _set_configuration(self, task, fondant_component_operation):
task.apply(self.kfp_gcp.use_preemptible_nodepool())

return task


class VertexCompiler(Compiler):
def __init__(self):
self.resolve_imports()

def resolve_imports(self):
"""Resolve imports for the Vertex compiler."""
try:
import kfp

self.kfp = kfp

except ImportError:
msg = """You need to install kfp to use the Vertex compiler,\n
you can install it with `pip install fondant[vertex]`"""
raise ImportError(
msg,
)

def compile(
self,
pipeline: Pipeline,
output_path: str = "vertex_pipeline.yml",
) -> None:
"""Compile a pipeline to vertex pipeline spec and save it to a specified output path.
Args:
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
run_id = pipeline.get_run_id()
pipeline.validate(run_id=run_id)
logger.info(f"Compiling {pipeline.name} to {output_path}")

@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = None
for component_name, component in pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component_from_text(
text=component_op.component_spec.kubeflow_specification.to_string(),
)

# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.

component_args = component_op.arguments
metadata = Metadata(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=pipeline.base_path,
component_id=component_name,
cache_key=component_op.get_component_cache_key(),
)
# Set the execution order of the component task to be after the previous
# component task.
if previous_component_task is not None:
component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata.to_json(),
**component_args,
)
component_task.after(previous_component_task)

else:
component_task = kubeflow_component_op(
metadata=metadata.to_json(),
**component_args,
)
# Update the manifest path to be the output path of the current component task.
manifest_path = component_task.outputs["output_manifest_path"]

previous_component_task = component_task

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")
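The new `VertexCompiler` mirrors the Kubeflow path but emits a spec intended for Vertex AI Pipelines. Below is a minimal usage sketch, not part of this commit: the `Pipeline` constructor arguments are hypothetical, and the follow-up submission via `google-cloud-aiplatform` is an assumption about how the compiled spec would typically be run; only `VertexCompiler.compile()` and its default `vertex_pipeline.yml` output come from this diff.

```python
# Illustrative sketch, not part of this commit.
from fondant.compiler import VertexCompiler
from fondant.pipeline import Pipeline  # assumed import path

# Hypothetical constructor arguments; the real Pipeline API is not shown in this diff.
pipeline = Pipeline(pipeline_name="demo-pipeline", base_path="gs://my-bucket/fondant")

# Compile the Fondant pipeline to a Vertex pipeline spec on disk.
VertexCompiler().compile(pipeline=pipeline, output_path="vertex_pipeline.yml")

# Hypothetical follow-up: submit the compiled spec to Vertex AI Pipelines.
from google.cloud import aiplatform

aiplatform.init(project="my-gcp-project", location="europe-west1")
job = aiplatform.PipelineJob(
    display_name=pipeline.name,
    template_path="vertex_pipeline.yml",
    pipeline_root=pipeline.base_path,
)
job.submit()
```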