-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add kfp compiler #291
Add kfp compiler #291
Conversation
pipeline_versions = self.client.list_pipeline_versions(pipeline_id).versions | ||
return [version.id for version in pipeline_versions] | ||
|
||
def delete_pipeline(self, pipeline_name: str): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do you see all the other functionalities that are not related to compiling and running? user might still want to list pipeline runs, delete pipelines, ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we need to wrap this functionality for every platform we want to run on. At least at this point in time, I would be ok with removing it and having the user work with the underlying platform directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, we'll re-add it later if we notice that it's needed
|
||
def compile( | ||
self, | ||
pipeline: Pipeline, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the Pipeline
is the unifying interface between all runners right? if so, would add it to the abstract class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the overall structure and separation between the Compiler and Runner!
I think there are still some things that don't fall quite between like Client related functionalities. Not the highest priority but we should figure out where those land
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @GeorgesLorre!
The lines along you split the behavior look good to me!
src/fondant/compiler.py
Outdated
except ImportError: | ||
raise ImportError( | ||
"You need to install kfp to use the Kubeflow compiler, " | ||
/ "you can install it with `poetry install --extras kfp`" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ "you can install it with `poetry install --extras kfp`" | |
/ "you can install it with `pip install fondant[kfp]`" |
poetry install
is only for fondant developers. Users can also use poetry, but then they'll install their own project and need to indicate the fondant extras with the fondant dependency in their pyproject.toml
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/fondant/runner.py
Outdated
"You need to install kfp to use the Kubeflow compiler, " | ||
/ "you can install it with `poetry install --extras kfp`" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"You need to install kfp to use the Kubeflow compiler, " | |
/ "you can install it with `poetry install --extras kfp`" | |
"You need to install kfp to use the Kubeflow runner, " | |
/ "you can install it with `pip install fondant[kfp]`" |
pipeline_versions = self.client.list_pipeline_versions(pipeline_id).versions | ||
return [version.id for version in pipeline_versions] | ||
|
||
def delete_pipeline(self, pipeline_name: str): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we need to wrap this functionality for every platform we want to run on. At least at this point in time, I would be ok with removing it and having the user work with the underlying platform directly.
src/fondant/runner.py
Outdated
def run(cls, input_spec: str, host: str, *args, **kwargs): | ||
"""Run a kubeflow pipeline.""" | ||
cls._resolve_imports() | ||
client = kfp.Client(host=host) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably doesn't change a lot to the current functionality, but it would make more sense to me to instantiate this in __init__()
.
9f3d36d
to
e7e98f2
Compare
e7e98f2
to
6d26bb3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @GeorgesLorre!
The general direction looks good. Left some comments.
src/fondant/compiler.py
Outdated
self.kfp = kfp | ||
except ImportError: | ||
msg = """You need to install kfp to use the Kubeflow compiler,\n | ||
you can install it with `pip install --extras kfp`""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can install it with `pip install --extras kfp`""" | |
you can install it with `pip install fondant[pipelines]`""" |
Although we probably want to split this up for the different runners so it becomes fondant[kfp].
src/fondant/compiler.py
Outdated
self.pipeline.sort_graph() | ||
self.pipeline._validate_pipeline_definition("{{workflow.name}}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be done in the DockerCompiler
as well, no?
@@ -18,8 +17,6 @@ | |||
from fondant.schema import validate_partition_number, validate_partition_size | |||
|
|||
if is_kfp_available(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we can get rid of this and make this file completely runner independent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only needed for setting p_volumes
at the componentOp
level, that was only needed when we were initially planning to download the Laion dataset locally to use it. I think we can remove both ephemeral_storage_size
and p_volumes
from the ComponenOp. We can always re-add them later if we notice they're needed
src/fondant/runner.py
Outdated
def _resolve_imports(self): | ||
"""Resolve imports for the Kubeflow compiler.""" | ||
try: | ||
global kfp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we assign to self.kfp
here as in the compiler?
src/fondant/runner.py
Outdated
except ImportError: | ||
raise ImportError( | ||
"You need to install kfp to use the Kubeflow compiler, " | ||
/ "you can install it with `pip install --extras kfp`", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment on same line above.
src/fondant/runner.py
Outdated
# client = kfp.Client(host=host) | ||
# # TODO add logic to see if pipeline exists | ||
# pipeline_spec = client.run_pipeline( | ||
# experiment_id=experiment.id, | ||
# job_name=run_name, | ||
# pipeline_package_path=pipeline.package_path, | ||
# ) | ||
|
||
# pipeline_url = f"{self.host}/#/runs/details/{pipeline_spec.id}" | ||
# logger.info(f"Pipeline is running at: {pipeline_url}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this is still commented out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should not have committed this, will be taken up in a separate PR addressing the kfp runner
""" | ||
Test that an exception is raised when the passed invalid argument name or type to the fondant | ||
component does not match the ones specified in the fondant specifications. | ||
Test that an InvalidPipelineDefinition exception is raised when attempting to compile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're no longer compiling in the test :)
tests/test_pipeline.py
Outdated
], | ||
) | ||
def test_invalid_pipeline_compilation( | ||
default_pipeline_args, | ||
invalid_pipeline_example, | ||
tmp_path, | ||
): | ||
""" | ||
Test that an InvalidPipelineDefinition exception is raised when attempting to compile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're no longer compiling in the test :)
tests/test_pipeline.py
Outdated
@@ -169,12 +165,12 @@ def test_invalid_pipeline_dependencies( | |||
[ | |||
("example_1", ["first_component", "second_component"]), | |||
("example_2", ["first_component", "second_component"]), | |||
("example_3", ["first_component", "second_component"]), | |||
], | |||
) | |||
def test_invalid_pipeline_compilation( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def test_invalid_pipeline_compilation( | |
def test_invalid_pipeline_validation( |
src/fondant/components
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Georges! Looks much cleaner already ✨
src/fondant/compiler.py
Outdated
self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore | ||
logger.info("Pipeline compiled successfully") | ||
|
||
def kf_pipeline(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def kf_pipeline(self): | |
def kfp_pipeline(self): |
src/fondant/compiler.py
Outdated
metadata=metadata, | ||
**component_args, | ||
) | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No else statement needed, the two statements are the same, I think now they can be the same since we're passing the metadata to every components
@@ -18,8 +17,6 @@ | |||
from fondant.schema import validate_partition_number, validate_partition_size | |||
|
|||
if is_kfp_available(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only needed for setting p_volumes
at the componentOp
level, that was only needed when we were initially planning to download the Laion dataset locally to use it. I think we can remove both ephemeral_storage_size
and p_volumes
from the ComponenOp. We can always re-add them later if we notice they're needed
6d26bb3
to
5cc072f
Compare
@RobbeSneyders updated! |
@PhilippeMoussalli Ok I'll remove the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Can you just update the error message for the kfp import before merging?
Move all kfp related stuff from the Pipeline to a separate kfp compiler
Untested -> looking for feedback on the structure not the content