Skip to content

Commit

Permalink
Vertex cli (#519)
Browse files Browse the repository at this point in the history
This PR adds functionality for compiling and running Vertex pipelines
with the CLI.

After this PR is merged, I'd like to simplify the compilation a bit for
both vertex and compile, since there is a lot of repetition (#517).
  • Loading branch information
PhilippeMoussalli authored Oct 16, 2023
1 parent 45bc198 commit 0c79cb4
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 8 deletions.
87 changes: 85 additions & 2 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
from collections import defaultdict
from types import ModuleType

from fondant.compiler import DockerCompiler, KubeFlowCompiler
from fondant.compiler import DockerCompiler, KubeFlowCompiler, VertexCompiler
from fondant.component import BaseComponent, Component
from fondant.executor import ExecutorFactory
from fondant.explorer import (
run_explorer_app,
)
from fondant.pipeline import Pipeline
from fondant.runner import DockerRunner, KubeflowRunner
from fondant.runner import DockerRunner, KubeflowRunner, VertexRunner

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -192,6 +192,10 @@ def register_compile(parent_parser):
name="kubeflow",
help="Kubeflow compiler",
)
vertex_parser = compiler_subparser.add_parser(
name="vertex",
help="vertex compiler",
)

# Local runner parser
local_parser.add_argument(
Expand Down Expand Up @@ -236,8 +240,24 @@ def register_compile(parent_parser):
default="pipeline.yaml",
)

# vertex parser
vertex_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
vertex_parser.add_argument(
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="vertex_pipeline.yml",
)

local_parser.set_defaults(func=compile_local)
kubeflow_parser.set_defaults(func=compile_kfp)
vertex_parser.set_defaults(func=compile_vertex)


def compile_local(args):
Expand All @@ -257,6 +277,12 @@ def compile_kfp(args):
compiler.compile(pipeline=pipeline, output_path=args.output_path)


def compile_vertex(args):
    """Compile the pipeline referenced by ``args.ref`` with the Vertex compiler.

    Args:
        args: parsed CLI arguments; ``ref`` is passed to
            ``pipeline_from_module`` and ``output_path`` is handed to the
            compiler as the destination of the compiled pipeline.
    """
    loaded_pipeline = pipeline_from_module(args.ref)
    vertex_compiler = VertexCompiler()
    vertex_compiler.compile(
        pipeline=loaded_pipeline,
        output_path=args.output_path,
    )


def register_run(parent_parser):
parser = parent_parser.add_parser(
"run",
Expand Down Expand Up @@ -284,6 +310,10 @@ def register_run(parent_parser):
name="kubeflow",
help="Kubeflow runner",
)
vertex_parser = runner_subparser.add_parser(
name="vertex",
help="Vertex runner",
)

# Local runner parser
local_parser.add_argument(
Expand Down Expand Up @@ -335,6 +365,38 @@ def register_run(parent_parser):

kubeflow_parser.set_defaults(func=run_kfp)

# Vertex runner parser
vertex_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
vertex_parser.add_argument(
"--project-id",
help="""The project id of the GCP project used to submit the pipeline""",
)
vertex_parser.add_argument(
"--region",
help="The region where to run the pipeline",
)

vertex_parser.add_argument(
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="vertex_pipeline.yaml",
)

vertex_parser.add_argument(
"--service-account",
help="The service account used to launch jobs",
default=None,
)

vertex_parser.set_defaults(func=run_vertex)


def run_local(args):
try:
Expand Down Expand Up @@ -377,6 +439,27 @@ def run_kfp(args):
runner.run(input_spec=spec_ref)


def run_vertex(args):
    """Run a pipeline on Vertex, compiling it first when ``args.ref`` is a module.

    ``args.ref`` is first resolved with ``pipeline_from_module``. If that
    raises ``ModuleNotFoundError``, the reference is used as-is as the
    pipeline spec to run. Otherwise the pipeline is compiled to
    ``args.output_path`` and that file is run.

    Args:
        args: parsed CLI arguments providing ``ref``, ``output_path``,
            ``project_id``, ``region`` and ``service_account``.

    Raises:
        Any exception other than ``ModuleNotFoundError`` raised while loading
        or compiling the pipeline propagates unchanged; no job is submitted
        in that case.
    """
    try:
        pipeline = pipeline_from_module(args.ref)
    except ModuleNotFoundError:
        spec_ref = args.ref
    else:
        spec_ref = args.output_path
        # Lazy %-formatting so the target path is actually interpolated
        # (the message previously contained a literal "{spec_ref}").
        logging.info(
            "Found reference to un-compiled pipeline... compiling to %s",
            spec_ref,
        )
        compiler = VertexCompiler()
        compiler.compile(pipeline=pipeline, output_path=spec_ref)

    # Deliberately outside a `finally`: if loading or compiling failed with an
    # unexpected error, `spec_ref` may be unset or point to a spec that was
    # never written, so the original error must surface instead of a run.
    runner = VertexRunner(
        project_id=args.project_id,
        region=args.region,
        service_account=args.service_account,
    )
    runner.run(input_spec=spec_ref)


def register_execute(parent_parser):
parser = parent_parser.add_parser(
"execute",
Expand Down
4 changes: 2 additions & 2 deletions src/fondant/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ def __resolve_imports(self):
def __init__(
self,
project_id: str,
project_region: str,
region: str,
service_account: t.Optional[str] = None,
):
self.__resolve_imports()

self.aip.init(
project=project_id,
location=project_region,
location=region,
)
self.service_account = service_account

Expand Down
73 changes: 71 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
PipelineImportError,
compile_kfp,
compile_local,
compile_vertex,
component_from_module,
execute,
get_module,
pipeline_from_module,
run_kfp,
run_local,
run_vertex,
)
from fondant.component import DaskLoadComponent
from fondant.executor import Executor, ExecutorFactory
Expand Down Expand Up @@ -135,6 +137,7 @@ def test_local_logic(tmp_path_factory):
ref=__name__,
local=True,
kubeflow=False,
vertex=False,
output_path=str(fn / "docker-compose.yml"),
extra_volumes=[],
build_arg=[],
Expand All @@ -150,12 +153,31 @@ def test_kfp_compile(tmp_path_factory):
ref=__name__,
kubeflow=True,
local=False,
output_path=str(fn / "kubeflow_pipelines.yml"),
vertex=False,
output_path=str(fn / "kubeflow_pipeline.yml"),
)
compile_kfp(args)
mock_compiler.assert_called_once_with(
pipeline=TEST_PIPELINE,
output_path=str(fn / "kubeflow_pipelines.yml"),
output_path=str(fn / "kubeflow_pipeline.yml"),
)


def test_vertex_compile(tmp_path_factory):
    """Check that `compile_vertex` forwards the pipeline and output path."""
    # `mktemp` returns a plain `pathlib.Path`; it is not used as a context
    # manager because that support was removed from `Path` in Python 3.13.
    output_path = str(tmp_path_factory.mktemp("temp") / "vertex_pipeline.yml")
    with patch(
        "fondant.compiler.VertexCompiler.compile",
    ) as mock_compiler:
        args = argparse.Namespace(
            ref=__name__,
            kubeflow=False,
            local=False,
            vertex=True,
            output_path=output_path,
        )
        compile_vertex(args)
        mock_compiler.assert_called_once_with(
            pipeline=TEST_PIPELINE,
            output_path=output_path,
        )


Expand All @@ -181,6 +203,8 @@ def test_local_run(tmp_path_factory):
with patch("subprocess.call") as mock_call, tmp_path_factory.mktemp("temp") as fn:
args1 = argparse.Namespace(
local=True,
vertex=False,
kubeflow=False,
ref=__name__,
output_path=str(fn / "docker-compose.yml"),
extra_volumes=[],
Expand All @@ -207,6 +231,7 @@ def test_kfp_run(tmp_path_factory):
args = argparse.Namespace(
kubeflow=True,
local=False,
vertex=False,
output_path=None,
ref="some/path",
host=None,
Expand Down Expand Up @@ -241,3 +266,47 @@ def test_kfp_run(tmp_path_factory):
)
run_kfp(args)
mock_runner.assert_called_once_with(host="localhost2")


def test_vertex_run(tmp_path_factory):
    """Test that the run command works in different scenarios."""
    # Scenario 1: the reference is not an importable module, so it is passed
    # to the runner unchanged as the spec to run.
    with patch("fondant.cli.VertexRunner") as mock_runner:
        args = argparse.Namespace(
            kubeflow=False,
            local=False,
            vertex=True,
            output_path=None,
            region="europe-west-1",
            project_id="project-123",
            service_account=None,
            ref="some/path",
        )
        run_vertex(args)
        mock_runner.assert_called_once_with(
            project_id="project-123",
            region="europe-west-1",
            service_account=None,
        )
        mock_runner.return_value.run.assert_called_once_with(
            input_spec="some/path",
        )

    # Scenario 2: the reference is a module holding a pipeline, so it is
    # compiled to `output_path` first and the compiled spec is run.
    # `mktemp` returns a plain `pathlib.Path`; it is not used as a context
    # manager because that support was removed from `Path` in Python 3.13.
    output_path = str(tmp_path_factory.mktemp("temp") / "vertex_pipeline.yaml")
    with patch("fondant.cli.VertexRunner") as mock_runner, patch(
        "fondant.cli.VertexCompiler",
    ) as mock_compiler:
        args = argparse.Namespace(
            kubeflow=False,
            local=False,
            vertex=True,
            output_path=output_path,
            ref=__name__,
            region="europe-west-1",
            project_id="project-123",
            service_account=None,
        )
        run_vertex(args)
        # `run_vertex` instantiates the compiler, so the call lands on the
        # instance mock (`return_value`), not on the patched class itself.
        mock_compiler.return_value.compile.assert_called_once_with(
            pipeline=TEST_PIPELINE,
            output_path=output_path,
        )
        mock_runner.assert_called_once_with(
            project_id="project-123",
            region="europe-west-1",
            service_account=None,
        )
        mock_runner.return_value.run.assert_called_once_with(
            input_spec=output_path,
        )
4 changes: 2 additions & 2 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ def test_vertex_runner():
with mock.patch("google.cloud.aiplatform.init", return_value=None), mock.patch(
"google.cloud.aiplatform.PipelineJob",
):
runner = VertexRunner(project_id="some_project", project_region="some_region")
runner = VertexRunner(project_id="some_project", region="some_region")
runner.run(input_spec=input_spec_path)

# test with service account
runner2 = VertexRunner(
project_id="some_project",
project_region="some_region",
region="some_region",
service_account="some_account",
)
runner2.run(input_spec=input_spec_path)

0 comments on commit 0c79cb4

Please sign in to comment.