Move to datasets & apply interface (#683)
This PR is the first of multiple PRs to replace #665. It focuses only on
implementing the new pipeline interface, without adding any new
functionality.

The new interface applies operations to intermediate datasets instead of
adding operations to a pipeline, as shown below. It's a superficial
change, since only the interface is changed. All underlying behavior is
still the same.

The new interface fits nicely with our data format design and we'll be
able to leverage it for interactive development in the future. We can
calculate the schema for each intermediate dataset so the user can
inspect it. Or with eager execution, we could execute a single operation
and allow the user to explore the data using the dataset.
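
As a rough sketch of what that interactive flow could look like with the new interface (introduced under "New interface" below), assuming a hypothetical `schema` attribute and `head()` method that are not part of this PR:

```Python
# Hypothetical future usage: `schema` and `head` are illustrative names only
# and do not exist yet; `{...}` are placeholder arguments as in the examples below.
dataset = pipeline.read("load_data", arguments={...})
print(dataset.schema)  # inspect the calculated schema of this intermediate dataset

dataset = dataset.apply("caption_images", arguments={...})
dataset.head()  # with eager execution, explore the data produced by this single operation
```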

I still need to update the README generation, but I'll do that as a
separate PR. It becomes a bit more complex since we now need to
discriminate between read, transform, and write components to generate
the example code.

**Old interface**
```Python
from fondant.pipeline import ComponentOp, Pipeline

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="description of my pipeline",
    base_path="/foo/bar",
)

load_op = ComponentOp(
    component_dir="load_data",
    arguments={...},
)

caption_op = ComponentOp.from_registry(
    name="caption_images",
    arguments={...},
)

embed_op = ComponentOp(
    component_dir="embed_text",
    arguments={...},
)

write_op = ComponentOp.from_registry(
    name="write_to_hf_hub",
    arguments={...},
)

pipeline.add_op(load_op)
pipeline.add_op(caption_op, dependencies=[load_op])
pipeline.add_op(embed_op, dependencies=[caption_op])
pipeline.add_op(write_op, dependencies=[embed_op])
```

**New interface**
```Python
from fondant.pipeline import Pipeline

pipeline = Pipeline(
    name="my_pipeline",
    description="description of my pipeline",
    base_path="/foo/bar",
)

dataset = pipeline.read(
    "load_data",
    arguments={...},
)
dataset = dataset.apply(
    "caption_images",
    arguments={...},
)
dataset = dataset.apply(
    "embed_text",
    arguments={...},
)
dataset.write(
    "write_to_hf_hub",
    arguments={...},
)
RobbeSneyders committed Dec 7, 2023
1 parent 0c96d7b commit 918d1a0
Showing 10 changed files with 263 additions and 197 deletions.
6 changes: 3 additions & 3 deletions src/fondant/pipeline/compiler.py
@@ -148,7 +148,7 @@ def _generate_spec(
component_cache_key = None

for component_name, component in pipeline._graph.items():
component_op = component["fondant_component_op"]
component_op = component["operation"]

component_cache_key = component_op.get_component_cache_key(
previous_component_cache=component_cache_key,
@@ -347,7 +347,7 @@ def kfp_pipeline():
for component_name, component in pipeline._graph.items():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
component_op = component["operation"]
# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component_from_text(
text=component_op.component_spec.kubeflow_specification.to_string(),
@@ -673,7 +673,7 @@ def compile(

with tempfile.TemporaryDirectory(dir=os.getcwd()) as tmpdirname:
for component_name, component in pipeline._graph.items():
component_op = component["fondant_component_op"]
component_op = component["operation"]
component_cache_key = component_op.get_component_cache_key(
previous_component_cache=component_cache_key,
)
261 changes: 181 additions & 80 deletions src/fondant/pipeline/pipeline.py
@@ -108,7 +108,8 @@ class ComponentOp:
is a representation of a function that will be executed as part of a pipeline.
Arguments:
component_dir: The path to the component directory.
name_or_path: The name of a reusable component, or the path to the directory containing a
custom component.
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
@@ -119,18 +120,13 @@
Note:
- A Fondant Component operation is created by defining a Fondant Component and its input
arguments.
- The `accelerator_name`, `node_pool_label`, `node_pool_name`
attributes are optional and can be used to specify additional
configurations for the operation. More information on the optional attributes that can
be assigned to kfp components here:
https://kubeflow-pipelines.readthedocs.io/en/1.8.13/source/kfp.dsl.html
"""

COMPONENT_SPEC_NAME = "fondant_component.yaml"

def __init__(
self,
component_dir: t.Union[str, Path],
name_or_path: t.Union[str, Path],
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[str, int]] = None,
@@ -139,7 +135,11 @@ def __init__(
client_kwargs: t.Optional[dict] = None,
resources: t.Optional[Resources] = None,
) -> None:
self.component_dir = Path(component_dir)
if self._is_custom_component(name_or_path):
self.component_dir = Path(name_or_path)
else:
self.component_dir = self._get_registry_path(str(name_or_path))

self.input_partition_rows = input_partition_rows
self.component_spec = ComponentSpec.from_file(
self.component_dir / self.COMPONENT_SPEC_NAME,
@@ -210,45 +210,20 @@ def dockerfile_path(self) -> t.Optional[Path]:
path = self.component_dir / "Dockerfile"
return path if path.exists() else None

@classmethod
def from_registry(
cls,
name: str,
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[int, str]] = None,
resources: t.Optional[Resources] = None,
cache: t.Optional[bool] = True,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
) -> "ComponentOp":
"""Load a reusable component by its name.
Args:
name: Name of the component to load
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
resources: The resources to assign to the operation.
cache: Set to False to disable caching, True by default.
cluster_type: The type of cluster to use for distributed execution (default is "local").
client_kwargs: Keyword arguments used to initialise the dask client.
"""
components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")
@staticmethod
def _is_custom_component(path_or_name: t.Union[str, Path]) -> bool:
"""Checks if name is a local path and a custom component."""
component_dir: Path = Path(path_or_name)
return component_dir.exists() and component_dir.is_dir()

if not (components_dir.exists() and components_dir.is_dir()):
@staticmethod
def _get_registry_path(name: str) -> Path:
"""Checks if name is a local path and a custom component."""
component_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")
if not (component_dir.exists() and component_dir.is_dir()):
msg = f"No reusable component with name {name} found."
raise ValueError(msg)

return ComponentOp(
components_dir,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
return component_dir

def get_component_cache_key(
self,
@@ -302,64 +277,100 @@ class Pipeline:

def __init__(
self,
name: str,
*,
base_path: str,
pipeline_name: str,
pipeline_description: t.Optional[str] = None,
description: t.Optional[str] = None,
):
"""
Args:
name: The name of the pipeline.
base_path: The base path for the pipeline where the artifacts are stored.
pipeline_name: The name of the pipeline.
pipeline_description: Optional description of the pipeline.
description: Optional description of the pipeline.
"""
self.base_path = base_path
self.name = self._validate_pipeline_name(pipeline_name)
self.description = pipeline_description
self.package_path = f"{pipeline_name}.tgz"
self.name = self._validate_pipeline_name(name)
self.description = description
self.package_path = f"{name}.tgz"
self._graph: t.OrderedDict[str, t.Any] = OrderedDict()
self.task_without_dependencies_added = False

def add_op(
def _apply(
self,
task: ComponentOp,
dependencies: t.Optional[t.Union[ComponentOp, t.List[ComponentOp]]] = None,
):
operation: ComponentOp,
datasets: t.Optional[t.Union["Dataset", t.List["Dataset"]]] = None,
) -> "Dataset":
"""
Add a task to the pipeline with an optional dependency.
Apply an operation to the provided input datasets.
Args:
task: The task to add to the pipeline.
dependencies: Optional task dependencies that needs to be completed before the task
can run.
operation: The operation to apply.
datasets: The input datasets to apply the operation on
"""
if dependencies is None:
if self.task_without_dependencies_added:
msg = "At most one task can be defined without dependencies."
raise InvalidPipelineDefinition(
msg,
)
dependencies = []
self.task_without_dependencies_added = True
elif not isinstance(dependencies, list):
dependencies = [dependencies]
if datasets is None:
datasets = []
elif not isinstance(datasets, list):
datasets = [datasets]

if len(dependencies) > 1:
if len(datasets) > 1:
msg = (
f"Multiple component dependencies provided for component "
f"`{task.component_spec.name}`. The current version of Fondant can only handle "
f"components with a single dependency. Please note that the behavior of the "
f"pipeline may be unpredictable or incorrect."
f"Multiple input datasets provided for operation `{operation.name}`. The current "
f"version of Fondant can only handle components with a single input."
)
raise InvalidPipelineDefinition(
msg,
)

dependencies_names = [dependency.name for dependency in dependencies]

self._graph[task.name] = {
"fondant_component_op": task,
"dependencies": dependencies_names,
self._graph[operation.name] = {
"operation": operation,
"dependencies": [dataset.operation.name for dataset in datasets],
}
return Dataset(pipeline=self, operation=operation)

def read(
self,
name_or_path: t.Union[str, Path],
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[int, str]] = None,
resources: t.Optional[Resources] = None,
cache: t.Optional[bool] = True,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
) -> "Dataset":
"""
Read data using the provided component.
Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
resources: The resources to assign to the operation.
cache: Set to False to disable caching, True by default.
cluster_type: The type of cluster to use for distributed execution (default is "local").
client_kwargs: Keyword arguments used to initialise the Dask client.
Returns:
An intermediate dataset.
"""
if self._graph:
msg = "For now, at most one read component can be applied per pipeline."
raise InvalidPipelineDefinition(
msg,
)

operation = ComponentOp(
name_or_path,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
return self._apply(operation)

def sort_graph(self):
"""Sort the graph topologically based on task dependencies."""
@@ -437,7 +448,7 @@ def _validate_pipeline_definition(self, run_id: str):
cache_key="42",
)
for operation_specs in self._graph.values():
fondant_component_op = operation_specs["fondant_component_op"]
fondant_component_op = operation_specs["operation"]
component_spec = fondant_component_op.component_spec

if not load_component:
@@ -481,3 +492,93 @@ def _validate_pipeline_definition(self, run_id: str):
def __repr__(self) -> str:
"""Return a string representation of the FondantPipeline object."""
return f"{self.__class__.__name__}({self._graph!r}"


class Dataset:
def __init__(self, *, pipeline: Pipeline, operation: ComponentOp) -> None:
"""A class representing an intermediate dataset.
Args:
pipeline: The pipeline this dataset is a part of.
operation: The operation that created this dataset.
"""
self.pipeline = pipeline
self.operation = operation

def apply(
self,
name_or_path: t.Union[str, Path],
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[int, str]] = None,
resources: t.Optional[Resources] = None,
cache: t.Optional[bool] = True,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
) -> "Dataset":
"""
Apply the provided component on the dataset.
Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
resources: The resources to assign to the operation.
cache: Set to False to disable caching, True by default.
cluster_type: The type of cluster to use for distributed execution (default is "local").
client_kwargs: Keyword arguments used to initialise the Dask client.
Returns:
An intermediate dataset.
"""
operation = ComponentOp(
name_or_path,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
return self.pipeline._apply(operation, self)

def write(
self,
name_or_path: t.Union[str, Path],
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
input_partition_rows: t.Optional[t.Union[int, str]] = None,
resources: t.Optional[Resources] = None,
cache: t.Optional[bool] = True,
cluster_type: t.Optional[str] = "default",
client_kwargs: t.Optional[dict] = None,
) -> None:
"""
Write the dataset using the provided component.
Args:
name_or_path: The name of a reusable component, or the path to the directory containing
a custom component.
arguments: A dictionary containing the argument name and value for the operation.
input_partition_rows: The number of rows to load per partition. Set to override the
automatic partitioning
resources: The resources to assign to the operation.
cache: Set to False to disable caching, True by default.
cluster_type: The type of cluster to use for distributed execution (default is "local").
client_kwargs: Keyword arguments used to initialise the Dask client.
"""
operation = ComponentOp(
name_or_path,
arguments=arguments,
input_partition_rows=input_partition_rows,
resources=resources,
cache=cache,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
)
self.pipeline._apply(operation, self)
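
For reference, a rough sketch of the bookkeeping `Pipeline._apply` performs for the example above (graph keys are simplified; the real names come from the component specs):

```Python
# Rough illustration only; not runnable without the actual components.
dataset = pipeline.read("load_data", arguments={...})
dataset = dataset.apply("caption_images", arguments={...})

# pipeline._graph now conceptually contains:
# {
#     "load_data":      {"operation": <ComponentOp>, "dependencies": []},
#     "caption_images": {"operation": <ComponentOp>, "dependencies": ["load_data"]},
# }
# Passing more than one input dataset to _apply raises InvalidPipelineDefinition,
# as does applying a second read component to the same pipeline.
```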
4 changes: 2 additions & 2 deletions tests/examples/example_modules/invalid_double_pipeline.py
@@ -1,4 +1,4 @@
from fondant.pipeline import Pipeline

TEST_PIPELINE = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
TEST_PIPELINE_2 = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
TEST_PIPELINE = Pipeline(name="test_pipeline", base_path="some/path")
TEST_PIPELINE_2 = Pipeline(name="test_pipeline", base_path="some/path")
4 changes: 2 additions & 2 deletions tests/examples/example_modules/pipeline.py
@@ -2,11 +2,11 @@


def create_pipeline_with_args(name):
return Pipeline(pipeline_name=name, base_path="some/path")
return Pipeline(name=name, base_path="some/path")


def create_pipeline():
return Pipeline(pipeline_name="test_pipeline", base_path="some/path")
return Pipeline(name="test_pipeline", base_path="some/path")


def not_implemented():