Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: persist archived workflows with Persisted label (#11367) #11413

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

const (
Expand Down Expand Up @@ -85,6 +86,7 @@ func NewWorkflowArchive(session sqlbuilder.Database, clusterName, managedNamespa
func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
logCtx := log.WithFields(log.Fields{"uid": wf.UID, "labels": wf.GetLabels()})
logCtx.Debug("Archiving workflow")
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
workflow, err := json.Marshal(wf)
if err != nil {
return err
Expand Down Expand Up @@ -178,6 +180,8 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
if err != nil {
log.WithFields(log.Fields{"workflowUID": archivedWf.UID, "workflowName": archivedWf.Name}).Errorln("unable to unmarshal workflow from database")
} else {
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
wfs = append(wfs, wf)
}
}
Expand Down Expand Up @@ -300,6 +304,8 @@ func (r *workflowArchive) GetWorkflow(uid string, namespace string, name string)
if err != nil {
return nil, err
}
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
return wf, nil
}

Expand Down
4 changes: 2 additions & 2 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func mergeWithArchivedWorkflows(liveWfs v1alpha1.WorkflowList, archivedWfs v1alp
}

for _, v := range uidToWfs {
// The archived workflow we saved in the database will only have "Pending" as the archival status.
// We want to only keep the workflow that has the correct label to display correctly in the UI.
// The archived workflow we saved in the database have "Persisted" as the archival status.
// Prioritize 'Archived' over 'Persisted' because 'Archived' means the workflow is in the cluster
if len(v) == 1 {
mergedWfs = append(mergedWfs, v[0])
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func TestMergeWithArchivedWorkflows(t *testing.T) {
Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Archived"}}}
wf1Archived := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "1", CreationTimestamp: metav1.Time{Time: timeNow.Add(time.Second)},
Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Pending"}}}
Labels: map[string]string{common.LabelKeyWorkflowArchivingStatus: "Persisted"}}}
wf2 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "2", CreationTimestamp: metav1.Time{Time: timeNow.Add(2 * time.Second)}}}
wf3 := v1alpha1.Workflow{
Expand Down
16 changes: 14 additions & 2 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,7 @@ func (s *ArgoServerSuite) TestWorkflowServiceStream() {

func (s *ArgoServerSuite) TestArchivedWorkflowService() {
var uid types.UID
var name string
s.Given().
Workflow(`
metadata:
Expand All @@ -1273,6 +1274,7 @@ spec:
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
uid = metadata.UID
name = metadata.Name
})
var failedUid types.UID
var failedName string
Expand Down Expand Up @@ -1399,12 +1401,22 @@ spec:
s.e().GET("/api/v1/archived-workflows/not-found").
Expect().
Status(404)
s.e().GET("/api/v1/archived-workflows/{uid}", uid).
j := s.e().GET("/api/v1/archived-workflows/{uid}", uid).
Expect().
Status(200).
JSON().
JSON()
j.
Path("$.metadata.name").
NotNull()
j.
Path(fmt.Sprintf("$.metadata.labels[\"%s\"]", common.LabelKeyWorkflowArchivingStatus)).
Equal("Persisted")
s.e().GET("/api/v1/workflows/argo/" + name).
Expect().
Status(200).
JSON().
Path(fmt.Sprintf("$.metadata.labels[\"%s\"]", common.LabelKeyWorkflowArchivingStatus)).
Equal("Archived")
})

s.Run("DeleteForRetry", func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class WorkflowsToolbar extends React.Component<WorkflowsToolbarProps, {}>
)
);
}
if (deleteArchived && wf.metadata.labels[archivalStatus] === 'Pending') {
if (deleteArchived && (wf.metadata.labels[archivalStatus] === 'Archived' || wf.metadata.labels[archivalStatus] === 'Persisted')) {
promises.push(
services.workflows.deleteArchived(wf.metadata.uid, wf.metadata.namespace).catch(reason =>
ctx.notifications.show({
Expand Down
2 changes: 1 addition & 1 deletion ui/src/models/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ export const execSpec = (w: Workflow) => Object.assign({}, w.status.storedWorkfl
export const archivalStatus = 'workflows.argoproj.io/workflow-archiving-status';

export function isArchivedWorkflow(wf: Workflow): boolean {
return wf.metadata.labels && wf.metadata.labels[archivalStatus] === 'Archived';
return wf.metadata.labels && (wf.metadata.labels[archivalStatus] === 'Archived' || wf.metadata.labels[archivalStatus] === 'Persisted');
}

export type NodeType = 'Pod' | 'Container' | 'Steps' | 'StepGroup' | 'DAG' | 'Retry' | 'Skipped' | 'TaskGroup' | 'Suspend';
Expand Down
3 changes: 2 additions & 1 deletion workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ const (
// LabelKeyWorkflowArchivingStatus indicates if a workflow needs archiving or not:
// * `` - does not need archiving ... yet
// * `Pending` - pending archiving
// * `Archived` - has been archived
// * `Archived` - has been archived and has live manifest
// * `Persisted` - has been archived and retrieved from db
// See also `LabelKeyCompleted`.
LabelKeyWorkflowArchivingStatus = workflow.WorkflowFullName + "/workflow-archiving-status"
// LabelKeyWorkflow is the pod metadata label to indicate the associated workflow name
Expand Down