Skip to content

Commit

Permalink
Make validate method for pipeline and call it from the compilers
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgesLorre committed Aug 8, 2023
1 parent c12fd20 commit 5cc072f
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 39 deletions.
8 changes: 5 additions & 3 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ def _generate_spec(
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
path, volume = self._patch_path(base_path=pipeline.base_path)
metadata = MetaData(run_id=f"{pipeline.name}-{timestamp}", base_path=path)
run_id = f"{pipeline.name}-{timestamp}"
metadata = MetaData(run_id=run_id, base_path=path)

services = {}

pipeline.validate(run_id=run_id)

for component_name, component in pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")
safe_component_name = self._safe_component_name(component_name)
Expand Down Expand Up @@ -240,8 +243,7 @@ def compile(
output_path: the path where to save the Kubeflow pipeline spec
"""
self.pipeline = pipeline
self.pipeline.sort_graph()
self.pipeline._validate_pipeline_definition("{{workflow.name}}")
self.pipeline.validate(run_id="{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")
wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kf_pipeline) # type: ignore
self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore
Expand Down
1 change: 1 addition & 0 deletions src/fondant/components
14 changes: 14 additions & 0 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ def add_op(

def sort_graph(self):
"""Sort the graph topologically based on task dependencies."""
logger.info("Sorting pipeline component graph topologically.")
sorted_graph = []
visited = set()

Expand Down Expand Up @@ -269,6 +270,15 @@ def _validate_pipeline_name(pipeline_name: str) -> str:
raise InvalidPipelineDefinition(msg)
return pipeline_name

def validate(self, run_id: str):
    """Topologically sort the component graph, then validate the pipeline definition.

    Convenience entry point used by the compilers: runs ``sort_graph`` followed by
    ``_validate_pipeline_definition``.

    Args:
        run_id: identifier of the pipeline run, forwarded to
            ``_validate_pipeline_definition`` (required — no default).
    """
    self.sort_graph()
    self._validate_pipeline_definition(run_id)

def _validate_pipeline_definition(self, run_id: str):
"""
Validates the pipeline definition by ensuring that the consumed and produced subsets and
Expand All @@ -281,6 +291,10 @@ def _validate_pipeline_definition(self, run_id: str):
base_path: the base path where to store the pipelines artifacts
run_id: the run id of the component
"""
if len(self._graph.keys()) == 0:
logger.info("No components defined in the pipeline. Nothing to validate.")
return

# TODO: change later if we decide to run 2 fondant pipelines after each other
load_component = True
load_component_name = list(self._graph.keys())[0]
Expand Down
31 changes: 0 additions & 31 deletions src/fondant/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,3 @@ def run(self, input_spec: str, *args, **kwargs):
]

subprocess.call(cmd) # nosec


class KubeflowRunner(Runner):
    """Runner that submits a compiled pipeline spec to a Kubeflow Pipelines host.

    The kfp dependency is optional and imported lazily, so it is only required
    when this runner is actually instantiated.
    """

    def __init__(self):
        self._resolve_imports()

    # NOTE: not @abstractmethod — this is a concrete implementation that is
    # called from __init__; decorating it abstract would make the class
    # uninstantiable if the Runner base is an ABC.
    def _resolve_imports(self):
        """Import kfp lazily.

        Raises:
            ImportError: if the optional kfp dependency is not installed.
        """
        try:
            global kfp
            import kfp
        except ImportError:
            # Implicit string concatenation here — the original used `/`
            # between the two literals, which raised TypeError instead of
            # the intended ImportError message.
            raise ImportError(
                "You need to install kfp to use the Kubeflow compiler, "
                "you can install it with `pip install --extras kfp`",
            )

    def run(self, input_spec: str, host: str, *args, **kwargs):
        """Run a kubeflow pipeline.

        Args:
            input_spec: path to the compiled pipeline spec package.
            host: URL of the Kubeflow Pipelines host to submit to.
        """
        pass
        # TODO(review): submission not implemented yet; intended shape below.
        # client = kfp.Client(host=host)
        # # TODO add logic to see if pipeline exists
        # pipeline_spec = client.run_pipeline(
        #     experiment_id=experiment.id,
        #     job_name=run_name,
        #     pipeline_package_path=pipeline.package_path,
        # )

        # pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}"
        # logger.info(f"Pipeline is running at: {pipeline_url}")
1 change: 1 addition & 0 deletions tests/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ def test_kubeflow_configuration(tmp_path_factory):
Path(COMPONENTS_PATH / "example_1" / "first_component"),
arguments={"storage_args": "a dummy string arg"},
node_pool_name="a_node_pool",
node_pool_label="a_node_pool_label",
number_of_gpus=1,
p_volumes={"/mnt": PipelineVolume(name="mypvc", empty_dir={})},
ephemeral_storage_size="1Gi",
Expand Down
9 changes: 4 additions & 5 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,12 @@ def test_invalid_pipeline_dependencies(default_pipeline_args, valid_pipeline_exa
("example_3", ["first_component", "second_component"]),
],
)
def test_invalid_pipeline_compilation(
def test_invalid_pipeline_declaration(
default_pipeline_args,
invalid_pipeline_example,
):
"""
Test that an InvalidPipelineDefinition exception is raised when attempting to compile
an invalid pipeline definition.
"""Test that an InvalidPipelineDefinition exception is raised when attempting
to register invalid components combinations.
"""
example_dir, component_names = invalid_pipeline_example
components_path = Path(invalid_pipeline_path / example_dir)
Expand All @@ -212,7 +211,7 @@ def test_invalid_pipeline_compilation(
pipeline._validate_pipeline_definition("test_pipeline")


def test_invalid_pipeline_composition(default_pipeline_args):
def test_invalid_pipeline_validation(default_pipeline_args):
"""
Test that an InvalidPipelineDefinition exception is raised when attempting to compile
an invalid pipeline definition.
Expand Down

0 comments on commit 5cc072f

Please sign in to comment.