Skip to content

Commit

Permalink
Add compile and run interface to kfp based runners (#704)
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgesLorre authored Dec 7, 2023
1 parent dac4e20 commit 1f07824
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 34 deletions.
36 changes: 10 additions & 26 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ def register_compile(parent_parser):
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="pipeline.yaml",
default="kubeflow-pipeline.yaml",
)

# vertex parser
Expand All @@ -390,7 +390,7 @@ def register_compile(parent_parser):
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="vertex_pipeline.yml",
default="vertex-pipeline.yml",
)

# sagemaker parser
Expand Down Expand Up @@ -547,7 +547,7 @@ def register_run(parent_parser):
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="pipeline.yaml",
default="kubeflow-pipeline.yaml",
)
kubeflow_parser.add_argument(
"--host",
Expand Down Expand Up @@ -596,7 +596,7 @@ def register_run(parent_parser):
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="vertex_pipeline.yaml",
default="vertex-pipeline.yaml",
)

vertex_parser.add_argument(
Expand Down Expand Up @@ -670,51 +670,35 @@ def run_local(args):


def run_kfp(args):
from fondant.pipeline.compiler import KubeFlowCompiler
from fondant.pipeline.runner import KubeflowRunner

if not args.host:
msg = "--host argument is required for running on Kubeflow"
raise ValueError(msg)
try:
pipeline = pipeline_from_string(args.ref)
ref = pipeline_from_string(args.ref)
except ModuleNotFoundError:
spec_ref = args.ref
else:
spec_ref = args.output_path
logging.info(
"Found reference to un-compiled pipeline... compiling to {spec_ref}",
)
compiler = KubeFlowCompiler()
compiler.compile(pipeline=pipeline, output_path=spec_ref)
ref = args.ref

runner = KubeflowRunner(host=args.host)
runner.run(input_spec=spec_ref)
runner.run(input=ref)


def run_vertex(args):
from fondant.pipeline.compiler import VertexCompiler
from fondant.pipeline.runner import VertexRunner

try:
pipeline = pipeline_from_string(args.ref)
ref = pipeline_from_string(args.ref)
except ModuleNotFoundError:
spec_ref = args.ref
else:
spec_ref = args.output_path
logging.info(
"Found reference to un-compiled pipeline... compiling to {spec_ref}",
)
compiler = VertexCompiler()
compiler.compile(pipeline=pipeline, output_path=spec_ref)
ref = args.ref

runner = VertexRunner(
project_id=args.project_id,
region=args.region,
service_account=args.service_account,
network=args.network,
)
runner.run(input_spec=spec_ref)
runner.run(input=ref)


def run_sagemaker(args):
Expand Down
63 changes: 59 additions & 4 deletions src/fondant/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
import yaml

from fondant.pipeline import Pipeline
from fondant.pipeline.compiler import DockerCompiler, SagemakerCompiler
from fondant.pipeline.compiler import (
DockerCompiler,
KubeFlowCompiler,
SagemakerCompiler,
VertexCompiler,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -92,11 +97,37 @@ def _resolve_imports(self):
)

def run(
self,
input: t.Union[Pipeline, str],
*,
experiment_name: str = "Default",
):
"""Run a pipeline, either from a compiled kubeflow spec or from a fondant pipeline.
Args:
input: the pipeline to compile or a path to a already compiled sagemaker spec
experiment_name: the name of the experiment to create
"""
if isinstance(input, Pipeline):
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/kubeflow-pipeline.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
)
compiler = KubeFlowCompiler()
compiler.compile(
input,
output_path=output_path,
)
self._run(output_path, experiment_name=experiment_name)
else:
self._run(input, experiment_name=experiment_name)

def _run(
self,
input_spec: str,
*,
experiment_name: str = "Default",
*args,
**kwargs,
):
"""Run a kubeflow pipeline."""
try:
Expand Down Expand Up @@ -148,7 +179,31 @@ def __init__(
self.service_account = service_account
self.network = network

def run(self, input_spec: str, *args, **kwargs):
def run(
self,
input: t.Union[Pipeline, str],
):
"""Run a pipeline, either from a compiled vertex spec or from a fondant pipeline.
Args:
input: the pipeline to compile or a path to a already compiled sagemaker spec
"""
if isinstance(input, Pipeline):
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/vertex-pipeline.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
)
compiler = VertexCompiler()
compiler.compile(
input,
output_path=output_path,
)
self._run(output_path)
else:
self._run(input)

def _run(self, input_spec: str, *args, **kwargs):
job = self.aip.PipelineJob(
display_name=self.get_name_from_spec(input_spec),
template_path=input_spec,
Expand Down
55 changes: 51 additions & 4 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_kubeflow_runner():
new=MockKfpClient,
):
runner = KubeflowRunner(host="some_host")
runner.run(input_spec=input_spec_path)
runner.run(input=input_spec_path)

assert runner.client.host == "some_host"

Expand All @@ -96,7 +96,7 @@ def test_kubeflow_runner_new_experiment():
new=MockKfpClient,
):
runner = KubeflowRunner(host="some_host")
runner.run(input_spec=input_spec_path, experiment_name="NewExperiment")
runner.run(input=input_spec_path, experiment_name="NewExperiment")


def test_kfp_import():
Expand All @@ -111,21 +111,68 @@ def test_kfp_import():
_ = KubeflowRunner(host="some_host")


class MockKubeFlowCompiler:
def compile(
self,
pipeline,
output_path,
) -> None:
with open(output_path, "w") as f:
f.write("foo: bar")


def test_kubeflow_runner_from_pipeline():
with mock.patch(
"fondant.pipeline.runner.KubeFlowCompiler",
new=MockKubeFlowCompiler,
), mock.patch(
"fondant.pipeline.runner.KubeflowRunner._run",
) as mock_run, mock.patch(
"kfp.Client",
new=MockKfpClient,
):
runner = KubeflowRunner(host="some_host")
runner.run(
input=PIPELINE,
)

mock_run.assert_called_once_with(
".fondant/kubeflow-pipeline.yaml",
experiment_name="Default",
)


def test_vertex_runner():
input_spec_path = str(VALID_PIPELINE / "kubeflow_pipeline.yml")
with mock.patch("google.cloud.aiplatform.init", return_value=None), mock.patch(
"google.cloud.aiplatform.PipelineJob",
):
runner = VertexRunner(project_id="some_project", region="some_region")
runner.run(input_spec=input_spec_path)
runner.run(input=input_spec_path)

# test with service account
runner2 = VertexRunner(
project_id="some_project",
region="some_region",
service_account="some_account",
)
runner2.run(input_spec=input_spec_path)
runner2.run(input=input_spec_path)


def test_vertex_runner_from_pipeline():
with mock.patch(
"fondant.pipeline.runner.VertexCompiler",
new=MockKubeFlowCompiler,
), mock.patch("fondant.pipeline.runner.VertexRunner._run") as mock_run, mock.patch(
"google.cloud.aiplatform.init",
return_value=None,
):
runner = VertexRunner(project_id="some_project", region="some_region")
runner.run(
input=PIPELINE,
)

mock_run.assert_called_once_with(".fondant/vertex-pipeline.yaml")


def test_sagemaker_runner(tmp_path_factory):
Expand Down

0 comments on commit 1f07824

Please sign in to comment.