Create local artifact directory if it does not exist #847

Merged
merged 5 commits into from
Feb 9, 2024
1 change: 0 additions & 1 deletion examples/sample_pipeline/pipeline.py
@@ -11,7 +11,6 @@
 from fondant.pipeline import Pipeline, lightweight_component

 BASE_PATH = Path("./.artifacts").resolve()
-BASE_PATH.mkdir(parents=True, exist_ok=True)

 # Define pipeline
 pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH))
53 changes: 38 additions & 15 deletions src/fondant/pipeline/compiler.py
@@ -11,6 +11,7 @@
 from pathlib import Path

 import yaml
+from fsspec.registry import known_implementations

 from fondant.core.component_spec import ComponentSpec
 from fondant.core.exceptions import InvalidPipelineDefinition
@@ -137,23 +138,45 @@ def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]:
         if local it patches the base_path and prepares a bind mount
         Returns a tuple containing the path and volume.
         """
+
+        def is_remote_path(path: Path) -> bool:
+            """Check if the path is remote."""
+            _path = str(path)
+            prefixes = set(known_implementations.keys()) - {"local", "file"}
+            return any(_path.startswith(prefix) for prefix in prefixes)
Comment on lines +142 to +146
Member

Wouldn't it be easier to swap this check and check if it's local instead?

Contributor Author

Not sure. The remote prefixes are well defined by known_implementations, whereas local paths can take many forms (with a prefix, without a prefix, absolute, relative, ...). I thought a check for a remote path would be easier.
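One caveat with a bare `startswith` test is that a local directory whose name begins with a protocol name (for example `s3_cache/artifacts`) would be classified as remote. The sketch below shows a stricter variant that also requires the `://` separator. It operates on the raw string, since on POSIX `str(Path("s3://bucket"))` yields `s3:/bucket`. `REMOTE_PROTOCOLS` is a hypothetical, hardcoded stand-in for `set(known_implementations) - {"local", "file"}`; this is an illustration under those assumptions, not the code merged in this PR.

```python
# Hypothetical stand-in for `set(known_implementations) - {"local", "file"}`,
# hardcoded so the sketch runs without fsspec installed.
REMOTE_PROTOCOLS = {"s3", "gs", "gcs", "abfs", "az", "http", "https"}


def is_remote_path(path: str) -> bool:
    """Return True only if `path` starts with an explicit remote scheme, e.g. s3://."""
    scheme, separator, _ = path.partition("://")
    # `separator` is empty when "://" does not occur in the string at all.
    return bool(separator) and scheme in REMOTE_PROTOCOLS


remote = is_remote_path("s3://bucket/artifacts")
relative_local = is_remote_path("./.artifacts")
lookalike_local = is_remote_path("s3_cache/artifacts")
```

With this variant only paths carrying an explicit scheme are treated as remote; everything else falls through to the local branch.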


+        def resolve_local_base_path(base_path: Path) -> Path:
+            """Resolve local base path and create base directory if it does not exist."""
+            p_base_path = base_path.resolve()
+            try:
+                if p_base_path.exists():
+                    logger.info(
+                        f"Base path found on local system, setting up {base_path} as mount volume",
+                    )
+                else:
+                    p_base_path.mkdir(parents=True, exist_ok=True)
+                    logger.info(
+                        f"Base path not found on local system, created base path and setting up "
+                        f"{base_path} as mount volume",
+                    )
+            except Exception as e:
+                msg = f"Unable to create and mount local base path. {e}"
+                raise ValueError(msg)
+
+            return p_base_path
+
         p_base_path = Path(base_path)
-        # check if base path is an existing local folder
-        if p_base_path.exists():
-            logger.info(
-                f"Base path found on local system, setting up {base_path} as mount volume",
-            )
-            p_base_path = p_base_path.resolve()
-            volume = DockerVolume(
-                type="bind",
-                source=str(p_base_path),
-                target=f"/{p_base_path.stem}",
-            )
-            path = f"/{p_base_path.stem}"
-        else:
+        if is_remote_path(p_base_path):
             logger.info(f"Base path {base_path} is remote")
-            volume = None
-            path = base_path
+            return base_path, None
+
+        p_base_path = resolve_local_base_path(p_base_path)
+        volume = DockerVolume(
+            type="bind",
+            source=str(p_base_path),
+            target=f"/{p_base_path.stem}",
+        )
+        path = f"/{p_base_path.stem}"
         return path, volume

     def _generate_spec(
42 changes: 41 additions & 1 deletion tests/pipeline/test_compiler.py
@@ -1,6 +1,7 @@
 import datetime
 import json
 import os
+import re
 import subprocess
 import sys
 import tempfile
@@ -168,6 +169,7 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory):
     example_dir, pipeline, _ = setup_pipeline
     compiler = DockerCompiler()
     with tmp_path_factory.mktemp("temp") as fn:
+        pipeline.base_path = str(fn)
Member

Not a big fan of this. Can we turn the fixture into a factory function?

See this example from connexion.

Contributor Author

@mrchtr, Feb 8, 2024

If it's fine with you, I would leave it out of this PR and keep the test as it is. I'll create a separate issue for this. I took a look at the test, and I think it would result in a bigger refactoring. It would indeed make sense to have a pipeline factory, because it is needed in several test cases, not only in this file.

Collaborator

@GeorgesLorre, Feb 9, 2024

We already have code like this that we could reuse more.

e.g.:

def setup_pipeline(request, tmp_path, monkeypatch):

This one is even parameterized, so you can easily run the same test on multiple pipelines.
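For readers unfamiliar with the "factory as fixture" idea raised in this thread, a minimal sketch follows. `DummyPipeline` and `pipeline_factory` are hypothetical names used only for illustration; they are not fondant's classes or the fixtures in this repository. In a real test suite the outer function would be decorated with `@pytest.fixture` and receive `tmp_path` from pytest; here it is called directly so the sketch runs on its own.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Callable


@dataclass
class DummyPipeline:
    """Hypothetical stand-in for fondant's Pipeline, for illustration only."""

    name: str
    base_path: str


def pipeline_factory(tmp_path: Path) -> Callable[..., DummyPipeline]:
    """Return a function that builds pipelines rooted under `tmp_path`.

    As a pytest fixture, this would let each test request a pipeline with its
    own base path instead of mutating `pipeline.base_path` after the fact.
    """

    def _make(name: str = "testpipeline", subdir: str = "") -> DummyPipeline:
        return DummyPipeline(name=name, base_path=str(tmp_path / subdir))

    return _make


with tempfile.TemporaryDirectory() as tmp:
    make_pipeline = pipeline_factory(Path(tmp))
    pipeline = make_pipeline(subdir="my-artifacts")
```

Each test can then call `make_pipeline(...)` with the arguments it needs, which avoids assigning `pipeline.base_path` inside the test body.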

         output_path = str(fn / "docker-compose.yml")
         compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
         pipeline_configs = DockerComposeConfigs.from_spec(output_path)
@@ -336,6 +338,7 @@ def test_docker_configuration(tmp_path_factory):

     compiler = DockerCompiler()
     with tmp_path_factory.mktemp("temp") as fn:
+        pipeline.base_path = str(fn)
         output_path = str(fn / "docker-compose.yaml")
         compiler.compile(pipeline=pipeline, output_path=output_path)
         pipeline_configs = DockerComposeConfigs.from_spec(output_path)
@@ -363,7 +366,10 @@ def test_invalid_docker_configuration(tmp_path_factory):
     )

     compiler = DockerCompiler()
-    with pytest.raises(InvalidPipelineDefinition):
+    with tmp_path_factory.mktemp("temp") as fn, pytest.raises(  # noqa PT012
+        InvalidPipelineDefinition,
+    ):
+        pipeline.base_path = str(fn)
         compiler.compile(pipeline=pipeline, output_path="kubeflow_pipeline.yml")


@@ -663,6 +669,7 @@ def test_caching_dependency_docker(tmp_path_factory):
         )

         with tmp_path_factory.mktemp("temp") as fn:
+            pipeline.base_path = str(fn)
             output_path = str(fn / "docker-compose.yml")
             compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
             pipeline_configs = DockerComposeConfigs.from_spec(output_path)
@@ -844,3 +851,36 @@ def test_sagemaker_base_path_validator():

     # valid
     compiler.validate_base_path("s3://foo/bar")
+
+
+@pytest.mark.usefixtures("_freeze_time")
+def test_docker_compiler_create_local_base_path(setup_pipeline, tmp_path_factory):
+    """Test that compiling creates a local base path that does not exist yet."""
+    example_dir, pipeline, _ = setup_pipeline
+    compiler = DockerCompiler()
+    with tmp_path_factory.mktemp("temp") as fn:
+        pipeline.base_path = str(fn) + "/my-artifacts"
+        output_path = str(fn / "docker-compose.yml")
+        compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
+        assert Path(pipeline.base_path).exists()
+
+
+@pytest.mark.usefixtures("_freeze_time")
+def test_docker_compiler_create_local_base_path_propagate_exception(
+    setup_pipeline,
+    tmp_path_factory,
+):
+    """Test that a failure to create the local base path is raised as a ValueError."""
+    example_dir, pipeline, _ = setup_pipeline
+    compiler = DockerCompiler()
+    msg = re.escape(
+        "Unable to create and mount local base path. ",
+    )
+
+    with tmp_path_factory.mktemp("temp") as fn, pytest.raises(  # noqa PT012
+        ValueError,
+        match=msg,
+    ):
+        pipeline.base_path = "/my-artifacts"
+        output_path = str(fn / "docker-compose.yml")
+        compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[])
13 changes: 10 additions & 3 deletions tests/pipeline/test_runner.py
@@ -59,9 +59,16 @@ def test_docker_runner(mock_docker_installation):
         )


-def test_docker_runner_from_pipeline(mock_docker_installation):
-    with mock.patch("subprocess.call") as mock_call:
-        DockerRunner().run(PIPELINE)
+def test_docker_runner_from_pipeline(mock_docker_installation, tmp_path_factory):
+    with mock.patch("subprocess.call") as mock_call, tmp_path_factory.mktemp(
+        "temp",
+    ) as fn:
+        pipeline = Pipeline(
+            name="testpipeline",
+            description="description of the test pipeline",
+            base_path=str(fn),
+        )
+        DockerRunner().run(pipeline)
         mock_call.assert_called_once_with(
             [
                 "docker",
10 changes: 5 additions & 5 deletions tests/test_cli.py
@@ -270,7 +270,7 @@ def test_local_run(mock_docker_installation):
     """Test that the run command works with different arguments."""
     args = argparse.Namespace(
         local=True,
-        ref="some/path",
+        ref=__name__,
Member

Why are we using __name__ here?

Contributor Author

I don't completely understand why the tests worked before. This module itself contains a test pipeline that is used as the reference for the test cases, so __name__ makes more sense here.
However, I would clean up the test cases as well. I think we can extract some fixtures here and make it more robust.
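To illustrate why a module name works as `ref` where a path-like string such as `"some/path"` would not, here is a minimal sketch. It assumes the CLI resolves `ref` by importing it as a module and picking up the pipeline object defined there; that is an assumption about fondant's behaviour, not something this diff confirms. `Pipeline` and `pipeline_from_ref` below are hypothetical stand-ins, not fondant's actual class or helper.

```python
import importlib
import sys
import types


class Pipeline:
    """Hypothetical stand-in for fondant's Pipeline class."""

    def __init__(self, name: str) -> None:
        self.name = name


def pipeline_from_ref(ref: str) -> Pipeline:
    """Import the module named by `ref` and return the single Pipeline it defines."""
    module = importlib.import_module(ref)
    pipelines = [obj for obj in vars(module).values() if isinstance(obj, Pipeline)]
    if len(pipelines) != 1:
        msg = f"Expected exactly one Pipeline in {ref!r}, found {len(pipelines)}"
        raise ValueError(msg)
    return pipelines[0]


# Register a throwaway module so the example is self-contained.
demo_module = types.ModuleType("demo_pipeline_module")
demo_module.PIPELINE = Pipeline("testpipeline")
sys.modules["demo_pipeline_module"] = demo_module

resolved = pipeline_from_ref("demo_pipeline_module")
```

Under that assumption, passing `__name__` from a test module that defines a module-level pipeline gives the resolver something it can actually import.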

         output_path=None,
         auth_provider=None,
         credentials=None,
@@ -284,7 +284,7 @@ def test_local_run(mock_docker_installation):
                 "docker",
                 "compose",
                 "-f",
-                "some/path",
+                ".fondant/compose.yaml",
                 "up",
                 "--build",
                 "--pull",
@@ -372,7 +372,7 @@ def test_kfp_run(tmp_path_factory):
         local=False,
         vertex=False,
         output_path=None,
-        ref="some/path",
+        ref=__name__,
         host=None,
     )
     with pytest.raises(
@@ -386,7 +386,7 @@ def test_kfp_run(tmp_path_factory):
             local=False,
             output_path=None,
             host="localhost",
-            ref="some/path",
+            ref=__name__,
         )
         run_kfp(args)
         mock_runner.assert_called_once_with(host="localhost")
@@ -419,7 +419,7 @@ def test_vertex_run(tmp_path_factory):
             project_id="project-123",
             service_account=None,
             network=None,
-            ref="some/path",
+            ref=__name__,
         )
         run_vertex(args)
         mock_runner.assert_called_once_with(