Skip to content

Commit

Permalink
Feature/kfp support for lightweight components (#803)
Browse files Browse the repository at this point in the history
Tested successfully on vertex

TODO:
- test on kfp
- add compile tests that have lightweight components
- investigate duplicate data dir's
  • Loading branch information
GeorgesLorre authored Jan 23, 2024
1 parent c9b125d commit f5d5d4c
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 149 deletions.
108 changes: 0 additions & 108 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,6 @@ def default_arguments(self) -> t.Dict[str, Argument]:
),
}

@property
def kubeflow_specification(self) -> "KubeflowComponentSpec":
return KubeflowComponentSpec.from_fondant_component_spec(self)

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._specification!r})"

Expand Down Expand Up @@ -566,107 +562,3 @@ def __eq__(self, other: object) -> bool:
return False

return True


class KubeflowComponentSpec:
"""
Class representing a Kubeflow component specification.
Args:
specification: The kubeflow component specification as a Python dict
"""

def __init__(self, specification: t.Dict[str, t.Any]) -> None:
self._specification = specification

@staticmethod
def convert_arguments(fondant_component: ComponentSpec):
args = {}
for arg in fondant_component.args.values():
arg_type_dict = {}

# Enable isOptional attribute in spec if arg is Optional and defaults to None
if arg.optional and arg.default is None:
arg_type_dict["isOptional"] = True
if arg.default is not None:
arg_type_dict["defaultValue"] = arg.default

args[arg.name] = {
"parameterType": arg.kubeflow_type,
"description": arg.description,
**arg_type_dict, # type: ignore
}

return args

@classmethod
def from_fondant_component_spec(cls, fondant_component: ComponentSpec):
"""Generate a Kubeflow component spec from a Fondant component spec."""
input_definitions = {
"parameters": {
**cls.convert_arguments(fondant_component),
},
}

cleaned_component_name = fondant_component.sanitized_component_name

specification = {
"components": {
"comp-"
+ cleaned_component_name: {
"executorLabel": "exec-" + cleaned_component_name,
"inputDefinitions": input_definitions,
},
},
"deploymentSpec": {
"executors": {
"exec-"
+ cleaned_component_name: {
"container": {
"command": ["fondant", "execute", "main"],
"image": fondant_component.image,
},
},
},
},
"pipelineInfo": {"name": cleaned_component_name},
"root": {
"dag": {
"tasks": {
cleaned_component_name: {
"cachingOptions": {"enableCache": True},
"componentRef": {"name": "comp-" + cleaned_component_name},
"inputs": {
"parameters": {
param: {"componentInputParameter": param}
for param in input_definitions["parameters"]
},
},
"taskInfo": {"name": cleaned_component_name},
},
},
},
"inputDefinitions": input_definitions,
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-2.0.1",
}
return cls(specification)

def to_file(self, path: t.Union[str, Path]) -> None:
"""Dump the component specification to the file specified by the provided path."""
with open(path, "w", encoding="utf-8") as file_:
yaml.dump(
self._specification,
file_,
indent=4,
default_flow_style=False,
sort_keys=False,
)

def to_string(self) -> str:
"""Return the component specification as a string."""
return json.dumps(self._specification)

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._specification!r})"
120 changes: 119 additions & 1 deletion src/fondant/pipeline/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import yaml

from fondant.core.component_spec import ComponentSpec
from fondant.core.exceptions import InvalidPipelineDefinition
from fondant.core.manifest import Metadata
from fondant.core.schema import CloudCredentialsMount, DockerVolume
Expand Down Expand Up @@ -307,6 +308,115 @@ def _set_configuration(self, services, fondant_component_operation, component_na
return services


class KubeflowComponentSpec:
"""
Class representing a Kubeflow component specification.
Args:
specification: The kubeflow component specification as a Python dict
"""

def __init__(self, specification: t.Dict[str, t.Any]) -> None:
self._specification = specification

@staticmethod
def convert_arguments(fondant_component: ComponentSpec):
args = {}
for arg in fondant_component.args.values():
arg_type_dict = {}

# Enable isOptional attribute in spec if arg is Optional and defaults to None
if arg.optional and arg.default is None:
arg_type_dict["isOptional"] = True
if arg.default is not None:
arg_type_dict["defaultValue"] = arg.default

args[arg.name] = {
"parameterType": arg.kubeflow_type,
"description": arg.description,
**arg_type_dict, # type: ignore
}

return args

@classmethod
def from_fondant_component_spec(
cls,
fondant_component: ComponentSpec,
command: t.List[str],
image_uri: str,
):
"""Generate a Kubeflow component spec from a Fondant component spec."""
input_definitions = {
"parameters": {
**cls.convert_arguments(fondant_component),
},
}

cleaned_component_name = fondant_component.sanitized_component_name

specification = {
"components": {
"comp-"
+ cleaned_component_name: {
"executorLabel": "exec-" + cleaned_component_name,
"inputDefinitions": input_definitions,
},
},
"deploymentSpec": {
"executors": {
"exec-"
+ cleaned_component_name: {
"container": {
"command": command,
"image": image_uri,
},
},
},
},
"pipelineInfo": {"name": cleaned_component_name},
"root": {
"dag": {
"tasks": {
cleaned_component_name: {
"cachingOptions": {"enableCache": True},
"componentRef": {"name": "comp-" + cleaned_component_name},
"inputs": {
"parameters": {
param: {"componentInputParameter": param}
for param in input_definitions["parameters"]
},
},
"taskInfo": {"name": cleaned_component_name},
},
},
},
"inputDefinitions": input_definitions,
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-2.0.1",
}
return cls(specification)

def to_file(self, path: t.Union[str, Path]) -> None:
"""Dump the component specification to the file specified by the provided path."""
with open(path, "w", encoding="utf-8") as file_:
yaml.dump(
self._specification,
file_,
indent=4,
default_flow_style=False,
sort_keys=False,
)

def to_string(self) -> str:
"""Return the component specification as a string."""
return json.dumps(self._specification)

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._specification!r})"


class KubeFlowCompiler(Compiler):
"""Compiler that creates a Kubeflow pipeline spec from a pipeline."""

Expand Down Expand Up @@ -378,8 +488,16 @@ def kfp_pipeline():

component_op = component["operation"]
# convert ComponentOp to Kubeflow component
command = self._build_entrypoint(component_op.image)
image_uri = component_op.image.base_image
kubeflow_spec = KubeflowComponentSpec.from_fondant_component_spec(
component_op.component_spec,
command=command,
image_uri=image_uri,
)

kubeflow_component_op = self.kfp.components.load_component_from_text(
text=component_op.component_spec.kubeflow_specification.to_string(),
text=kubeflow_spec.to_string(),
)

# Remove None values from arguments
Expand Down
2 changes: 2 additions & 0 deletions src/fondant/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ComponentConfigs:
"""

image: t.Optional[str] = None
command: t.Optional[t.List[str]] = None
arguments: t.Optional[t.Dict[str, t.Any]] = None
dependencies: t.Optional[t.List[str]] = None
accelerators: t.Optional[t.List[Accelerator]] = None
Expand Down Expand Up @@ -249,6 +250,7 @@ def from_spec(cls, spec_path: str) -> "KubeflowPipelineConfigs":

component_config = KubeflowComponentConfig(
image=container_spec.get("image"),
command=container_spec.get("command"),
arguments=arguments,
dependencies=dependencies,
accelerators=accelerator_list,
Expand Down
41 changes: 1 addition & 40 deletions tests/core/test_component_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@
import pyarrow as pa
import pytest
import yaml
from fondant.core.component_spec import (
ComponentSpec,
KubeflowComponentSpec,
OperationSpec,
)
from fondant.core.component_spec import ComponentSpec, OperationSpec
from fondant.core.exceptions import InvalidComponentSpec
from fondant.core.schema import Type

Expand All @@ -30,12 +26,6 @@ def valid_fondant_schema_no_args() -> dict:
return yaml.safe_load(f)


@pytest.fixture()
def valid_kubeflow_schema() -> dict:
with open(component_specs_path / "kubeflow_component.yaml") as f:
return yaml.safe_load(f)


@pytest.fixture()
def invalid_fondant_schema() -> dict:
with open(component_specs_path / "invalid_component.yaml") as f:
Expand Down Expand Up @@ -97,13 +87,6 @@ def test_attribute_access(valid_fondant_schema):
)


def test_kfp_component_creation(valid_fondant_schema, valid_kubeflow_schema):
"""Test that the created kubeflow component matches the expected kubeflow component."""
fondant_component = ComponentSpec.from_dict(valid_fondant_schema)
kubeflow_component = fondant_component.kubeflow_specification
assert kubeflow_component._specification == valid_kubeflow_schema


def test_component_spec_no_args(valid_fondant_schema_no_args):
"""Test that a component spec without args is supported."""
fondant_component = ComponentSpec.from_dict(valid_fondant_schema_no_args)
Expand All @@ -128,35 +111,13 @@ def test_component_spec_to_file(valid_fondant_schema):
assert written_data == valid_fondant_schema


def test_kubeflow_component_spec_to_file(valid_kubeflow_schema):
"""Test that the KubeflowComponentSpec can be written to a file."""
kubeflow_component_spec = KubeflowComponentSpec(valid_kubeflow_schema)

with tempfile.TemporaryDirectory() as temp_dir:
file_path = os.path.join(temp_dir, "kubeflow_component_spec.yaml")
kubeflow_component_spec.to_file(file_path)

with open(file_path) as f:
written_data = yaml.safe_load(f)

# check if the written data is the same as the original data
assert written_data == valid_kubeflow_schema


def test_component_spec_repr(valid_fondant_schema):
"""Test that the __repr__ method of ComponentSpec returns the expected string."""
fondant_component = ComponentSpec.from_dict(valid_fondant_schema)
expected_repr = f"ComponentSpec({valid_fondant_schema!r})"
assert repr(fondant_component) == expected_repr


def test_kubeflow_component_spec_repr(valid_kubeflow_schema):
"""Test that the __repr__ method of KubeflowComponentSpec returns the expected string."""
kubeflow_component_spec = KubeflowComponentSpec(valid_kubeflow_schema)
expected_repr = f"KubeflowComponentSpec({valid_kubeflow_schema!r})"
assert repr(kubeflow_component_spec) == expected_repr


def test_component_spec_generic_consumes(valid_fondant_schema_generic_consumes):
"""Test that a component spec with generic consumes is detected."""
component_spec = ComponentSpec.from_dict(valid_fondant_schema_generic_consumes)
Expand Down
Loading

0 comments on commit f5d5d4c

Please sign in to comment.