Skip to content

Commit

Permalink
Augment DockerRunner to support running from a fondant Pipeline (#651)
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgesLorre authored Nov 27, 2023
1 parent 3b72e20 commit 130fb25
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 28 deletions.
23 changes: 6 additions & 17 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,6 @@ def register_run(parent_parser):


def run_local(args):
from fondant.pipeline.compiler import DockerCompiler
from fondant.pipeline.runner import DockerRunner

extra_volumes = []
Expand All @@ -590,26 +589,16 @@ def run_local(args):
extra_volumes.append(cloud_cred)

try:
pipeline = pipeline_from_module(args.ref)
ref = pipeline_from_module(args.ref)
except ModuleNotFoundError:
spec_ref = args.ref
else:
spec_ref = args.output_path
logging.info(
"Found reference to un-compiled pipeline... compiling to {spec_ref}",
)
compiler = DockerCompiler()
compiler.compile(
pipeline=pipeline,
ref = args.ref
finally:
runner = DockerRunner()
runner.run(
input=ref,
extra_volumes=extra_volumes,
output_path=spec_ref,
build_args=args.build_arg,
)
finally:
try:
DockerRunner().run(spec_ref)
except UnboundLocalError as e:
raise e


def run_kfp(args):
Expand Down
40 changes: 39 additions & 1 deletion src/fondant/pipeline/runner.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import logging
import os
import subprocess # nosec
import typing as t
from abc import ABC, abstractmethod

import yaml

from fondant.pipeline import Pipeline
from fondant.pipeline.compiler import DockerCompiler

logger = logging.getLogger(__name__)


Expand All @@ -17,7 +21,7 @@ def run(self, *args, **kwargs):


class DockerRunner(Runner):
def run(self, input_spec: str, *args, **kwargs):
def _run(self, input_spec: str, *args, **kwargs):
"""Run a docker-compose spec."""
cmd = [
"docker",
Expand All @@ -33,6 +37,40 @@ def run(self, input_spec: str, *args, **kwargs):

subprocess.call(cmd) # nosec

def run(
self,
input: t.Union[Pipeline, str],
*,
extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None,
build_args: t.Optional[t.List[str]] = None,
) -> None:
"""Compile a pipeline to docker-compose spec and save it to a specified output path.
Args:
input: the pipeline to compile or a path to a already compiled docker-compose spec
output_path: the path where to save the docker-compose spec
extra_volumes: a list of extra volumes (using the Short syntax:
https://docs.docker.com/compose/compose-file/05-services/#short-syntax-5)
to mount in the docker-compose spec.
build_args: List of build arguments to pass to docker
"""
if isinstance(input, Pipeline):
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/compose.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
)
compiler = DockerCompiler()
compiler.compile(
input,
output_path=output_path,
extra_volumes=extra_volumes,
build_args=build_args,
)
self._run(output_path)
else:
self._run(input)


class KubeflowRunner(Runner):
def __init__(self, host: str):
Expand Down
25 changes: 25 additions & 0 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest import mock

import pytest
from fondant.pipeline import Pipeline
from fondant.pipeline.runner import (
DockerRunner,
KubeflowRunner,
Expand All @@ -13,6 +14,12 @@

VALID_PIPELINE = Path("./tests/pipeline/examples/pipelines/compiled_pipeline/")

PIPELINE = Pipeline(
pipeline_name="testpipeline",
pipeline_description="description of the test pipeline",
base_path="/foo/bar",
)


def test_docker_runner():
"""Test that the docker runner while mocking subprocess.call."""
Expand All @@ -33,6 +40,24 @@ def test_docker_runner():
)


def test_docker_runner_from_pipeline():
with mock.patch("subprocess.call") as mock_call:
DockerRunner().run(PIPELINE)
mock_call.assert_called_once_with(
[
"docker",
"compose",
"-f",
".fondant/compose.yaml",
"up",
"--build",
"--pull",
"always",
"--remove-orphans",
],
)


class MockKfpClient:
def __init__(self, host):
self.host = host
Expand Down
19 changes: 9 additions & 10 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def test_vertex_compile(tmp_path_factory):
)


def test_local_run(tmp_path_factory):
def test_local_run():
"""Test that the run command works with different arguments."""
args = argparse.Namespace(
local=True,
Expand All @@ -224,6 +224,7 @@ def test_local_run(tmp_path_factory):
auth_aws=False,
credentials=None,
extra_volumes=[],
build_arg=[],
)
with patch("subprocess.call") as mock_call:
run_local(args)
Expand All @@ -241,13 +242,12 @@ def test_local_run(tmp_path_factory):
],
)

with patch("subprocess.call") as mock_call, tmp_path_factory.mktemp("temp") as fn:
with patch("subprocess.call") as mock_call:
args1 = argparse.Namespace(
local=True,
vertex=False,
kubeflow=False,
ref=__name__,
output_path=str(fn / "docker-compose.yml"),
extra_volumes=[],
build_arg=[],
auth_gcp=False,
Expand All @@ -261,7 +261,7 @@ def test_local_run(tmp_path_factory):
"docker",
"compose",
"-f",
str(fn / "docker-compose.yml"),
".fondant/compose.yaml",
"up",
"--build",
"--pull",
Expand All @@ -271,15 +271,15 @@ def test_local_run(tmp_path_factory):
)


def test_local_run_cloud_credentials(tmp_path_factory):
def test_local_run_cloud_credentials():
namespace_creds_kwargs = [
{"auth_gcp": True, "auth_azure": False, "auth_aws": False},
{"auth_gcp": False, "auth_azure": True, "auth_aws": False},
{"auth_gcp": False, "auth_azure": False, "auth_aws": True},
]

for namespace_cred_kwargs in namespace_creds_kwargs:
with tmp_path_factory.mktemp("temp") as fn, patch(
with patch(
"fondant.pipeline.compiler.DockerCompiler.compile",
) as mock_compiler, patch(
"subprocess.call",
Expand All @@ -289,7 +289,6 @@ def test_local_run_cloud_credentials(tmp_path_factory):
vertex=False,
kubeflow=False,
ref=__name__,
output_path=str(fn / "docker-compose.yml"),
**namespace_cred_kwargs,
credentials=None,
extra_volumes=[],
Expand All @@ -305,9 +304,9 @@ def test_local_run_cloud_credentials(tmp_path_factory):
extra_volumes = [CloudCredentialsMount.AZURE.value]

mock_compiler.assert_called_once_with(
pipeline=TEST_PIPELINE,
TEST_PIPELINE,
extra_volumes=extra_volumes,
output_path=str(fn / "docker-compose.yml"),
output_path=".fondant/compose.yaml",
build_args=[],
)

Expand All @@ -316,7 +315,7 @@ def test_local_run_cloud_credentials(tmp_path_factory):
"docker",
"compose",
"-f",
str(fn / "docker-compose.yml"),
".fondant/compose.yaml",
"up",
"--build",
"--pull",
Expand Down

0 comments on commit 130fb25

Please sign in to comment.