From 37e658600dff30df641252ef5d363098cf99d113 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 7 Feb 2024 09:45:26 +0100 Subject: [PATCH 1/5] Create local artifact directory if it does not exist --- examples/sample_pipeline/pipeline.py | 1 - src/fondant/pipeline/compiler.py | 53 ++++++++++++++++++++-------- tests/pipeline/test_compiler.py | 43 +++++++++++++++++++++- tests/pipeline/test_runner.py | 7 ++-- 4 files changed, 85 insertions(+), 19 deletions(-) diff --git a/examples/sample_pipeline/pipeline.py b/examples/sample_pipeline/pipeline.py index f64024b1..28c01ced 100644 --- a/examples/sample_pipeline/pipeline.py +++ b/examples/sample_pipeline/pipeline.py @@ -11,7 +11,6 @@ from fondant.pipeline import Pipeline, lightweight_component BASE_PATH = Path("./.artifacts").resolve() -BASE_PATH.mkdir(parents=True, exist_ok=True) # Define pipeline pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH)) diff --git a/src/fondant/pipeline/compiler.py b/src/fondant/pipeline/compiler.py index 155c371c..be1004af 100644 --- a/src/fondant/pipeline/compiler.py +++ b/src/fondant/pipeline/compiler.py @@ -11,6 +11,7 @@ from pathlib import Path import yaml +from fsspec.registry import known_implementations from fondant.core.component_spec import ComponentSpec from fondant.core.exceptions import InvalidPipelineDefinition @@ -137,23 +138,45 @@ def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]: if local it patches the base_path and prepares a bind mount Returns a tuple containing the path and volume. """ + + def is_remote_path(path: Path) -> bool: + """Check if the path is remote.""" + _path = str(path) + prefixes = set(known_implementations.keys()) - {"local", "file"} + return any(_path.startswith(prefix) for prefix in prefixes) + + def resolve_local_base_path(base_path: Path) -> Path: + """Resolve local base path and create base directory if it no exist.""" + p_base_path = base_path.resolve() + try: + if p_base_path.exists(): + logger.info( + f"Base path found on local system, setting up {base_path} as mount volume", + ) + else: + p_base_path.mkdir(parents=True, exist_ok=True) + logger.info( + f"Base path not found on local system, created base path and setting up " + f"{base_path} as mount volume", + ) + except Exception as e: + msg = f"Unable to create and mount local base path. {e}" + raise ValueError(msg) + + return p_base_path + p_base_path = Path(base_path) - # check if base path is an existing local folder - if p_base_path.exists(): - logger.info( - f"Base path found on local system, setting up {base_path} as mount volume", - ) - p_base_path = p_base_path.resolve() - volume = DockerVolume( - type="bind", - source=str(p_base_path), - target=f"/{p_base_path.stem}", - ) - path = f"/{p_base_path.stem}" - else: + if is_remote_path(p_base_path): logger.info(f"Base path {base_path} is remote") - volume = None - path = base_path + return base_path, None + + p_base_path = resolve_local_base_path(p_base_path) + volume = DockerVolume( + type="bind", + source=str(p_base_path), + target=f"/{p_base_path.stem}", + ) + path = f"/{p_base_path.stem}" return path, volume def _generate_spec( diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index 39cfb0f0..316af430 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -1,6 +1,7 @@ import datetime import json import os +import re import subprocess import sys import tempfile @@ -168,6 +169,7 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): example_dir, pipeline, _ = setup_pipeline compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: + pipeline.base_path = str(fn) output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -336,6 +338,7 @@ def test_docker_configuration(tmp_path_factory): compiler = DockerCompiler() with tmp_path_factory.mktemp("temp") as fn: + pipeline.base_path = str(fn) output_path = str(fn / "docker-compose.yaml") compiler.compile(pipeline=pipeline, output_path=output_path) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -363,7 +366,10 @@ def test_invalid_docker_configuration(tmp_path_factory): ) compiler = DockerCompiler() - with pytest.raises(InvalidPipelineDefinition): + with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012 + InvalidPipelineDefinition, + ): + pipeline.base_path = str(fn) compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml") @@ -663,6 +669,7 @@ def test_caching_dependency_docker(tmp_path_factory): ) with tmp_path_factory.mktemp("temp") as fn: + pipeline.base_path = str(fn) output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) pipeline_configs = DockerComposeConfigs.from_spec(output_path) @@ -844,3 +851,37 @@ def test_sagemaker_base_path_validator(): # valid compiler.validate_base_path("s3://foo/bar") + + +@pytest.mark.usefixtures("_freeze_time") +def test_docker_compiler_create_local_base_path(setup_pipeline, tmp_path_factory): + """Test compiling a pipeline to docker-compose.""" + example_dir, pipeline, _ = setup_pipeline + compiler = DockerCompiler() + with tmp_path_factory.mktemp("temp") as fn: + pipeline.base_path = str(fn) + "/my-artifacts" + output_path = str(fn / "docker-compose.yml") + compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) + assert Path(pipeline.base_path).exists() + + +@pytest.mark.usefixtures("_freeze_time") +def test_docker_compiler_create_local_base_path_propagate_exception( + setup_pipeline, + tmp_path_factory, +): + """Test compiling a pipeline to docker-compose.""" + example_dir, pipeline, _ = setup_pipeline + compiler = DockerCompiler() + msg = re.escape( + "Unable to create and mount local base path. " + "[Errno 30] Read-only file system: '/my-artifacts'", + ) + + with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012 + ValueError, + match=msg, + ): + pipeline.base_path = "/my-artifacts" + output_path = str(fn / "docker-compose.yml") + compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index c93b5791..75fef907 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -59,8 +59,11 @@ def test_docker_runner(mock_docker_installation): ) -def test_docker_runner_from_pipeline(mock_docker_installation): - with mock.patch("subprocess.call") as mock_call: +def test_docker_runner_from_pipeline(mock_docker_installation, tmp_path_factory): + with mock.patch("subprocess.call") as mock_call, tmp_path_factory.mktemp( + "temp", + ) as fn: + PIPELINE.base_path = str(fn) DockerRunner().run(PIPELINE) mock_call.assert_called_once_with( [ From d34afde67809e0fa325cdb0f32a859d62d35a811 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 7 Feb 2024 10:16:53 +0100 Subject: [PATCH 2/5] Fix tests --- tests/pipeline/test_compiler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index 316af430..545b56d0 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -874,8 +874,7 @@ def test_docker_compiler_create_local_base_path_propagate_exception( example_dir, pipeline, _ = setup_pipeline compiler = DockerCompiler() msg = re.escape( - "Unable to create and mount local base path. " - "[Errno 30] Read-only file system: '/my-artifacts'", + "Unable to create and mount local base path. ", ) with tmp_path_factory.mktemp("temp") as fn, pytest.raises( # noqa PT012 From f488a1400ccb71cca599b9e86528c91cf18a9e62 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 7 Feb 2024 11:35:33 +0100 Subject: [PATCH 3/5] Fix tests --- tests/test_cli.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 0618b35d..f48155b5 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -270,7 +270,7 @@ def test_local_run(mock_docker_installation): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, - ref="some/path", + ref=__name__, output_path=None, auth_provider=None, credentials=None, @@ -284,7 +284,7 @@ def test_local_run(mock_docker_installation): "docker", "compose", "-f", - "some/path", + ".fondant/compose.yaml", "up", "--build", "--pull", @@ -372,7 +372,7 @@ def test_kfp_run(tmp_path_factory): local=False, vertex=False, output_path=None, - ref="some/path", + ref=__name__, host=None, ) with pytest.raises( @@ -386,7 +386,7 @@ def test_kfp_run(tmp_path_factory): local=False, output_path=None, host="localhost", - ref="some/path", + ref=__name__, ) run_kfp(args) mock_runner.assert_called_once_with(host="localhost") @@ -419,7 +419,7 @@ def test_vertex_run(tmp_path_factory): project_id="project-123", service_account=None, network=None, - ref="some/path", + ref=__name__, ) run_vertex(args) mock_runner.assert_called_once_with( From a24cfb0c28a64e6fd7d6c70c75318272500de806 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 7 Feb 2024 16:22:23 +0100 Subject: [PATCH 4/5] Addresses comments --- tests/pipeline/test_runner.py | 8 ++++++-- tests/test_cli.py | 10 +++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index 75fef907..9cc3161e 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -63,8 +63,12 @@ def test_docker_runner_from_pipeline(mock_docker_installation, tmp_path_factory) with mock.patch("subprocess.call") as mock_call, tmp_path_factory.mktemp( "temp", ) as fn: - PIPELINE.base_path = str(fn) - DockerRunner().run(PIPELINE) + pipeline = Pipeline( + name="testpipeline", + description="description of the test pipeline", + base_path=str(fn), + ) + DockerRunner().run(pipeline) mock_call.assert_called_once_with( [ "docker", diff --git a/tests/test_cli.py b/tests/test_cli.py index f48155b5..0618b35d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -270,7 +270,7 @@ def test_local_run(mock_docker_installation): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, - ref=__name__, + ref="some/path", output_path=None, auth_provider=None, credentials=None, @@ -284,7 +284,7 @@ def test_local_run(mock_docker_installation): "docker", "compose", "-f", - ".fondant/compose.yaml", + "some/path", "up", "--build", "--pull", @@ -372,7 +372,7 @@ def test_kfp_run(tmp_path_factory): local=False, vertex=False, output_path=None, - ref=__name__, + ref="some/path", host=None, ) with pytest.raises( @@ -386,7 +386,7 @@ def test_kfp_run(tmp_path_factory): local=False, output_path=None, host="localhost", - ref=__name__, + ref="some/path", ) run_kfp(args) mock_runner.assert_called_once_with(host="localhost") @@ -419,7 +419,7 @@ def test_vertex_run(tmp_path_factory): project_id="project-123", service_account=None, network=None, - ref=__name__, + ref="some/path", ) run_vertex(args) mock_runner.assert_called_once_with( From 2978c264ff0b9338c04fe261419c17a397c6a3aa Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Thu, 8 Feb 2024 12:06:09 +0100 Subject: [PATCH 5/5] Fixing cli tests --- tests/test_cli.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 0618b35d..f48155b5 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -270,7 +270,7 @@ def test_local_run(mock_docker_installation): """Test that the run command works with different arguments.""" args = argparse.Namespace( local=True, - ref="some/path", + ref=__name__, output_path=None, auth_provider=None, credentials=None, @@ -284,7 +284,7 @@ def test_local_run(mock_docker_installation): "docker", "compose", "-f", - "some/path", + ".fondant/compose.yaml", "up", "--build", "--pull", @@ -372,7 +372,7 @@ def test_kfp_run(tmp_path_factory): local=False, vertex=False, output_path=None, - ref="some/path", + ref=__name__, host=None, ) with pytest.raises( @@ -386,7 +386,7 @@ def test_kfp_run(tmp_path_factory): local=False, output_path=None, host="localhost", - ref="some/path", + ref=__name__, ) run_kfp(args) mock_runner.assert_called_once_with(host="localhost") @@ -419,7 +419,7 @@ def test_vertex_run(tmp_path_factory): project_id="project-123", service_account=None, network=None, - ref="some/path", + ref=__name__, ) run_vertex(args) mock_runner.assert_called_once_with(