Skip to content

Commit

Permalink
Always pull images in local runner (#279)
Browse files Browse the repository at this point in the history
Fixes #259 

This PR:
- Instructs the local runner to always pull images. This makes sure that
moving tags like `dev` or `latest` are up to date before running the
pipeline. As far as I can tell from my testing, this doesn't cause a
noticeable delay when the image was not updated.
- Adds the pipeline name to the docker compose yaml and removes orphan
containers. Containers are namespaced by the pipeline name, which
defaults to the cwd if not specified. So I set the name explicitly so we
don't accidentally remove containers from another pipeline.
  • Loading branch information
RobbeSneyders authored Jul 6, 2023
1 parent 9364802 commit c180686
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 5 deletions.
6 changes: 5 additions & 1 deletion fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,8 @@ def _generate_spec(self, pipeline: Pipeline, extra_volumes: list) -> dict:
services[safe_component_name][
"image"
] = component_op.component_spec.image
return {"version": "3.8", "services": services}
return {
"name": pipeline.name,
"version": "3.8",
"services": services,
}
11 changes: 10 additions & 1 deletion fondant/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module defines classes to represent a Fondant Pipeline."""
import json
import logging
import re
import typing as t
from collections import OrderedDict
from pathlib import Path
Expand Down Expand Up @@ -145,7 +146,7 @@ def __init__(
pipeline_description: Optional description of the pipeline.
"""
self.base_path = base_path
self.name = pipeline_name
self.name = self._validate_pipeline_name(pipeline_name)
self.description = pipeline_description
self.package_path = f"{pipeline_name}.tgz"
self._graph: t.OrderedDict[str, t.Any] = OrderedDict()
Expand Down Expand Up @@ -218,6 +219,14 @@ def depth_first_traversal(node: str):

self._graph = OrderedDict((node, self._graph[node]) for node in sorted_graph)

@staticmethod
def _validate_pipeline_name(pipeline_name: str) -> str:
    """Validate a pipeline name and return it unchanged.

    A valid name starts with a lowercase letter or digit and continues with
    lowercase letters, digits, underscores or hyphens only.

    Args:
        pipeline_name: The name to validate.

    Returns:
        The same ``pipeline_name`` when it is valid.

    Raises:
        InvalidPipelineDefinition: If the name does not match the pattern.
    """
    pattern = r"^[a-z0-9][a-z0-9_-]*$"
    # `fullmatch` instead of `match`: with `match`, the trailing `$` also
    # matches right before a final newline, so "name\n" would be accepted.
    if re.fullmatch(pattern, pipeline_name) is None:
        msg = f"The pipeline name violates the pattern {pattern}"
        raise InvalidPipelineDefinition(msg)
    return pipeline_name

def _validate_pipeline_definition(self, run_id: str):
"""
Validates the pipeline definition by ensuring that the consumed and produced subsets and
Expand Down
12 changes: 11 additions & 1 deletion fondant/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ def run(self, *args, **kwargs):
class DockerRunner(Runner):
    """Runner that brings up a docker compose spec through the docker CLI."""

    def run(self, input_spec: str, *args, **kwargs):
        """Run a docker-compose spec.

        Args:
            input_spec: Path to the compose file, handed to ``docker compose -f``.
            *args: Accepted for signature compatibility; not used here.
            **kwargs: Accepted for signature compatibility; not used here.
        """
        cmd = [
            "docker",
            "compose",
            "-f",
            input_spec,
            "up",
            "--build",
            # Always pull so that moving tags are refreshed before the run.
            "--pull",
            "always",
            # Drop containers of services no longer present in the spec.
            "--remove-orphans",
        ]

        # The exit status of the command is not inspected.
        subprocess.call(cmd)  # nosec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
name: test_pipeline
services:
first_component:
build: ./tests/example_pipelines/valid_pipeline/example_1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
name: test_pipeline
services:
first_component:
build: ./tests/example_pipelines/valid_pipeline/example_1
Expand Down
15 changes: 14 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,17 @@ def test_run_logic(tmp_path_factory):
with patch("subprocess.call") as mock_call:
run(args)
mock_call.assert_called_once_with(
["docker", "compose", "-f", "some/path", "up", "--build"],
[
"docker",
"compose",
"-f",
"some/path",
"up",
"--build",
"--pull",
"always",
"--remove-orphans",
],
)

with patch("subprocess.call") as mock_call, tmp_path_factory.mktemp("temp") as fn:
Expand All @@ -87,6 +97,9 @@ def test_run_logic(tmp_path_factory):
str(fn / "docker-compose.yml"),
"up",
"--build",
"--pull",
"always",
"--remove-orphans",
],
)
args2 = argparse.Namespace(kubeflow=True, local=False, ref="some/path")
Expand Down
6 changes: 6 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,9 @@ def test_defining_reusable_component_op_with_custom_spec():
assert load_from_hub_op_default_op.component_spec == load_from_hub_op_default_spec
assert load_from_hub_op_default_op.component_spec != load_from_hub_op_custom_spec
assert load_from_hub_op_custom_op.component_spec != load_from_hub_op_default_spec


def test_pipeline_name():
    """A well-formed pipeline name is accepted; one containing a space is rejected."""
    # Constructing with a valid name must not raise.
    Pipeline(base_path="base_path", pipeline_name="valid-name")

    expected_error = pytest.raises(
        InvalidPipelineDefinition,
        match="The pipeline name violates",
    )
    with expected_error:
        Pipeline(base_path="base_path", pipeline_name="invalid name")
12 changes: 11 additions & 1 deletion tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,15 @@ def test_docker_runner():
with patch("subprocess.call") as mock_call:
DockerRunner().run("some/path")
mock_call.assert_called_once_with(
["docker", "compose", "-f", "some/path", "up", "--build"],
[
"docker",
"compose",
"-f",
"some/path",
"up",
"--build",
"--pull",
"always",
"--remove-orphans",
],
)

0 comments on commit c180686

Please sign in to comment.