Skip to content

Commit

Permalink
Add disable caching argument (#320)
Browse files Browse the repository at this point in the history
Related to #313 

Update [22/08]

This PR has been updated to add an argument at the componentOp level to
disable caching. Previous implementation relied on passing an
`execute_component` since we were estimating whether a component is
cached or not at compile time.

A component will only execute if `disable_caching` is not enabled and if
previous matching executions are found (this will be implemented in a
later PR)
  • Loading branch information
PhilippeMoussalli authored Aug 24, 2023
1 parent a44f955 commit 81cd69f
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 144 deletions.
2 changes: 1 addition & 1 deletion data_explorer/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/ml6team/fondant@main
git+https://github.com/ml6team/fondant@71214bb9de30cd4302d09af97acf117d4b0e9231
streamlit==1.23.1
streamlit-aggrid==0.3.4
matplotlib==3.7.1
Expand Down
1 change: 1 addition & 0 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def kfp_pipeline():
logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]

# convert ComponentOp to Kubeflow component
kubeflow_component_op = self.kfp.components.load_component(
text=component_op.component_spec.kubeflow_specification.to_string(),
Expand Down
8 changes: 8 additions & 0 deletions src/fondant/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ def from_fondant_component_spec(
"type": "String",
"default": "None",
},
{
"name": "cache",
"description": "Set to False to disable caching, True by default.",
"type": "Boolean",
"default": "True",
},
*(
{
"name": arg.name,
Expand Down Expand Up @@ -295,6 +301,8 @@ def from_fondant_component_spec(
{"inputValue": "component_spec"},
"--input_partition_rows",
{"inputValue": "input_partition_rows"},
"--cache",
{"inputValue": "cache"},
*cls._dump_args(fondant_component.args.values()),
"--output_manifest_path",
{"outputPath": "output_manifest_path"},
Expand Down
47 changes: 41 additions & 6 deletions src/fondant/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import argparse
import ast
import json
import logging
import typing as t
Expand Down Expand Up @@ -37,13 +38,15 @@ def __init__(
self,
spec: ComponentSpec,
*,
cache: bool,
input_manifest_path: t.Union[str, Path],
output_manifest_path: t.Union[str, Path],
metadata: t.Dict[str, t.Any],
user_arguments: t.Dict[str, t.Any],
input_partition_rows: t.Optional[t.Union[str, int]] = None,
) -> None:
self.spec = spec
self.cache = cache
self.input_manifest_path = input_manifest_path
self.output_manifest_path = output_manifest_path
self.metadata = Metadata.from_dict(metadata)
Expand All @@ -55,6 +58,7 @@ def from_args(cls) -> "Executor":
"""Create an executor from a passed argument containing the specification as a dict."""
parser = argparse.ArgumentParser()
parser.add_argument("--component_spec", type=json.loads)
parser.add_argument("--cache", type=ast.literal_eval)
parser.add_argument("--input_partition_rows", type=validate_partition_number)
args, _ = parser.parse_known_args()

Expand All @@ -64,16 +68,20 @@ def from_args(cls) -> "Executor":

component_spec = ComponentSpec(args.component_spec)
input_partition_rows = args.input_partition_rows
cache = args.cache

return cls.from_spec(
component_spec,
input_partition_rows,
cache=cache,
input_partition_rows=input_partition_rows,
)

@classmethod
def from_spec(
cls,
component_spec: ComponentSpec,
*,
cache: bool,
input_partition_rows: t.Optional[t.Union[str, int]],
) -> "Executor":
"""Create an executor from a component spec."""
Expand All @@ -85,6 +93,9 @@ def from_spec(
if "input_partition_rows" in args_dict:
args_dict.pop("input_partition_rows")

if "cache" in args_dict:
args_dict.pop("cache")

input_manifest_path = args_dict.pop("input_manifest_path")
output_manifest_path = args_dict.pop("output_manifest_path")
metadata = args_dict.pop("metadata")
Expand All @@ -94,6 +105,7 @@ def from_spec(
component_spec,
input_manifest_path=input_manifest_path,
output_manifest_path=output_manifest_path,
cache=cache,
metadata=metadata,
user_arguments=args_dict,
input_partition_rows=input_partition_rows,
Expand Down Expand Up @@ -177,20 +189,43 @@ def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):

data_writer.write_dataframe(dataframe)

def _load_cached_output_manifest(self) -> "Manifest":
"""Function that returns the cached output manifest."""
raise NotImplementedError

def _has_matching_execution(self) -> bool:
"""Function that checks if there is an existing previous matching execution."""
# TODO: implement
return False

def execute(self, component_cls: t.Type[Component]) -> None:
"""Execute a component.
Args:
component_cls: The class of the component to execute
"""
input_manifest = self._load_or_create_manifest()
matching_execution_exists = self._has_matching_execution()

component = component_cls(self.spec, **self.user_arguments)
output_df = self._execute_component(component, manifest=input_manifest)
if matching_execution_exists:
logger.info("Previous matching execution found")
else:
logger.info("No previous matching execution found")

output_manifest = input_manifest.evolve(component_spec=self.spec)
if self.cache:
logger.info("Caching for the component is disabled")
else:
logger.info("Caching for the component is enabled")

self._write_data(dataframe=output_df, manifest=output_manifest)
if self.cache is False and matching_execution_exists:
logging.info("Cached component run. Skipping component execution")
output_manifest = self._load_cached_output_manifest()
else:
logging.info("Executing component")
input_manifest = self._load_or_create_manifest()
component = component_cls(self.spec, **self.user_arguments)
output_df = self._execute_component(component, manifest=input_manifest)
output_manifest = input_manifest.evolve(component_spec=self.spec)
self._write_data(dataframe=output_df, manifest=output_manifest)

self.upload_manifest(output_manifest, save_path=self.output_manifest_path)

Expand Down
17 changes: 15 additions & 2 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ComponentOp:
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
Note:
- A Fondant Component operation is created by defining a Fondant Component and its input
Expand All @@ -56,14 +57,17 @@ def __init__(
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
cache: t.Optional[bool] = True,
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
self.cache = cache
self.arguments = self._set_arguments(arguments)

self.component_spec = ComponentSpec.from_file(
self.component_dir / self.COMPONENT_SPEC_NAME,
)
self.name = self.component_spec.name.replace(" ", "_").lower()
self.input_partition_rows = input_partition_rows
self.arguments = self._set_arguments(arguments)

self.arguments.setdefault("component_spec", self.component_spec.specification)

Expand All @@ -85,6 +89,7 @@ def _set_arguments(
input_partition_rows = validate_partition_number(self.input_partition_rows)

arguments["input_partition_rows"] = str(input_partition_rows)
arguments["cache"] = str(self.cache)

return arguments

Expand Down Expand Up @@ -116,6 +121,7 @@ def from_registry(
number_of_gpus: t.Optional[int] = None,
node_pool_label: t.Optional[str] = None,
node_pool_name: t.Optional[str] = None,
cache: t.Optional[bool] = True,
) -> "ComponentOp":
"""Load a reusable component by its name.
Expand All @@ -127,6 +133,7 @@ def from_registry(
number_of_gpus: The number of gpus to assign to the operation
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
"""
components_dir: Path = t.cast(Path, files("fondant") / f"components/{name}")

Expand All @@ -141,6 +148,7 @@ def from_registry(
number_of_gpus=number_of_gpus,
node_pool_label=node_pool_label,
node_pool_name=node_pool_name,
cache=cache,
)

def get_component_cache_key(self) -> str:
Expand Down Expand Up @@ -324,6 +332,7 @@ def _validate_pipeline_definition(self, run_id: str):
for operation_specs in self._graph.values():
fondant_component_op = operation_specs["fondant_component_op"]
component_spec = fondant_component_op.component_spec

if not load_component:
# Check subset exists
for (
Expand Down Expand Up @@ -375,3 +384,7 @@ def _validate_pipeline_definition(self, run_id: str):
load_component = False

logger.info("All pipeline component specifications match.")

def __repr__(self) -> str:
"""Return a string representation of the FondantPipeline object."""
return f"{self.__class__.__name__}({self._graph!r}"
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ services:
- a dummy string arg
- --input_partition_rows
- disable
- --cache
- 'True'
- --component_spec
- '{"name": "First component", "description": "This is an example component",
"image": "example_component:latest", "produces": {"images": {"fields": {"data":
Expand Down Expand Up @@ -43,6 +45,8 @@ services:
- a dummy string arg
- --input_partition_rows
- '10'
- --cache
- 'True'
- --component_spec
- '{"name": "Second component", "description": "This is an example component",
"image": "example_component:latest", "consumes": {"images": {"fields": {"data":
Expand All @@ -69,6 +73,8 @@ services:
- a dummy string arg
- --input_partition_rows
- None
- --cache
- 'True'
- --component_spec
- '{"name": "Third component", "description": "This is an example component",
"image": "example_component:latest", "consumes": {"images": {"fields": {"data":
Expand Down
Loading

0 comments on commit 81cd69f

Please sign in to comment.