From 5cc072f96a5c1116b8c2a392f43acc4eb7899a36 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Tue, 8 Aug 2023 14:49:20 +0200 Subject: [PATCH] Make validate method for pipeline and call it from the compilers --- src/fondant/compiler.py | 8 +++++--- src/fondant/components | 1 + src/fondant/pipeline.py | 14 ++++++++++++++ src/fondant/runner.py | 31 ------------------------------- tests/test_compiler.py | 1 + tests/test_pipeline.py | 9 ++++----- 6 files changed, 25 insertions(+), 39 deletions(-) create mode 120000 src/fondant/components diff --git a/src/fondant/compiler.py b/src/fondant/compiler.py index 52ce263eb..6c1c76ad8 100644 --- a/src/fondant/compiler.py +++ b/src/fondant/compiler.py @@ -134,10 +134,13 @@ def _generate_spec( """ timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") path, volume = self._patch_path(base_path=pipeline.base_path) - metadata = MetaData(run_id=f"{pipeline.name}-{timestamp}", base_path=path) + run_id = f"{pipeline.name}-{timestamp}" + metadata = MetaData(run_id=run_id, base_path=path) services = {} + pipeline.validate(run_id=run_id) + for component_name, component in pipeline._graph.items(): logger.info(f"Compiling service for {component_name}") safe_component_name = self._safe_component_name(component_name) @@ -240,8 +243,7 @@ def compile( output_path: the path where to save the Kubeflow pipeline spec """ self.pipeline = pipeline - self.pipeline.sort_graph() - self.pipeline._validate_pipeline_definition("{{workflow.name}}") + self.pipeline.validate(run_id="{{workflow.name}}") logger.info(f"Compiling {self.pipeline.name} to {output_path}") wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kf_pipeline) # type: ignore self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore diff --git a/src/fondant/components b/src/fondant/components new file mode 120000 index 000000000..6e10371d3 --- /dev/null +++ b/src/fondant/components @@ -0,0 +1 @@ +../../components \ No newline at end of file diff --git 
a/src/fondant/pipeline.py b/src/fondant/pipeline.py index 66b08bab5..2b2a3dcd0 100644 --- a/src/fondant/pipeline.py +++ b/src/fondant/pipeline.py @@ -240,6 +240,7 @@ def add_op( def sort_graph(self): """Sort the graph topologically based on task dependencies.""" + logger.info("Sorting pipeline component graph topologically.") sorted_graph = [] visited = set() @@ -269,6 +270,15 @@ def _validate_pipeline_name(pipeline_name: str) -> str: raise InvalidPipelineDefinition(msg) return pipeline_name + def validate(self, run_id: str): + """Sort and run validation on the pipeline definition. + + Args: + run_id (str): run identifier passed on to the pipeline definition validation. + """ + self.sort_graph() + self._validate_pipeline_definition(run_id) + def _validate_pipeline_definition(self, run_id: str): """ Validates the pipeline definition by ensuring that the consumed and produced subsets and @@ -281,6 +291,10 @@ def _validate_pipeline_definition(self, run_id: str): base_path: the base path where to store the pipelines artifacts run_id: the run id of the component """ + if len(self._graph.keys()) == 0: + logger.info("No components defined in the pipeline. 
Nothing to validate.") + return + # TODO: change later if we decide to run 2 fondant pipelines after each other load_component = True load_component_name = list(self._graph.keys())[0] diff --git a/src/fondant/runner.py b/src/fondant/runner.py index 286c2922a..a5311f7dc 100644 --- a/src/fondant/runner.py +++ b/src/fondant/runner.py @@ -29,34 +29,3 @@ def run(self, input_spec: str, *args, **kwargs): ] subprocess.call(cmd) # nosec - - -class KubeflowRunner(Runner): - def __init__(self): - self._resolve_imports() - - @abstractmethod - def _resolve_imports(self): - """Resolve imports for the Kubeflow compiler.""" - try: - global kfp - import kfp - except ImportError: - raise ImportError( - "You need to install kfp to use the Kubeflow compiler, " - / "you can install it with `pip install --extras kfp`", - ) - - def run(cls, input_spec: str, host: str, *args, **kwargs): - """Run a kubeflow pipeline.""" - pass - # client = kfp.Client(host=host) - # # TODO add logic to see if pipeline exists - # pipeline_spec = client.run_pipeline( - # experiment_id=experiment.id, - # job_name=run_name, - # pipeline_package_path=pipeline.package_path, - # ) - - # pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}" - # logger.info(f"Pipeline is running at: {pipeline_url}") diff --git a/tests/test_compiler.py b/tests/test_compiler.py index ef0105a5e..553069a42 100644 --- a/tests/test_compiler.py +++ b/tests/test_compiler.py @@ -214,6 +214,7 @@ def test_kubeflow_configuration(tmp_path_factory): Path(COMPONENTS_PATH / "example_1" / "first_component"), arguments={"storage_args": "a dummy string arg"}, node_pool_name="a_node_pool", + node_pool_label="a_node_pool_label", number_of_gpus=1, p_volumes={"/mnt": PipelineVolume(name="mypvc", empty_dir={})}, ephemeral_storage_size="1Gi", diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 0d4c55b14..b4270e725 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -182,13 +182,12 @@ def 
test_invalid_pipeline_dependencies(default_pipeline_args, valid_pipeline_exa ("example_3", ["first_component", "second_component"]), ], ) -def test_invalid_pipeline_compilation( +def test_invalid_pipeline_declaration( default_pipeline_args, invalid_pipeline_example, ): - """ - Test that an InvalidPipelineDefinition exception is raised when attempting to compile - an invalid pipeline definition. + """Test that an InvalidPipelineDefinition exception is raised when attempting + to register invalid component combinations. """ example_dir, component_names = invalid_pipeline_example components_path = Path(invalid_pipeline_path / example_dir) @@ -212,7 +211,7 @@ def test_invalid_pipeline_compilation( pipeline._validate_pipeline_definition("test_pipeline") -def test_invalid_pipeline_composition(default_pipeline_args): +def test_invalid_pipeline_validation(default_pipeline_args): """ Test that an InvalidPipelineDefinition exception is raised when attempting to compile an invalid pipeline definition.