Skip to content

Commit

Permalink
feat: Allow memoization without outputs (#11379)
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel authored Jul 25, 2023
1 parent 0b85a64 commit a76674c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 21 deletions.
6 changes: 4 additions & 2 deletions docs/memoization.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ Workflows often have outputs that are expensive to compute.
Memoization reduces cost and workflow execution time by recording the result of previously run steps:
it stores the outputs of a template into a specified cache with a variable key.

Memoization only works for steps which have outputs, if you attempt to use it on steps which do not it should not work (there are some cases where it does, but they shouldn't). It is designed for 'pure' steps, where the purpose of running the step is to calculate some outputs based upon the step's inputs, and only the inputs. Pure steps should not interact with the outside world, but workflows won't enforce this on you.
Prior to version 3.5 memoization only works for steps which have outputs, if you attempt to use it on steps which do not it should not work (there are some cases where it does, but they shouldn't). It was designed for 'pure' steps, where the purpose of running the step is to calculate some outputs based upon the step's inputs, and only the inputs. Pure steps should not interact with the outside world, but workflows won't enforce this on you.

If your steps are not there to create outputs, but you'd still like to skip running them, you should look at the [work avoidance](work-avoidance.md) technique instead of memoization.
If you are using workflows prior to version 3.5 you should look at the [work avoidance](work-avoidance.md) technique instead of memoization if your steps don't have outputs.

In version 3.5 or later all steps can be memoized, whether or not they have outputs.

## Cache Method

Expand Down
2 changes: 1 addition & 1 deletion docs/work-avoidance.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
You can make workflows faster and more robust by employing **work avoidance**. A workflow that utilizes this is simply a workflow containing steps that do not run if the work has already been done.

This technique is similar to [memoization](memoization.md) but they have distinct use cases. Work avoidance is totally in your control and you make the decisions as to have to skip the work. [Memoization](memoization.md) is a feature of Argo Workflows to automatically skip steps which generate outputs - it is designed for pure steps which only generate output based on their inputs.
This is a technique is similar to [memoization](memoization.md). Work avoidance is totally in your control and you make the decisions as to have to skip the work. [Memoization](memoization.md) is a feature of Argo Workflows to automatically skip steps which generate outputs. Prior to version 3.5 this required `outputs` to be specified, but you can use memoization for all steps and tasks in version 3.5 or later.

This simplest way to do this is to use **marker files**.

Expand Down
16 changes: 7 additions & 9 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,19 +313,17 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl
return node, err
}
if outputs != nil {
node = woc.wf.GetNodeByName(nodeName)
node.Outputs = outputs
woc.wf.Status.Nodes[node.ID] = *node
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}

woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName)

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil
Expand Down
71 changes: 71 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5128,6 +5128,77 @@ func TestConfigMapCacheLoadOperate(t *testing.T) {
}
}

var workflowCachedNoOutputs = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: memoized-workflow-test
namespace: default
spec:
entrypoint: whalesay
arguments:
parameters:
- name: message
value: hi-there-world
templates:
- name: whalesay
inputs:
parameters:
- name: message
memoize:
key: "{{inputs.parameters.message}}"
cache:
configMap:
name: whalesay-cache
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["sleep 10; cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"]
outputs:
parameters:
- name: hello
valueFrom:
path: /tmp/hello_world.txt
`

func TestConfigMapCacheLoadOperateNoOutputs(t *testing.T) {
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
"hi-there-world": `{"nodeID":"memoized-simple-workflow-5wj2p","outputs":null,"creationTimestamp":"2020-09-21T18:12:56Z"}`,
},
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "whalesay-cache",
ResourceVersion: "1630732",
Labels: map[string]string{
common.LabelKeyConfigMapType: common.LabelValueTypeConfigMapCache,
},
},
}
wf := wfv1.MustUnmarshalWorkflow(workflowCachedNoOutputs)
cancel, controller := newController()
defer cancel()

ctx := context.Background()
_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = controller.kubeclientset.CoreV1().ConfigMaps("default").Create(ctx, &sampleConfigMapCacheEntry, metav1.CreateOptions{})
assert.NoError(t, err)

woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

if assert.Len(t, woc.wf.Status.Nodes, 1) {
for _, node := range woc.wf.Status.Nodes {
assert.Nil(t, node.Outputs)
assert.Equal(t, wfv1.NodeSucceeded, node.Phase)
}
}
}

var workflowCachedMaxAge = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
16 changes: 7 additions & 9 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,18 @@ func (woc *wfOperationCtx) executeSteps(ctx context.Context, nodeName string, tm
return node, err
}
if outputs != nil {
node := woc.wf.GetNodeByName(nodeName)
node.Outputs = outputs
woc.addOutputsToGlobalScope(node.Outputs)
woc.wf.Status.Nodes[node.ID] = *node
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, node.Outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}
if node.MemoizationStatus != nil {
c := woc.controller.cacheFactory.GetCache(controllercache.ConfigMapCache, node.MemoizationStatus.CacheName)
err := c.Save(ctx, node.MemoizationStatus.Key, node.ID, outputs)
if err != nil {
woc.log.WithFields(log.Fields{"nodeID": node.ID}).WithError(err).Error("Failed to save node outputs to cache")
node.Phase = wfv1.NodeError
}
}

return woc.markNodePhase(nodeName, wfv1.NodeSucceeded), nil
}

Expand Down

0 comments on commit a76674c

Please sign in to comment.