Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove obsolete args from ComponentOp #356

Merged
merged 2 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,17 +292,11 @@ def _set_configuration(self, task, fondant_component_operation):
# Unpack optional specifications
number_of_gpus = fondant_component_operation.number_of_gpus
node_pool_name = fondant_component_operation.node_pool_name
p_volumes = fondant_component_operation.p_volumes
ephemeral_storage_size = fondant_component_operation.ephemeral_storage_size

# Assign optional specification
if number_of_gpus is not None:
task.set_gpu_limit(number_of_gpus)
if node_pool_name is not None:
task.add_node_selector_constraint("node_pool", node_pool_name)
if p_volumes is not None:
task.add_pvolumes(p_volumes)
if ephemeral_storage_size is not None:
task.set_ephemeral_storage_request(ephemeral_storage_size)

return task
26 changes: 2 additions & 24 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@

from fondant.component_spec import ComponentSpec
from fondant.exceptions import InvalidPipelineDefinition
from fondant.import_utils import is_kfp_available
from fondant.manifest import Manifest
from fondant.schema import validate_partition_number

if is_kfp_available():
from kubernetes import client as k8s_client

logger = logging.getLogger(__name__)


Expand All @@ -35,17 +31,12 @@ class ComponentOp:
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths,
values are Kubernetes volumes or inherited types (e.g. PipelineVolumes).
ephemeral_storage_size: Used ephemeral-storage size (minimum) for the operation.
Defined by string which can be a number or a number followed by one of “E”, “P”, “T”, “G”,
“M”, “K”. (e.g. 2T for 2 Terabytes)

Note:
- A Fondant Component operation is created by defining a Fondant Component and its input
arguments.
- The `number_of_gpus`, `node_pool_label`, `node_pool_name`,`p_volumes` and
`ephemeral_storage_size` attributes are optional and can be used to specify additional
- The `number_of_gpus`, `node_pool_label` and `node_pool_name`
attributes are optional and can be used to specify additional
configurations for the operation. More information on the optional attributes that can
be assigned to kfp components here:
https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html
Expand All @@ -62,8 +53,6 @@ def __init__(
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
ephemeral_storage_size: t.Optional[str] = None,
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
Expand All @@ -79,8 +68,6 @@ def __init__(
node_pool_label,
node_pool_name,
)
self.p_volumes = p_volumes
self.ephemeral_storage_size = ephemeral_storage_size

def _set_arguments(
self,
Expand Down Expand Up @@ -125,8 +112,6 @@ def from_registry(
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
ephemeral_storage_size: t.Optional[str] = None,
) -> "ComponentOp":
"""Load a reusable component by its name.

Expand All @@ -138,11 +123,6 @@ def from_registry(
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount
paths, values are Kubernetes volumes or inherited types (e.g. PipelineVolumes).
ephemeral_storage_size: Used ephemeral-storage request (minimum) for the operation.
Defined by string which can be a number or a number followed by one of “E”, “P”,
“T”, “G”, “M”, “K”. (e.g. 2T for 2 Terabytes)
"""
components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")

Expand All @@ -157,8 +137,6 @@ def from_registry(
number_of_gpus=number_of_gpus,
node_pool_label=node_pool_label,
node_pool_name=node_pool_name,
p_volumes=p_volumes,
ephemeral_storage_size=ephemeral_storage_size,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ spec:
resources:
limits:
nvidia.com/gpu: 1
requests:
ephemeral-storage: 1Gi
volumeMounts:
- mountPath: /mnt
name: mypvc
inputs:
artifacts:
- name: input_manifest_path
Expand Down Expand Up @@ -88,9 +83,6 @@ spec:
artifacts:
- name: first-component-output_manifest_path
path: /tmp/outputs/output_manifest_path/data
volumes:
- emptyDir: {}
name: mypvc
- dag:
tasks:
- name: first-component
Expand Down
4 changes: 0 additions & 4 deletions tests/test_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ def test_kubeflow_compiler(setup_pipeline, tmp_path_factory):
@pytest.mark.usefixtures("_freeze_time")
def test_kubeflow_configuration(tmp_path_factory):
"""Test that the kubeflow pipeline can be configured."""
from kfp.dsl import PipelineVolume

pipeline = Pipeline(
pipeline_name="test_pipeline",
pipeline_description="description of the test pipeline",
Expand All @@ -214,8 +212,6 @@ def test_kubeflow_configuration(tmp_path_factory):
node_pool_name="a_node_pool",
node_pool_label="a_node_pool_label",
number_of_gpus=1,
p_volumes={"/mnt": PipelineVolume(name="mypvc", empty_dir={})},
ephemeral_storage_size="1Gi",
)
pipeline.add_op(component_1)
compiler = KubeFlowCompiler()
Expand Down