Skip to content

Commit

Permalink
fix: prevent update race in workflow cache (Fixes #9574) (#12233)
Browse files Browse the repository at this point in the history
Signed-off-by: Dennis Lawler <[email protected]>
Signed-off-by: Dennis Lawler <[email protected]>
  • Loading branch information
drawlerr authored Jan 16, 2024
1 parent e20f312 commit 1202ae4
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,13 +648,23 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
if !ok {
return fmt.Errorf("object %+v is not an unstructured", workflows[0])
}
key := un.GetNamespace() + "/" + un.GetName()
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

obj, ok := wfc.getWorkflowByKey(key)
if !ok {
return fmt.Errorf("failed to get workflow by key after locking")
}
un, ok = obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("object %+v is not an unstructured", obj)
}
wf, err = util.FromUnstructured(un)
if err != nil {
return err
}
key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

// workflow might still be hydrated
if wfc.hydrator.IsHydrated(wf) {
log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
Expand Down Expand Up @@ -728,20 +738,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
}
defer wfc.wfQueue.Done(key)

obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return true
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return true
}

wfc.workflowKeyLock.Lock(key.(string))
defer wfc.workflowKeyLock.Unlock(key.(string))

obj, ok := wfc.getWorkflowByKey(key.(string))
if !ok {
return true
}

// The workflow informer receives unstructured objects to deal with the possibility of invalid
// workflow manifests that are unable to unmarshal to workflow objects
un, ok := obj.(*unstructured.Unstructured)
Expand Down Expand Up @@ -815,6 +819,20 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) {
obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return nil, false
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return nil, false
}
return obj, true
}

func reconciliationNeeded(wf metav1.Object) bool {
return wf.GetLabels()[common.LabelKeyCompleted] != "true" || slices.Contains(wf.GetFinalizers(), common.FinalizerArtifactGC)
}
Expand Down Expand Up @@ -1018,6 +1036,11 @@ func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interfac
}
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)
key, err = cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Error("failed to get key for object after locking")
return
}
err = wfc.archiveWorkflowAux(ctx, obj)
if err != nil {
log.WithField("key", key).WithError(err).Error("failed to archive workflow")
Expand Down

0 comments on commit 1202ae4

Please sign in to comment.