Move to datasets & apply interface #683

Merged
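For orientation before the diff: this PR replaces the ComponentOp.from_registry / pipeline.add_op interface with a dataset-centric one built around Pipeline.read, Dataset.apply, and Dataset.write, and renames the Pipeline constructor arguments. A minimal before/after sketch based on the signatures in this diff; the component names and arguments used (load_from_hf_hub, embed_text, write_to_file, dataset_name) are hypothetical placeholders, not taken from this PR:

    # Old interface (removed by this PR)
    from fondant.pipeline import ComponentOp, Pipeline

    pipeline = Pipeline(pipeline_name="my_pipeline", base_path="./artifacts")
    load_op = ComponentOp.from_registry(
        "load_from_hf_hub",  # hypothetical reusable component name
        arguments={"dataset_name": "user/dataset"},  # hypothetical argument
    )
    embed_op = ComponentOp("components/embed_text")  # hypothetical custom component dir
    pipeline.add_op(load_op)
    pipeline.add_op(embed_op, dependencies=load_op)

    # New interface (added by this PR)
    from fondant.pipeline import Pipeline

    pipeline = Pipeline(name="my_pipeline", base_path="./artifacts")
    dataset = pipeline.read(
        "load_from_hf_hub",  # hypothetical reusable component name
        arguments={"dataset_name": "user/dataset"},  # hypothetical argument
    )
    dataset = dataset.apply("components/embed_text")  # hypothetical custom component dir
    dataset.write("write_to_file")  # hypothetical write component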
6 changes: 3 additions & 3 deletions src/fondant/pipeline/compiler.py
@@ -148,7 +148,7 @@ def _generate_spec(
         component_cache_key = None

         for component_name, component in pipeline._graph.items():
-            component_op = component["fondant_component_op"]
+            component_op = component["operation"]

             component_cache_key = component_op.get_component_cache_key(
                 previous_component_cache=component_cache_key,
@@ -347,7 +347,7 @@ def kfp_pipeline():
             for component_name, component in pipeline._graph.items():
                 logger.info(f"Compiling service for {component_name}")

-                component_op = component["fondant_component_op"]
+                component_op = component["operation"]
                 # convert ComponentOp to Kubeflow component
                 kubeflow_component_op = self.kfp.components.load_component_from_text(
                     text=component_op.component_spec.kubeflow_specification.to_string(),
@@ -607,7 +607,7 @@ def compile(
         with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdirname:
             for component_name, component in pipeline._graph.items():
-                component_op = component["fondant_component_op"]
+                component_op = component["operation"]
                 component_cache_key = component_op.get_component_cache_key(
                     previous_component_cache=component_cache_key,
                 )
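The compiler changes above are mechanical: every backend now looks the operation up under the renamed graph key. A sketch of the graph structure being iterated, with illustrative operation names (the dict shape follows Pipeline._apply later in this diff):

    # pipeline._graph maps operation name -> {"operation": ComponentOp, "dependencies": [names]}
    # e.g. OrderedDict([
    #     ("load_from_hf_hub", {"operation": load_op, "dependencies": []}),
    #     ("embed_text", {"operation": embed_op, "dependencies": ["load_from_hf_hub"]}),
    # ])
    for component_name, component in pipeline._graph.items():
        component_op = component["operation"]  # was component["fondant_component_op"]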
261 changes: 181 additions & 80 deletions src/fondant/pipeline/pipeline.py
@@ -108,7 +108,8 @@ class ComponentOp:
     is a representation of a function that will be executed as part of a pipeline.

     Arguments:
-        component_dir: The path to the component directory.
+        name_or_path: The name of a reusable component, or the path to the directory containing a
+            custom component.
         arguments: A dictionary containing the argument name and value for the operation.
         input_partition_rows: The number of rows to load per partition. Set to override the
             automatic partitioning
@@ -119,18 +120,13 @@ class ComponentOp:
     Note:
         - A Fondant Component operation is created by defining a Fondant Component and its input
           arguments.
-        - The `accelerator_name`, `node_pool_label`, `node_pool_name`
-          attributes are optional and can be used to specify additional
-          configurations for the operation. More information on the optional attributes that can
-          be assigned to kfp components here:
-          https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html
     """

     COMPONENT_SPEC_NAME = "fondant_component.yaml"

     def __init__(
         self,
-        component_dir: t.Union[str, Path],
+        name_or_path: t.Union[str, Path],
         *,
         arguments: t.Optional[t.Dict[str, t.Any]] = None,
         input_partition_rows: t.Optional[t.Union[str, int]] = None,
@@ -139,7 +135,11 @@ def __init__(
         client_kwargs: t.Optional[dict] = None,
         resources: t.Optional[Resources] = None,
     ) -> None:
-        self.component_dir = Path(component_dir)
+        if self._is_custom_component(name_or_path):
+            self.component_dir = Path(name_or_path)
+        else:
+            self.component_dir = self._get_registry_path(str(name_or_path))
+
         self.input_partition_rows = input_partition_rows
         self.component_spec = ComponentSpec.from_file(
             self.component_dir / self.COMPONENT_SPEC_NAME,
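With this change, ComponentOp resolves name_or_path itself instead of requiring a separate from_registry classmethod (removed in the next hunk): an existing local directory is treated as a custom component, anything else as a registry name. A hedged illustration, with hypothetical names:

    # Existing local directory -> custom component; spec is read from fondant_component.yaml inside it
    op = ComponentOp("components/my_custom_component")
    # Anything else -> looked up under the installed fondant package's components/ directory;
    # raises ValueError if no reusable component with that name exists
    op = ComponentOp("load_from_hf_hub")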
@@ -210,45 +210,20 @@ def dockerfile_path(self) -> t.Optional[Path]:
         path = self.component_dir / "Dockerfile"
         return path if path.exists() else None

-    @classmethod
-    def from_registry(
-        cls,
-        name: str,
-        *,
-        arguments: t.Optional[t.Dict[str, t.Any]] = None,
-        input_partition_rows: t.Optional[t.Union[int, str]] = None,
-        resources: t.Optional[Resources] = None,
-        cache: t.Optional[bool] = True,
-        cluster_type: t.Optional[str] = "default",
-        client_kwargs: t.Optional[dict] = None,
-    ) -> "ComponentOp":
-        """Load a reusable component by its name.
-
-        Args:
-            name: Name of the component to load
-            arguments: A dictionary containing the argument name and value for the operation.
-            input_partition_rows: The number of rows to load per partition. Set to override the
-                automatic partitioning
-            resources: The resources to assign to the operation.
-            cache: Set to False to disable caching, True by default.
-            cluster_type: The type of cluster to use for distributed execution (default is "local").
-            client_kwargs: Keyword arguments used to initialise the dask client.
-        """
-        components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")
+    @staticmethod
+    def _is_custom_component(path_or_name: t.Union[str, Path]) -> bool:
+        """Checks if name is a local path and a custom component."""
+        component_dir: Path = Path(path_or_name)
+        return component_dir.exists() and component_dir.is_dir()

-        if not (components_dir.exists() and components_dir.is_dir()):
+    @staticmethod
+    def _get_registry_path(name: str) -> Path:
+        """Gets the path to a reusable component in the registry."""
+        component_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")
+        if not (component_dir.exists() and component_dir.is_dir()):
             msg = f"No reusable component with name {name} found."
             raise ValueError(msg)
-
-        return ComponentOp(
-            components_dir,
-            arguments=arguments,
-            input_partition_rows=input_partition_rows,
-            resources=resources,
-            cache=cache,
-            cluster_type=cluster_type,
-            client_kwargs=client_kwargs,
-        )
+        return component_dir

     def get_component_cache_key(
         self,
@@ -302,64 +277,100 @@ class Pipeline:

     def __init__(
         self,
+        name: str,
         *,
         base_path: str,
-        pipeline_name: str,
-        pipeline_description: t.Optional[str] = None,
+        description: t.Optional[str] = None,
     ):
         """
         Args:
+            name: The name of the pipeline.
             base_path: The base path for the pipeline where the artifacts are stored.
-            pipeline_name: The name of the pipeline.
-            pipeline_description: Optional description of the pipeline.
+            description: Optional description of the pipeline.
         """
         self.base_path = base_path
-        self.name = self._validate_pipeline_name(pipeline_name)
-        self.description = pipeline_description
-        self.package_path = f"{pipeline_name}.tgz"
+        self.name = self._validate_pipeline_name(name)
+        self.description = description
+        self.package_path = f"{name}.tgz"
         self._graph: t.OrderedDict[str, t.Any] = OrderedDict()
-        self.task_without_dependencies_added = False

-    def add_op(
+    def _apply(
         self,
-        task: ComponentOp,
-        dependencies: t.Optional[t.Union[ComponentOp, t.List[ComponentOp]]] = None,
-    ):
+        operation: ComponentOp,
+        datasets: t.Optional[t.Union["Dataset", t.List["Dataset"]]] = None,
+    ) -> "Dataset":
         """
-        Add a task to the pipeline with an optional dependency.
+        Apply an operation to the provided input datasets.

         Args:
-            task: The task to add to the pipeline.
-            dependencies: Optional task dependencies that need to be completed before the task
-                can run.
+            operation: The operation to apply.
+            datasets: The input datasets to apply the operation on.
         """
-        if dependencies is None:
-            if self.task_without_dependencies_added:
-                msg = "At most one task can be defined without dependencies."
-                raise InvalidPipelineDefinition(
-                    msg,
-                )
-            dependencies = []
-            self.task_without_dependencies_added = True
-        elif not isinstance(dependencies, list):
-            dependencies = [dependencies]
+        if datasets is None:
+            datasets = []
+        elif not isinstance(datasets, list):
+            datasets = [datasets]

-        if len(dependencies) > 1:
+        if len(datasets) > 1:
             msg = (
-                f"Multiple component dependencies provided for component "
-                f"`{task.component_spec.name}`. The current version of Fondant can only handle "
-                f"components with a single dependency. Please note that the behavior of the "
-                f"pipeline may be unpredictable or incorrect."
+                f"Multiple input datasets provided for operation `{operation.name}`. The current "
+                f"version of Fondant can only handle components with a single input."
             )
             raise InvalidPipelineDefinition(
                 msg,
             )

-        dependencies_names = [dependency.name for dependency in dependencies]
-
-        self._graph[task.name] = {
-            "fondant_component_op": task,
-            "dependencies": dependencies_names,
+        self._graph[operation.name] = {
+            "operation": operation,
+            "dependencies": [dataset.operation.name for dataset in datasets],
         }
+        return Dataset(pipeline=self, operation=operation)
+
+    def read(

Review thread on `def read`:
Contributor: Aren't we missing the schema here? Same for the write function.
Member Author: I left this out for now, will add this in a separate PR. This just changes the interface, but everything still works like before. At this stage, you still need to overwrite the component spec for generic components.
Contributor: Makes sense, I can imagine it will require more fundamental changes, so it might indeed be best to leave it to a separate PR.
+        self,
+        name_or_path: t.Union[str, Path],
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[int, str]] = None,
+        resources: t.Optional[Resources] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,

Review thread on the signature above:
Contributor: Can we add the docstrings back here and to all other user-facing methods?
Member Author: Already started doing this locally 👍 Will push soon.
) -> "Dataset":
"""
Read data using the provided component.

Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
resources: The resources to assign to the operation.
cache: Set to False to disable caching, True by default.
cluster_type: The type of cluster to use for distributed execution (default is "local").
client_kwargs: Keyword arguments used to initialise the Dask client.

Returns:
An intermediate dataset.
"""
if self._graph:
msg = "For now, at most one read component can be applied per pipeline."
raise InvalidPipelineDefinition(
msg,
)

operation = ComponentOp(
name_or_path,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
return self._apply(operation)
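Note the constraint enforced above: read checks self._graph and must therefore be the first operation applied to a pipeline. A brief illustration (component name hypothetical):

    dataset = pipeline.read("load_from_hf_hub")
    # A second read on the same pipeline now raises InvalidPipelineDefinition:
    # "For now, at most one read component can be applied per pipeline."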

     def sort_graph(self):
         """Sort the graph topologically based on task dependencies."""
@@ -437,7 +448,7 @@ def _validate_pipeline_definition(self, run_id: str):
                 cache_key="42",
             )
         for operation_specs in self._graph.values():
-            fondant_component_op = operation_specs["fondant_component_op"]
+            fondant_component_op = operation_specs["operation"]
             component_spec = fondant_component_op.component_spec

             if not load_component:
@@ -481,3 +492,93 @@ def _validate_pipeline_definition(self, run_id: str):
     def __repr__(self) -> str:
         """Return a string representation of the FondantPipeline object."""
         return f"{self.__class__.__name__}({self._graph!r}"
+
+
+class Dataset:
+    def __init__(self, *, pipeline: Pipeline, operation: ComponentOp) -> None:
+        """A class representing an intermediate dataset.
+
+        Args:
+            pipeline: The pipeline this dataset is a part of.
+            operation: The operation that created this dataset.
+        """
+        self.pipeline = pipeline
+        self.operation = operation
+
+    def apply(
+        self,
+        name_or_path: t.Union[str, Path],
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[int, str]] = None,
+        resources: t.Optional[Resources] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,
+    ) -> "Dataset":
+        """
+        Apply the provided component on the dataset.
+
+        Args:
+            name_or_path: The name of a reusable component, or the path to the directory containing
+                a custom component.
+            arguments: A dictionary containing the argument name and value for the operation.
+            input_partition_rows: The number of rows to load per partition. Set to override the
+                automatic partitioning.
+            resources: The resources to assign to the operation.
+            cache: Set to False to disable caching, True by default.
+            cluster_type: The type of cluster to use for distributed execution (default is "default").
+            client_kwargs: Keyword arguments used to initialise the Dask client.
+
+        Returns:
+            An intermediate dataset.
+        """
+        operation = ComponentOp(
+            name_or_path,
+            arguments=arguments,
+            input_partition_rows=input_partition_rows,
+            resources=resources,
+            cache=cache,
+            cluster_type=cluster_type,
+            client_kwargs=client_kwargs,
+        )
+        return self.pipeline._apply(operation, self)
+
+    def write(
+        self,
+        name_or_path: t.Union[str, Path],
+        *,
+        arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        input_partition_rows: t.Optional[t.Union[int, str]] = None,
+        resources: t.Optional[Resources] = None,
+        cache: t.Optional[bool] = True,
+        cluster_type: t.Optional[str] = "default",
+        client_kwargs: t.Optional[dict] = None,
+    ) -> None:
+        """
+        Write the dataset using the provided component.
+
+        Args:
+            name_or_path: The name of a reusable component, or the path to the directory containing
+                a custom component.
+            arguments: A dictionary containing the argument name and value for the operation.
+            input_partition_rows: The number of rows to load per partition. Set to override the
+                automatic partitioning.
+            resources: The resources to assign to the operation.
+            cache: Set to False to disable caching, True by default.
+            cluster_type: The type of cluster to use for distributed execution (default is "default").
+            client_kwargs: Keyword arguments used to initialise the Dask client.
+        """
+        operation = ComponentOp(
+            name_or_path,
+            arguments=arguments,
+            input_partition_rows=input_partition_rows,
+            resources=resources,
+            cache=cache,
+            cluster_type=cluster_type,
+            client_kwargs=client_kwargs,
+        )
+        self.pipeline._apply(operation, self)
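Putting the new Dataset class together with Pipeline.read, a hedged end-to-end sketch of the chained interface this file now exposes; component names and argument values are illustrative only:

    from fondant.pipeline import Pipeline

    pipeline = Pipeline(name="my_pipeline", base_path="./artifacts", description="demo pipeline")
    dataset = pipeline.read(
        "load_from_hf_hub",                          # hypothetical reusable component
        arguments={"dataset_name": "user/dataset"},  # hypothetical component argument
        input_partition_rows=1000,
    )
    dataset = dataset.apply(
        "components/embed_text",  # hypothetical custom component directory
        cache=False,              # disable caching for this operation
    )
    dataset.write("write_to_file")  # hypothetical write component; write() returns None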
4 changes: 2 additions & 2 deletions tests/examples/example_modules/invalid_double_pipeline.py
@@ -1,4 +1,4 @@
 from fondant.pipeline import Pipeline

-TEST_PIPELINE = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
-TEST_PIPELINE_2 = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
+TEST_PIPELINE = Pipeline(name="test_pipeline", base_path="some/path")
+TEST_PIPELINE_2 = Pipeline(name="test_pipeline", base_path="some/path")
2 changes: 1 addition & 1 deletion tests/examples/example_modules/pipeline.py
@@ -1,3 +1,3 @@
 from fondant.pipeline import Pipeline

-pipeline = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
+pipeline = Pipeline(name="test_pipeline", base_path="some/path")