Skip to content

Commit

Permalink
Add overwrite_cache option the to calls of remote and local executions (
Browse files Browse the repository at this point in the history
#1375)

Signed-off-by: H. Furkan Vural <[email protected]>

Implemented cache overwrite feature is added on flytekit as well for the completeness. In order to support the cache eviction RFC, an overwrite parameter was added, indicating the data store should replace an existing artifact instead of creating a new one on local calls.
  • Loading branch information
hfurkanvural authored and eapolinario committed Feb 22, 2023
1 parent 567ed29 commit 33e05c2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
4 changes: 4 additions & 0 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def __init__(
raw_output_data_config=None,
max_parallelism=None,
security_context: typing.Optional[security.SecurityContext] = None,
overwrite_cache: bool = None,
):
"""
:param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute
Expand All @@ -200,6 +201,7 @@ def __init__(
self._raw_output_data_config = raw_output_data_config
self._max_parallelism = max_parallelism
self._security_context = security_context
self.overwrite_cache = overwrite_cache

@property
def launch_plan(self):
Expand Down Expand Up @@ -283,6 +285,7 @@ def to_flyte_idl(self):
else None,
max_parallelism=self.max_parallelism,
security_context=self.security_context.to_flyte_idl() if self.security_context else None,
overwrite_cache=self.overwrite_cache,
)

@classmethod
Expand All @@ -306,6 +309,7 @@ def from_flyte_idl(cls, p):
security_context=security.SecurityContext.from_flyte_idl(p.security_context)
if p.security_context
else None,
overwrite_cache=p.overwrite_cache,
)


Expand Down
27 changes: 27 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ def _execute(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Common method for execution across all entities.
Expand All @@ -755,6 +756,9 @@ def _execute(
:param wait: if True, waits for execution to complete
:param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values
into Flyte Literals.
:param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten
for a single execution. If enabled, all calculations are performed even if cached results would
be available, overwriting the stored data once execution finishes successfully.
:returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution`
"""
execution_name = execution_name or "f" + uuid.uuid4().hex[:19]
Expand Down Expand Up @@ -810,6 +814,7 @@ def _execute(
"placeholder", # Admin replaces this from oidc token if auth is enabled.
0,
),
overwrite_cache=overwrite_cache,
notifications=notifications,
disable_all=options.disable_notifications,
labels=options.labels,
Expand Down Expand Up @@ -873,6 +878,7 @@ def execute(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute a task, workflow, or launchplan, either something that's been declared locally, or a fetched entity.
Expand Down Expand Up @@ -906,6 +912,9 @@ def execute(
using the type engine, and then to ``type(v)``. Providing the correct Python types is particularly important
if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte
provided classes (like a StructuredDataset that's annotated with columns).
:param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten
for a single execution. If enabled, all calculations are performed even if cached results would
be available, overwriting the stored data once execution finishes successfully.
.. note:
Expand All @@ -924,6 +933,7 @@ def execute(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, FlyteWorkflow):
return self.execute_remote_wf(
Expand All @@ -935,6 +945,7 @@ def execute(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, PythonTask):
return self.execute_local_task(
Expand All @@ -947,6 +958,7 @@ def execute(
execution_name=execution_name,
image_config=image_config,
wait=wait,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, WorkflowBase):
return self.execute_local_workflow(
Expand All @@ -960,6 +972,7 @@ def execute(
image_config=image_config,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, LaunchPlan):
return self.execute_local_launch_plan(
Expand All @@ -971,6 +984,7 @@ def execute(
execution_name=execution_name,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
)
raise NotImplementedError(f"entity type {type(entity)} not recognized for execution")

Expand All @@ -987,6 +1001,7 @@ def execute_remote_task_lp(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Execute a FlyteTask, or FlyteLaunchplan.
Expand All @@ -1001,6 +1016,7 @@ def execute_remote_task_lp(
wait=wait,
options=options,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)

def execute_remote_wf(
Expand All @@ -1013,6 +1029,7 @@ def execute_remote_wf(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Execute a FlyteWorkflow.
Expand All @@ -1028,6 +1045,7 @@ def execute_remote_wf(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)

# Flytekit Entities
Expand All @@ -1044,6 +1062,7 @@ def execute_local_task(
execution_name: str = None,
image_config: typing.Optional[ImageConfig] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute an @task-decorated function or TaskTemplate task.
Expand All @@ -1058,6 +1077,7 @@ def execute_local_task(
:param execution_name:
:param image_config:
:param wait:
:param overwrite_cache:
:return:
"""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
Expand All @@ -1084,6 +1104,7 @@ def execute_local_task(
execution_name=execution_name,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

def execute_local_workflow(
Expand All @@ -1098,6 +1119,7 @@ def execute_local_workflow(
image_config: typing.Optional[ImageConfig] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute an @workflow decorated function.
Expand All @@ -1111,6 +1133,7 @@ def execute_local_workflow(
:param image_config:
:param options:
:param wait:
:param overwrite_cache:
:return:
"""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
Expand Down Expand Up @@ -1155,6 +1178,7 @@ def execute_local_workflow(
wait=wait,
options=options,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

def execute_local_launch_plan(
Expand All @@ -1167,6 +1191,7 @@ def execute_local_launch_plan(
execution_name: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Expand All @@ -1178,6 +1203,7 @@ def execute_local_launch_plan(
:param execution_name: If specified, will be used as the execution name instead of randomly generating.
:param options:
:param wait:
:param overwrite_cache:
:return:
"""
try:
Expand All @@ -1203,6 +1229,7 @@ def execute_local_launch_plan(
options=options,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

###################################
Expand Down

0 comments on commit 33e05c2

Please sign in to comment.