From 2c51c07d670ca7799ba2890a60d222948428dde9 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Tue, 2 Jan 2024 11:01:08 +0100 Subject: [PATCH] Make data explorer non-blocking with docker compose (#731) Small PR to avoid showing the localhost url before the image is build --- docs/data_explorer.md | 14 +- docs/guides/build_a_simple_pipeline.md | 2 +- docs/guides/first_pipeline.md | 2 +- src/fondant/cli.py | 43 +++++- src/fondant/core/schema.py | 17 +++ src/fondant/explore.py | 197 ++++++++++++++++--------- src/fondant/pipeline/compiler.py | 19 +-- src/fondant/testing.py | 4 +- tests/pipeline/test_compiler.py | 16 +- tests/test_explorer.py | 114 ++++++++++---- 10 files changed, 289 insertions(+), 139 deletions(-) diff --git a/docs/data_explorer.md b/docs/data_explorer.md index 74394d880..7660b80f4 100644 --- a/docs/data_explorer.md +++ b/docs/data_explorer.md @@ -52,7 +52,7 @@ together with the Fondant python package. === "Console" ```bash - fondant explore --base_path $BASE_PATH + fondant explore start --base_path $BASE_PATH ``` === "Python" @@ -71,22 +71,22 @@ or `--auth-azure` to mount your default local cloud credentials to the pipeline. Or You can also use the `--extra-volumnes` flag to specify credentials or local files you need to mount. +To stop the data explorer service you can use the following commands: + Example: === "Console" ```bash - export BASE_PATH=gs://foo/bar - fondant explore --base_path $BASE_PATH + fondant explore stop ``` === "Python" ```python - from fondant.explore import run_explorer_app - - BASE_PATH = "gs://foo/bar" - run_explorer_app(base_path=BASE_PATH) + from fondant.explore import stop_explorer_app + + stop_explorer_app() ``` diff --git a/docs/guides/build_a_simple_pipeline.md b/docs/guides/build_a_simple_pipeline.md index 2fef3eb7f..f3209c232 100644 --- a/docs/guides/build_a_simple_pipeline.md +++ b/docs/guides/build_a_simple_pipeline.md @@ -118,7 +118,7 @@ The pipeline execution will start, initiating the download of the dataset from H After the pipeline has completed, you can explore the pipeline result using the fondant explorer: ```bash -fondant explore --base_path ./data +fondant explore start --base_path ./data ``` You can open your browser at `localhost:8501` to explore the loaded data. diff --git a/docs/guides/first_pipeline.md b/docs/guides/first_pipeline.md index c0676912a..cba81b200 100644 --- a/docs/guides/first_pipeline.md +++ b/docs/guides/first_pipeline.md @@ -47,7 +47,7 @@ fondant run local pipeline.py Congrats, you just ran your first Fondant pipeline! To visually inspect the results between every pipeline step, you can use the fondant explorer: ``` -fondant explore --base_path ./data-dir +fondant explore start --base_path ./data-dir ``` ### Building your own pipeline diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 9c6d0d302..cce7cfaf1 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -107,41 +107,52 @@ def register_explore(parent_parser): Example: - fondant explore --base_path gs://foo/bar \ + fondant explore start --base_path gs://foo/bar \ -c $HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json """, ), ) - auth_group = parser.add_mutually_exclusive_group() - parser.add_argument( + explore_subparser = parser.add_subparsers() + start_parser = explore_subparser.add_parser(name="start", help="Start explorer app") + stop_parser = explore_subparser.add_parser(name="stop", help="Stop explorer app") + + auth_group = start_parser.add_mutually_exclusive_group() + + start_parser.add_argument( "--base_path", "-b", type=str, help="""Base path that contains the data produced by a Fondant pipeline (local or remote) .""", ) - parser.add_argument( + start_parser.add_argument( "--container", "-r", type=str, default="fndnt/data_explorer", help="Docker container to use. Defaults to fndnt/data_explorer.", ) - parser.add_argument( + start_parser.add_argument( "--tag", "-t", type=str, default=version("fondant") if version("fondant") != "0.1.dev0" else "latest", help="Docker image tag to use.", ) - parser.add_argument( + start_parser.add_argument( "--port", "-p", type=int, default=8501, help="Port to expose the container on.", ) + start_parser.add_argument( + "--output-path", + type=str, + default=".fondant/explorer-compose.yaml", + help="The path to the Docker Compose specification.", + ) auth_group.add_argument( "--auth-gcp", @@ -172,10 +183,18 @@ def register_explore(parent_parser): nargs="+", ) - parser.set_defaults(func=explore) + stop_parser.add_argument( + "--output-path", + type=str, + default=".fondant/explorer-compose.yaml", + help="The path to the Docker Compose specification.", + ) + start_parser.set_defaults(func=start_explore) + stop_parser.set_defaults(func=stop_explore) -def explore(args): + +def start_explore(args): from fondant.explore import run_explorer_app if not shutil.which("docker"): @@ -202,6 +221,14 @@ def explore(args): ) +def stop_explore(args): + from fondant.explore import stop_explorer_app + + stop_explorer_app( + output_path=args.output_path, + ) + + def register_build(parent_parser): parser = parent_parser.add_parser( "build", diff --git a/src/fondant/core/schema.py b/src/fondant/core/schema.py index 121fceddf..a2e0dcded 100644 --- a/src/fondant/core/schema.py +++ b/src/fondant/core/schema.py @@ -5,6 +5,7 @@ import os import re import typing as t +from dataclasses import dataclass from enum import Enum import pyarrow as pa @@ -12,6 +13,22 @@ from fondant.core.exceptions import InvalidTypeSchema +@dataclass +class DockerVolume: + """Dataclass representing a DockerVolume. + (https://docs.docker.com/compose/compose-file/05-services/#volumes). + + Args: + type: the mount type volume (bind, volume) + source: the source of the mount, a path on the host for a bind mount + target: the path in the container where the volume is mounted. + """ + + type: str + source: str + target: str + + class CloudCredentialsMount(Enum): home_directory = os.path.expanduser("~") AWS = f"{home_directory}/credentials:/root/.aws/credentials" diff --git a/src/fondant/explore.py b/src/fondant/explore.py index 667f71267..b9c801fd9 100644 --- a/src/fondant/explore.py +++ b/src/fondant/explore.py @@ -3,35 +3,34 @@ import shlex import subprocess # nosec import typing as t +from dataclasses import asdict from importlib.metadata import version from pathlib import Path import fsspec.core +import yaml from fsspec.implementations.local import LocalFileSystem +from fondant.core.schema import DockerVolume -# type: ignore -def run_explorer_app( # type: ignore +CONTAINER = "fndnt/data_explorer" +PORT = 8501 +OUTPUT_PATH = ".fondant/explorer-compose.yaml" + + +def _get_service_name(container: str) -> str: + return os.path.basename(container) + + +def _generate_explorer_spec( + *, base_path: str, - port: int = 8501, - container: str = "fndnt/data_explorer", + port: int = PORT, + container: str = CONTAINER, tag: t.Optional[str] = None, extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, -): # type: ignore - """ - Run an Explorer App in a Docker container. - - Args: - base_path: the base path where the Explorer App will be mounted. - port: The port number to expose the Explorer App. Default is 8501. - container: The Docker container name or image to use. Default is "fndnt/data_explorer". - tag: The tag/version of the Docker container. Default is "latest". - extra_volumes: Extra volumes to mount in containers. You can use the --extra-volumes flag - to specify extra volumes to mount in the containers this can be used: - - to mount data directories to be used by the pipeline (note that if your pipeline's - base_path is local it will already be mounted for you). - - to mount cloud credentials - """ +) -> t.Dict[str, t.Any]: + """Generate a Docker Compose specification for the Explorer App.""" if tag is None: tag = version("fondant") if version("fondant") != "0.1.dev0" else "latest" @@ -41,17 +40,8 @@ def run_explorer_app( # type: ignore if isinstance(extra_volumes, str): extra_volumes = [extra_volumes] - cmd = [ - "docker", - "run", - "--pull", - "always", - "--name", - "fondant-explorer", - "--rm", - "-p", - f"{port}:8501", - ] + # Mount extra volumes to the container + volumes: t.List[t.Union[str, dict]] = [] fs, _ = fsspec.core.url_to_fs(base_path) if isinstance(fs, LocalFileSystem): @@ -60,31 +50,20 @@ def run_explorer_app( # type: ignore "This directory will be mounted to /artifacts in the container.", ) data_directory_path = Path(base_path).resolve() - host_machine_path = data_directory_path.as_posix() container_path = os.path.join("/", data_directory_path.name) - # Mount extra volumes to the container + command = ["--base_path", f"{container_path}"] + if extra_volumes: - for volume in extra_volumes: - cmd.extend( - ["-v", volume], - ) + volumes.extend(extra_volumes) # Mount the local base path to the container - cmd.extend( - ["-v", f"{shlex.quote(host_machine_path)}:{shlex.quote(container_path)}"], - ) - - # add the image name - cmd.extend( - [ - f"{shlex.quote(container)}:{shlex.quote(tag)}", - ], - ) - - cmd.extend( - ["--base_path", f"{container_path}"], + volume = DockerVolume( + type="bind", + source=str(data_directory_path), + target=f"/{data_directory_path.stem}", ) + volumes.append(asdict(volume)) else: if not extra_volumes: @@ -97,27 +76,113 @@ def run_explorer_app( # type: ignore logging.info(f"Using remote base path: {base_path}") # Mount extra volumes to the container - if extra_volumes: - for volume in extra_volumes: - cmd.extend( - ["-v", volume], - ) - - # add the image name - cmd.extend( - [ - f"{shlex.quote(container)}:{shlex.quote(tag)}", - ], - ) + volumes.extend(extra_volumes) # Add the remote base path as a container argument - cmd.extend( - ["--base_path", f"{base_path}"], - ) + command = ["--base_path", base_path] + + services = { + f"{_get_service_name(container)}": { + "command": command, + "volumes": volumes, + "ports": [f"{port}:8501"], + "image": f"{shlex.quote(container)}:{shlex.quote(tag)}", + }, + } + + return { + "name": "explorer_app", + "version": "3.8", + "services": services, + } + + +# type: ignore +def run_explorer_app( # type: ignore # noqa: PLR0913 + base_path: str, + port: int = PORT, + container: str = CONTAINER, + output_path: str = OUTPUT_PATH, + tag: t.Optional[str] = None, + extra_volumes: t.Union[t.Optional[list], t.Optional[str]] = None, +): # type: ignore + """ + Run an Explorer App in a Docker container. + + Args: + base_path: the base path where the Explorer App will be mounted. + port: The port number to expose the Explorer App. Default is 8501. + container: The name of the Docker container. Default is "fndnt/data_explorer". + output_path: The path to the Docker Compose specification. Default is + ".fondant/explorer-compose.yaml". + tag: The tag/version of the Docker container. Default is "latest". + extra_volumes: Extra volumes to mount in containers. You can use the --extra-volumes flag + to specify extra volumes to mount in the containers this can be used: + - to mount data directories to be used by the pipeline (note that if your pipeline's + base_path is local it will already be mounted for you). + - to mount cloud credentials + """ + os.makedirs(".fondant", exist_ok=True) + + explorer_app_spec = _generate_explorer_spec( + base_path=base_path, + port=port, + container=container, + tag=tag, + extra_volumes=extra_volumes, + ) + + with open(output_path, "w") as outfile: + yaml.dump(explorer_app_spec, outfile, default_flow_style=False) + + cmd = [ + "docker", + "compose", + "-f", + output_path, + "up", + "--build", + "--pull", + "always", + "--remove-orphans", + "--detach", + ] + + try: + subprocess.check_call(cmd, stdout=subprocess.PIPE) # nosec + except subprocess.CalledProcessError as e: + raise SystemExit(e.returncode) logging.info( - f"Running image from registry: {container} with tag: {tag} on port: {port}", + f"Running image from registry '{container}' with tag '{tag}' on port '{port}'", ) logging.info(f"Access the explorer at http://localhost:{port}") - subprocess.call(cmd, stdout=subprocess.PIPE) # nosec + +def stop_explorer_app( + output_path: str = OUTPUT_PATH, +): + """ + Stop the Explorer App. + + Args: + output_path: The path to save the Docker Compose specification. Default is + ".fondant/explorer-compose.yaml". + """ + cmd = [ + "docker", + "compose", + "-f", + output_path, + "stop", + ] + + try: + subprocess.check_call(cmd, stdout=subprocess.PIPE) # nosec + except subprocess.CalledProcessError as e: + raise SystemExit(e.returncode) + + # check if the container is running + logging.info( + "Explorer app stopped successfully", + ) diff --git a/src/fondant/pipeline/compiler.py b/src/fondant/pipeline/compiler.py index 2ba2ea752..c491e1542 100644 --- a/src/fondant/pipeline/compiler.py +++ b/src/fondant/pipeline/compiler.py @@ -5,13 +5,14 @@ import tempfile import typing as t from abc import ABC, abstractmethod -from dataclasses import asdict, dataclass +from dataclasses import asdict from pathlib import Path import yaml from fondant.core.exceptions import InvalidPipelineDefinition from fondant.core.manifest import Metadata +from fondant.core.schema import DockerVolume from fondant.pipeline import ( VALID_ACCELERATOR_TYPES, VALID_VERTEX_ACCELERATOR_TYPES, @@ -45,22 +46,6 @@ def log_unused_configurations(self, **kwargs): ) -@dataclass -class DockerVolume: - """Dataclass representing a DockerVolume. - (https://docs.docker.com/compose/compose-file/05-services/#volumes). - - Args: - type: the mount type volume (bind, volume) - source: the source of the mount, a path on the host for a bind mount - target: the path in the container where the volume is mounted. - """ - - type: str - source: str - target: str - - class DockerCompiler(Compiler): """Compiler that creates a docker-compose spec from a pipeline.""" diff --git a/src/fondant/testing.py b/src/fondant/testing.py index 160ebffd1..818d4171c 100644 --- a/src/fondant/testing.py +++ b/src/fondant/testing.py @@ -99,7 +99,7 @@ def from_spec(cls, spec_path: str) -> "PipelineConfigs": @dataclass -class DockerPipelineConfigs(PipelineConfigs): +class DockerComposeConfigs(PipelineConfigs): """ Represents Docker-specific configurations for a pipeline. @@ -110,7 +110,7 @@ class DockerPipelineConfigs(PipelineConfigs): component_configs: t.Optional[t.Dict[str, DockerComponentConfig]] = None @classmethod - def from_spec(cls, spec_path: str) -> "DockerPipelineConfigs": + def from_spec(cls, spec_path: str) -> "DockerComposeConfigs": """Get pipeline configs from a pipeline specification.""" with open(spec_path) as file_: specification = yaml.safe_load(file_) diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index dafd94cd9..df1f42a17 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -17,7 +17,7 @@ VertexCompiler, ) from fondant.testing import ( - DockerPipelineConfigs, + DockerComposeConfigs, KubeflowPipelineConfigs, VertexPipelineConfigs, ) @@ -136,14 +136,14 @@ def test_docker_compiler(setup_pipeline, tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) - pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) assert pipeline_configs.pipeline_name == pipeline.name assert pipeline_configs.pipeline_description == pipeline.description for ( component_name, component_configs, ) in pipeline_configs.component_configs.items(): - # Get exepcted component configs + # Get expected component configs component = pipeline._graph[component_name] component_op = component["operation"] @@ -177,7 +177,7 @@ def test_docker_local_path(setup_pipeline, tmp_path_factory): compiler = DockerCompiler() output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) expected_run_id = "testpipeline-20230101000000" for ( component_name, @@ -222,7 +222,7 @@ def test_docker_remote_path(setup_pipeline, tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path) - pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) expected_run_id = "testpipeline-20230101000000" for ( component_name, @@ -270,7 +270,7 @@ def test_docker_extra_volumes(setup_pipeline, tmp_path_factory): extra_volumes=extra_volumes, ) - pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) for _, service in pipeline_configs.component_configs.items(): assert all( extra_volume in service.volumes for extra_volume in extra_volumes @@ -299,7 +299,7 @@ def test_docker_configuration(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yaml") compiler.compile(pipeline=pipeline, output_path=output_path) - pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) component_config = pipeline_configs.component_configs["first_component"] assert component_config.accelerators[0].type == "gpu" assert component_config.accelerators[0].number == 1 @@ -547,7 +547,7 @@ def test_caching_dependency_docker(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: output_path = str(fn / "docker-compose.yml") compiler.compile(pipeline=pipeline, output_path=output_path, build_args=[]) - pipeline_configs = DockerPipelineConfigs.from_spec(output_path) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) metadata = json.loads( pipeline_configs.component_configs["second_component"].arguments[ "metadata" diff --git a/tests/test_explorer.py b/tests/test_explorer.py index 0ba10dc42..9b8ed082c 100644 --- a/tests/test_explorer.py +++ b/tests/test_explorer.py @@ -2,9 +2,11 @@ from unittest.mock import patch import pytest -from fondant.explore import run_explorer_app +from fondant.explore import run_explorer_app, stop_explorer_app +from fondant.testing import DockerComposeConfigs DEFAULT_CONTAINER = "fndnt/data_explorer" +OUTPUT_FILE = "explorer-compose.yaml" DEFAULT_TAG = "latest" DEFAULT_PORT = 8501 @@ -32,66 +34,120 @@ def container_path() -> str: return "/source" -def test_run_data_explorer_local_base_path(host_path, container_path, extra_volumes): +def test_run_data_explorer_local_base_path( + host_path, + container_path, + extra_volumes, + tmp_path_factory, +): """Test that the data explorer can be run with a local base path.""" - with patch("subprocess.call") as mock_call: + with tmp_path_factory.mktemp("temp") as fn, patch( + "subprocess.check_call", + ) as mock_call: + output_path = str(fn / OUTPUT_FILE) run_explorer_app( base_path=host_path, + output_path=output_path, port=DEFAULT_PORT, container=DEFAULT_CONTAINER, tag=DEFAULT_TAG, extra_volumes=extra_volumes, ) + + pipeline_configs = DockerComposeConfigs.from_spec(output_path) + data_explorer_config = pipeline_configs.component_configs["data_explorer"] + volumes = data_explorer_config.volumes + + assert data_explorer_config.arguments["base_path"] == container_path + assert data_explorer_config.image == f"{DEFAULT_CONTAINER}:{DEFAULT_TAG}" + assert data_explorer_config.ports == [f"{DEFAULT_PORT}:8501"] + assert volumes[0] == extra_volumes + assert volumes[1]["source"] == str(Path(host_path).resolve()) + assert volumes[1]["target"] == container_path + mock_call.assert_called_once_with( [ "docker", - "run", + "compose", + "-f", + output_path, + "up", + "--build", "--pull", "always", - "--name", - "fondant-explorer", - "--rm", - "-p", - "8501:8501", - "-v", - f"{extra_volumes}", - "-v", - f"{Path(host_path).resolve()}:{container_path}", - f"{DEFAULT_CONTAINER}:{DEFAULT_TAG}", - "--base_path", - f"{container_path}", + "--remove-orphans", + "--detach", ], stdout=-1, ) -def test_run_data_explorer_remote_base_path(remote_path, extra_volumes): +def test_run_data_explorer_remote_base_path( + remote_path, + extra_volumes, + tmp_path_factory, +): """Test that the data explorer can be run with a remote base path.""" - with patch("subprocess.call") as mock_call: + with tmp_path_factory.mktemp("temp") as fn, patch( + "subprocess.check_call", + ) as mock_call: + output_path = str(fn / OUTPUT_FILE) run_explorer_app( base_path=remote_path, + output_path=output_path, port=DEFAULT_PORT, container=DEFAULT_CONTAINER, tag=DEFAULT_TAG, extra_volumes=extra_volumes, ) + pipeline_configs = DockerComposeConfigs.from_spec(output_path) + data_explorer_config = pipeline_configs.component_configs["data_explorer"] + volumes = data_explorer_config.volumes + + assert data_explorer_config.arguments["base_path"] == remote_path + assert data_explorer_config.image == f"{DEFAULT_CONTAINER}:{DEFAULT_TAG}" + assert data_explorer_config.ports == [f"{DEFAULT_PORT}:8501"] + assert volumes[0] == extra_volumes + mock_call.assert_called_once_with( [ "docker", - "run", + "compose", + "-f", + output_path, + "up", + "--build", "--pull", "always", - "--name", - "fondant-explorer", - "--rm", - "-p", - "8501:8501", - "-v", - f"{extra_volumes}", - f"{DEFAULT_CONTAINER}:{DEFAULT_TAG}", - "--base_path", - f"{remote_path}", + "--remove-orphans", + "--detach", + ], + stdout=-1, + ) + + +def test_stop_data_explorer( + remote_path, + extra_volumes, + tmp_path_factory, +): + """Test that the data explorer can be run with a remote base path.""" + with tmp_path_factory.mktemp("temp") as fn, patch( + "subprocess.check_call", + ) as mock_call: + output_path = str(fn / OUTPUT_FILE) + stop_explorer_app( + output_path=output_path, + ) + + mock_call.assert_called_once_with( + [ + "docker", + "compose", + "-f", + output_path, + "stop", ], stdout=-1, )