Describe the bug
Dynamic workflows work by compiling a workflow closure that defines the subnodes, which is subsequently evaluated. During compilation, Flyte assigns each subnode an incremental node ID that is used to track its status. Because these node IDs are assigned incrementally, a dynamic task with non-deterministic subnode ordering can assign a given node ID to different subnodes in different executions of the dynamic workflow.
This is problematic for partially completed dynamic workflows, because the recovery mechanism relies on node IDs to reuse previously completed executions. Under these circumstances, recovering a dynamic task can corrupt the result data.
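As a toy illustration of the problem in plain Python (this is not Flyte code, just a model of incremental ID assignment): if subnode IDs are handed out in iteration order, two compilations that visit the same items in different orders label different subnodes with the same ID.

def assign_node_ids(items):
    # Toy model: each subnode gets an ID derived from its position in iteration order.
    return {f"n0-0-dn{i}": item for i, item in enumerate(items)}

first_compile = assign_node_ids(["baz", "foo", "bar"])   # one possible dict ordering
second_compile = assign_node_ids(["foo", "bar", "baz"])  # another, equally valid ordering

# The same node ID now points at different subnodes across the two compilations:
assert first_compile["n0-0-dn0"] != second_compile["n0-0-dn0"]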
Expected behavior
Recovery should work in all scenarios for dynamic workflows.
One solution is to ensure that the recovered dynamic workflow reuses the previously computed dynamic workflow closure rather than recomputing it. This means that node IDs would be the same for both the original execution and the recovered execution.
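Separately from that fix, a possible user-side workaround (a sketch only, not the solution proposed above) is to make the subnode ordering deterministic inside the dynamic task, for example by iterating the dict in sorted key order so that recompilation assigns the same node IDs. Using the echo task from the repro code below:

@dynamic(cache=True, cache_version="1.0", requests=Resources(cpu="500m", mem="400Mi"), limits=Resources(cpu="1", mem="600Mi"))
def dynamic_dict(a: Dict[str, int]) -> Dict[str, int]:
    b = {}
    # Sorting the keys fixes the compiled subnode order, and therefore the
    # incremental node IDs, across the original and recovered executions.
    for x in sorted(a):
        b[x] = echo(a=a[x])
    return b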
Additional context to reproduce
To reproduce, run the following workflow with an input dict:
import time
from flytekit import Resources, dynamic, task, workflow
from typing import Dict


@task(requests=Resources(cpu="500m", mem="400Mi"), limits=Resources(cpu="1", mem="600Mi"))
def echo(a: int) -> int:
    time.sleep(10)
    return a


@task(requests=Resources(cpu="500m", mem="400Mi"), limits=Resources(cpu="1", mem="600Mi"))
def sleep(a: int, b: Dict[str, int]) -> Dict[str, int]:
    time.sleep(a)
    return b


@dynamic(cache=True, cache_version="1.0", requests=Resources(cpu="500m", mem="400Mi"), limits=Resources(cpu="1", mem="600Mi"))
def dynamic_dict(a: Dict[str, int]) -> Dict[str, int]:
    # Subnodes are created in dict iteration order, so their node IDs follow that order.
    b = {}
    for x, y in a.items():
        b[x] = echo(a=y)
    return b


@workflow
def dynamic_dict_wf(a: Dict[str, int]) -> Dict[str, int]:
    b = dynamic_dict(a=a)
    c = sleep(a=10, b=b)
    return c
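For example, the workflow can be launched against a remote deployment with pyflyte; the file name here is a placeholder, and exact handling of dict inputs may vary by flytekit version:

pyflyte run --remote dynamic_dict_wf.py dynamic_dict_wf --a '{"foo": 1, "bar": 2, "baz": 4}'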
Then manually fail one of the dynamic subnodes by repeatedly deleting its pod in k8s before it completes, until Flyte exhausts the retry attempts and fails the node. Then use the 'Recover' button in the UI to start a workflow recovery, and inspect the output values of the dynamic task to see the incorrect data.
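If the UI is not handy, recovery can also be triggered from the CLI. This assumes flytectl is configured against the same deployment; the execution name, project, and domain are placeholders, so check the flytectl docs for your version:

flytectl create execution --recover <execution-name> -p <project> -d <domain>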
Screenshots
An example of the reproduction steps above: the aforementioned workflow was launched with the inputs:
{
  "foo": 1,
  "bar": 2,
  "baz": 4
}
Manually failed the n0-0-dn1 node to fail the workflow; the result is below:
In this execution, nodes n0-0-dn0 and n0-0-dn2 correspond to the input values 4 and 2.
This means that n0-0-dn1 should be responsible for 1, because the Python iteration over this Dict was ordered 4, 1, 2.
Then we recover the workflow, allow it to succeed, and see that the output values are:
{
  "bar": 2,
  "baz": 2,
  "foo": 4
}
This is because the dynamic task recompiled the workflow closure, which resulted in a different ordering of the Dict iteration. The results from n0-0-dn0 and n0-0-dn2 were reused, albeit applied to different keys (i.e. bar & foo rather than baz and bar), and n0-0-dn1 was executed.
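To make the failure mode concrete, here is a toy simulation in plain Python (not Flyte code) of recovery reusing completed subnodes by node ID against a recompiled closure with a different ordering. The recovered ordering shown is one possibility consistent with the output reported above:

inputs = {"foo": 1, "bar": 2, "baz": 4}

original_order = ["baz", "foo", "bar"]   # iteration order 4, 1, 2 seen in the original compile
recovered_order = ["foo", "bar", "baz"]  # a different ordering after recompilation

# In the original run dn0 and dn2 completed (echo returns its input); dn1 was failed manually.
completed = {f"n0-0-dn{i}": inputs[k] for i, k in enumerate(original_order) if i != 1}

# Recovery reuses results by node ID against the recompiled (reordered) closure;
# the missing node is re-executed with its new input.
recovered_output = {}
for i, key in enumerate(recovered_order):
    recovered_output[key] = completed.get(f"n0-0-dn{i}", inputs[key])

print(recovered_output)  # {'foo': 4, 'bar': 2, 'baz': 2} -- foo and baz are wrong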
Are you sure this issue hasn't been raised already?
Yes
Have you read the Code of Conduct?
Yes