From 2ab9cefe228b27311128f66294753ef0950ec1f1 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Wed, 13 Sep 2023 08:59:34 +0200 Subject: [PATCH] Detect pipeline attribute during compile/run (#398) Addresses #375 --- README.md | 2 +- docs/getting_started.md | 6 +- src/fondant/cli.py | 115 +++++++++--------- .../{valid.py => component.py} | 0 .../invalid_double_pipeline.py | 4 + tests/example_modules/pipeline.py | 3 + tests/test_cli.py | 73 +++++++---- 7 files changed, 118 insertions(+), 85 deletions(-) rename tests/example_modules/{valid.py => component.py} (100%) create mode 100644 tests/example_modules/invalid_double_pipeline.py create mode 100644 tests/example_modules/pipeline.py diff --git a/README.md b/README.md index eabcd3b62..97169b27d 100644 --- a/README.md +++ b/README.md @@ -257,7 +257,7 @@ For more advanced use cases, you can use the `DaskTransformComponent` instead. Once you have a pipeline you can easily run (and compile) it by using the built-in CLI: ```bash -fondant run foo.bar:pipeline --local +fondant run pipeline.py --local ``` To see all available arguments you can check the fondant CLI help pages diff --git a/docs/getting_started.md b/docs/getting_started.md index 58a87954a..f30a3c2ef 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -139,10 +139,10 @@ Fondant has a feature rich CLI that helps you with these steps. Let's start by r First of all make sure you have [Docker Compose](https://docs.docker.com/compose/) installed on your system ```bash -fondant run pipeline:my_pipeline --local +fondant run pipeline.py --local ``` -We call the fondant CLI to compile and run our pipeline, we pass a reference to our pipeline using the import_string syntax `:`. We also pass the `--local` flag to indicate we want to compile our pipeline for the local runner. +We call the fondant CLI to compile and run our pipeline, we pass the module containing the pipeline instance, the instance is then automatically detected. 
We also pass the `--local` flag to indicate we want to compile our pipeline for the local runner. Running this command will create a `docker-compose.yml` file with the compiled pipeline definition. Feel free to inspect this file but changing it is not needed. Note that if you use a local `base_path` in your pipeline declaration that this path will be mounted in the docker containers. This means that the data will be stored locally on your machine. If you use a cloud storage path, the data will be stored in the cloud. @@ -287,7 +287,7 @@ We add the component to our pipeline definition and specify that it depends on t We can now easily run our new pipeline: ```bash -fondant run pipeline:my_pipeline --local +fondant run pipeline --local ``` You will see that the components runs sequentially and that each has its own logs. diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 946b6568d..f6423d086 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -23,6 +23,7 @@ import textwrap import typing as t from collections import defaultdict +from types import ModuleType from fondant.compiler import DockerCompiler, KubeFlowCompiler from fondant.component import BaseComponent, Component @@ -52,7 +53,7 @@ def entrypoint(): This CLI is used to interact with fondant pipelines like compiling and running your pipelines. 
Example: - fondant compile my_project.my_pipeline.py:pipeline + fondant compile my_project.my_pipeline.py """, ), epilog=textwrap.dedent( @@ -208,16 +209,18 @@ def register_compile(parent_parser): - to mount cloud credentials (see examples)) Example: - fondant compile my_project.my_pipeline.py:pipeline --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials + fondant compile my_project.my_pipeline.py --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials - fondant compile my_project.my_pipeline.py:pipeline --kubeflow --extra-volumes $HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json + fondant compile my_project.my_pipeline.py --kubeflow --extra-volumes $HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json """, ), ) parser.add_argument( - "pipeline", - help="Path to the fondant pipeline: path.to.module:instance", - type=pipeline_from_string, + "ref", + help="""Reference to the pipeline to compile, can be a path to a spec file or + a module containing the pipeline instance that will be compiled first (e.g. 
pipeline.py) + """, + action="store", ) # add a mutually exclusive group for the mode mode_group = parser.add_mutually_exclusive_group(required=True) @@ -247,17 +250,19 @@ def register_compile(parent_parser): def compile(args): args = set_default_output(args) + pipeline = pipeline_from_module(args.ref) + if args.local: compiler = DockerCompiler() compiler.compile( - pipeline=args.pipeline, + pipeline=pipeline, extra_volumes=args.extra_volumes, output_path=args.output_path, build_args=args.build_arg, ) elif args.kubeflow: compiler = KubeFlowCompiler() - compiler.compile(pipeline=args.pipeline, output_path=args.output_path) + compiler.compile(pipeline=pipeline, output_path=args.output_path) def register_run(parent_parser): @@ -274,7 +279,7 @@ def register_run(parent_parser): You can use the --extra-volumes flag to specify extra volumes to mount in the containers this can be used: Example: - fondant run my_project.my_pipeline.py:pipeline --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials + fondant run my_project.my_pipeline.py --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials fondant run ./my_compiled_kubeflow_pipeline.tgz --kubeflow """, ), @@ -282,7 +287,8 @@ def register_run(parent_parser): parser.add_argument( "ref", help="""Reference to the pipeline to run, can be a path to a spec file or - a pipeline instance that will be compiled first""", + a module containing the pipeline instance that will be compiled first (e.g. 
pipeline.py) + """, action="store", ) # add a mutually exclusive group for the mode @@ -315,8 +321,8 @@ def run(args): if args.local: try: - pipeline = pipeline_from_string(args.ref) - except ImportFromStringError: + pipeline = pipeline_from_module(args.ref) + except ModuleNotFoundError: spec_ref = args.ref else: spec_ref = args.output_path @@ -338,8 +344,8 @@ def run(args): msg = "--host argument is required for running on Kubeflow" raise ValueError(msg) try: - pipeline = pipeline_from_string(args.ref) - except ImportFromStringError: + pipeline = pipeline_from_module(args.ref) + except ModuleNotFoundError: spec_ref = args.ref else: spec_ref = args.output_path @@ -389,62 +395,59 @@ def execute(args): executor.execute(component) -class ImportFromStringError(Exception): +class ComponentImportError(Exception): """Error raised when an import string is not valid.""" -class ImportFromModuleError(Exception): +class PipelineImportError(Exception): """Error raised when an import from module is not valid.""" -def pipeline_from_string(import_string: str) -> Pipeline: - """Try to import a pipeline from a string otherwise raise an ImportFromStringError.""" - module_str, _, attr_str = import_string.rpartition(":") - if not attr_str or not module_str: - raise ImportFromStringError( - f"{import_string} is not a valid import string." - + "Please provide a valid import string in the format of module:attr", - ) +def get_module(module_str: str) -> ModuleType: + """Function that retrieves module from a module string.""" + if ".py" in module_str: + module_str = module_str.rsplit(".py", 1)[0] + + module_str = module_str.replace("/", ".") try: module = importlib.import_module(module_str) - except ImportError: - msg = f"{module_str} is not a valid module. Please provide a valid module." - raise ImportFromStringError( - msg, - ) + except ModuleNotFoundError: + msg = f"`{module_str}` was not found. Please provide a valid module." 
+ raise ModuleNotFoundError(msg) - try: - for attr_str_element in attr_str.split("."): - instance = getattr(module, attr_str_element) - except AttributeError: - msg = f"{attr_str} is not found in {module}." - raise ImportFromStringError(msg) + return module - if not isinstance(instance, Pipeline): - msg = f"{module}:{instance} is not a valid pipeline." - raise ImportFromStringError(msg) - return instance +def pipeline_from_module(module_str: str) -> Pipeline: + """Try to import a pipeline from a module otherwise raise a PipelineImportError.""" + module = get_module(module_str) + pipeline_instances = [ + obj for obj in module.__dict__.values() if isinstance(obj, Pipeline) + ] -def component_from_module(module_str: str) -> t.Type[Component]: - """Try to import a component from a module otherwise raise an ImportFromModuleError.""" - if ".py" in module_str: - module_str = module_str.rsplit(".py", 1)[0] - - module_str = module_str.replace("/", ".") + if not pipeline_instances: + msg = f"No pipeline found in module {module_str}" + raise PipelineImportError(msg) - try: - class_members = inspect.getmembers( - importlib.import_module(module_str), - inspect.isclass, - ) - except ModuleNotFoundError: - msg = f"`{module_str}` was not found. Please provide a valid module." - raise ImportFromModuleError( - msg, + if len(pipeline_instances) > 1: + msg = ( + f"Found multiple instantiated pipelines in {module_str}. 
Only one pipeline " + f"can be present" ) + raise PipelineImportError(msg) + + pipeline = pipeline_instances[0] + logger.info(f"Pipeline `{pipeline.name}` found in module {module_str}") + + return pipeline + + +def component_from_module(module_str: str) -> t.Type[Component]: + """Try to import a component from a module otherwise raise a ComponentImportError.""" + module = get_module(module_str) + class_members = inspect.getmembers(module, inspect.isclass) component_classes_dict = defaultdict(list) @@ -455,7 +458,7 @@ def component_from_module(module_str: str) -> t.Type[Component]: if len(component_classes_dict) == 0: msg = f"No Component found in module {module_str}" - raise ImportFromModuleError(msg) + raise ComponentImportError(msg) max_order = max(component_classes_dict) found_components = component_classes_dict[max_order] @@ -465,7 +468,7 @@ def component_from_module(module_str: str) -> t.Type[Component]: f"Found multiple components in {module_str}: {found_components}. Only one component " f"can be present" ) - raise ImportFromModuleError(msg) + raise ComponentImportError(msg) component_name, component_cls = found_components[0] logger.info(f"Component `{component_name}` found in module {module_str}") diff --git a/tests/example_modules/valid.py b/tests/example_modules/component.py similarity index 100% rename from tests/example_modules/valid.py rename to tests/example_modules/component.py diff --git a/tests/example_modules/invalid_double_pipeline.py b/tests/example_modules/invalid_double_pipeline.py new file mode 100644 index 000000000..c518d20e2 --- /dev/null +++ b/tests/example_modules/invalid_double_pipeline.py @@ -0,0 +1,4 @@ +from fondant.pipeline import Pipeline + +TEST_PIPELINE = Pipeline(pipeline_name="test_pipeline", base_path="some/path") +TEST_PIPELINE_2 = Pipeline(pipeline_name="test_pipeline", base_path="some/path") diff --git a/tests/example_modules/pipeline.py b/tests/example_modules/pipeline.py new file mode 100644 index 000000000..74bb57927 
--- /dev/null +++ b/tests/example_modules/pipeline.py @@ -0,0 +1,3 @@ +from fondant.pipeline import Pipeline + +pipeline = Pipeline(pipeline_name="test_pipeline", base_path="some/path") diff --git a/tests/test_cli.py b/tests/test_cli.py index af83a7eaa..33b742763 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,12 +4,13 @@ import pytest from fondant.cli import ( - ImportFromModuleError, - ImportFromStringError, + ComponentImportError, + PipelineImportError, compile, component_from_module, execute, - pipeline_from_string, + get_module, + pipeline_from_module, run, ) from fondant.component import DaskLoadComponent @@ -47,11 +48,29 @@ def test_basic_invocation(command): @pytest.mark.parametrize( "module_str", [ - __name__, - "example_modules.valid", - "example_modules/valid", - "example_modules.valid.py", - "example_modules/valid.py", + "example_modules.component", + "example_modules/component", + "example_modules.component.py", + "example_modules/component.py", ], ) +def test_get_module(module_str): + """Test get module method.""" + module = get_module(module_str) + assert module.__name__ == "example_modules.component" + + +def test_get_module_error(): + """Test that an error is raised when attempting to import an invalid module.""" + with pytest.raises(ModuleNotFoundError): + get_module("example_modules.invalid") + + +@pytest.mark.parametrize( + "module_str", + [ + __name__, # this test module defines a component class + "example_modules.component", # example module containing a component class ], ) def test_component_from_module(module_str): """Test that a component is returned from a module.""" @@ -63,36 +82,40 @@ "module_str", [ - "example_modules.None_existing_module", # module does not exist "example_modules.invalid_component", # module contains more than one component class "example_modules.invalid_double_components", # module does not contain a component class ], ) def test_component_from_module_error(module_str): """Test different error cases for 
pipeline_from_string.""" - with pytest.raises(ImportFromModuleError): + with pytest.raises(ComponentImportError): component_from_module(module_str) -def test_pipeline_from_string(): +@pytest.mark.parametrize( + "module_str", + [ + __name__, + "example_modules.pipeline", + ], +) +def test_pipeline_from_module(module_str): """Test that pipeline_from_string works.""" - pipeline = pipeline_from_string(__name__ + ":TEST_PIPELINE") - assert pipeline == TEST_PIPELINE + pipeline = pipeline_from_module(module_str) + assert pipeline.name == "test_pipeline" @pytest.mark.parametrize( - "import_string", + "module_str", [ - "foo.barTEST_PIPELINE", # cannot be split - "foo.bar:TEST_PIPELINE", # module does not exist - __name__ + ":IM_NOT_REAL", # pipeline does not exist - __name__ + ":test_basic_invocation", # not a pipeline instance + "example_modules.component", # module does not contain a pipeline instance + "example_modules.invalid_double_pipeline", # module contains many pipeline instances ], ) -def test_pipeline_from_string_error(import_string): +def test_pipeline_from_module_error(module_str): """Test different error cases for pipeline_from_string.""" - with pytest.raises(ImportFromStringError): - pipeline_from_string(import_string) + with pytest.raises(PipelineImportError): + pipeline_from_module(module_str) def test_execute_logic(monkeypatch): @@ -107,9 +130,9 @@ def test_local_logic(tmp_path_factory): """Test that the compile command works with arguments.""" with tmp_path_factory.mktemp("temp") as fn: args = argparse.Namespace( + ref=__name__, local=True, kubeflow=False, - pipeline=TEST_PIPELINE, output_path=str(fn / "docker-compose.yml"), extra_volumes=[], build_arg=[], @@ -120,9 +143,9 @@ def test_local_logic(tmp_path_factory): def test_kfp_compile(tmp_path_factory): with tmp_path_factory.mktemp("temp") as fn: args = argparse.Namespace( + ref=__name__, kubeflow=True, local=False, - pipeline=TEST_PIPELINE, output_path=str(fn / "kubeflow_pipelines.yml"), ) compile(args) 
@@ -150,7 +173,7 @@ def test_local_run(tmp_path_factory): with patch("subprocess.call") as mock_call, tmp_path_factory.mktemp("temp") as fn: args1 = argparse.Namespace( local=True, - ref=__name__ + ":TEST_PIPELINE", + ref=__name__, output_path=str(fn / "docker-compose.yml"), extra_volumes=[], build_arg=[], @@ -203,7 +226,7 @@ def test_kfp_run(tmp_path_factory): local=False, host="localhost2", output_path=str(fn / "kubeflow_pipelines.yml"), - ref=__name__ + ":TEST_PIPELINE", + ref=__name__, ) run(args) mock_runner.assert_called_once_with(host="localhost2")