Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add archive resubmit command. Fixes #7910 #8166

Merged
merged 10 commits into from
Mar 23, 2022

Conversation

dpadhiar
Copy link
Member

@dpadhiar dpadhiar commented Mar 16, 2022

Fixes #7910

It is currently only possible for an archived workflow to be resubmitted through the CLI by using kubectl create then argo resubmit. This feature removes the need to do this set of operations by resubmitting workflows directly from the archive with the argo CLI: argo archive resubmit {uid}. This cli command supports the same field selector, label selector and wait, watch or log flags as argo resubmit.

Example archive with examples/hello-world.yaml and examples/http-hello-world.yaml:

NAMESPACE   NAME                  STATUS      AGE   DURATION   PRIORITY   P/R/C   PARAMETERS   UID
argo        http-template-szmtn   Succeeded   1m    5s         0          0/0/0                36077313-3c04-4ffc-89ac-eb8ee2aac41f
argo        hello-world-p2skq     Succeeded   8m    20s        0          0/0/0                962342ce-a54d-4a71-aff8-a3a4aad85158

Resubmitting one workflow:
dist/argo archive resubmit 962342ce-a54d-4a71-aff8-a3a4aad85158

Name:                hello-world-9dkdx
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:19:39 -0700 (now)

Resubmitting multiple workflows:
dist/argo archive resubmit 962342ce-a54d-4a71-aff8-a3a4aad85158 36077313-3c04-4ffc-89ac-eb8ee2aac41f

Name:                hello-world-gl2hp
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:21:18 -0700 (now)
Name:                http-template-pjr45
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:21:18 -0700 (now)

Resubmitting one workflow with --watch:
dist/argo archive resubmit 962342ce-a54d-4a71-aff8-a3a4aad85158 --watch

Name:                hello-world-9dkdx
Name:                hello-world-5d7w9
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              Succeeded
Created:             Thu Mar 17 14:22:25 -0700 (7 seconds ago)
Started:             Thu Mar 17 14:22:25 -0700 (7 seconds ago)
Finished:            Thu Mar 17 14:22:32 -0700 (now)
Duration:            7 seconds

Resubmitting workflows with label selector:
dist/argo archive resubmit -l workflows.argoproj.io/test=true

Name:                http-template-rrbq8
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:25:27 -0700 (now)
Name:                http-template-rmjpf
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:25:27 -0700 (now)

Resubmitting workflows with field selector:
dist/argo archive resubmit --field-selector metadata.namespace=argo

Name:                http-template-624db
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:26:34 -0700 (now)
Name:                hello-world-rzpwn
Namespace:           argo
ServiceAccount:      unset (will run with the default ServiceAccount)
Status:              
Created:             Thu Mar 17 14:26:34 -0700 (now)

@dpadhiar dpadhiar marked this pull request as ready for review March 17, 2022 21:28
@dpadhiar dpadhiar requested a review from alexec March 17, 2022 21:29
@alexec alexec enabled auto-merge (squash) March 17, 2022 21:29
log bool // --log
}

func waitWatchOrLog(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, namespace string, workflowNames []string, cliSubmitOpts cliSubmitOpts) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like copy-and-paste?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't possible to use the functions like waitWatchorLog without a cycle error and this keeps us from having the same functionality as argo resubmit with support for these flags. Bala and I's solution was to move these functions we need to a common.go to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if you make the func s public, and then move to a new package?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that, would we move all the command files into a new package then

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, just the shared code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still needs doing

Copy link
Member Author

@dpadhiar dpadhiar Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move the functions required for waitWatchOrLog to a new package in cmd/argo/commands called util:

Files / functions needed:

wait.go

// waitWorkflows waits for the given workflowNames.
func WaitWorkflows(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, namespace string, workflowNames []string, ignoreNotFound, quiet bool) {
	var wg sync.WaitGroup
	wfSuccessStatus := true

	for _, name := range workflowNames {
		wg.Add(1)
		go func(name string) {
			if !waitOnOne(serviceClient, ctx, name, namespace, ignoreNotFound, quiet) {
				wfSuccessStatus = false
			}
			wg.Done()
		}(name)

	}
	wg.Wait()

	if !wfSuccessStatus {
		os.Exit(1)
	}
}

func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Context, wfName, namespace string, ignoreNotFound, quiet bool) bool {
	req := &workflowpkg.WatchWorkflowsRequest{
		Namespace: namespace,
		ListOptions: &metav1.ListOptions{
			FieldSelector:   util.GenerateFieldSelectorFromWorkflowName(wfName),
			ResourceVersion: "0",
		},
	}
	stream, err := serviceClient.WatchWorkflows(ctx, req)
	if err != nil {
		if status.Code(err) == codes.NotFound && ignoreNotFound {
			return true
		}
		errors.CheckError(err)
		return false
	}
	for {
		event, err := stream.Recv()
		if err == io.EOF {
			log.Debug("Re-establishing workflow watch")
			stream, err = serviceClient.WatchWorkflows(ctx, req)
			errors.CheckError(err)
			continue
		}
		errors.CheckError(err)
		if event == nil {
			continue
		}
		wf := event.Object
		if !wf.Status.FinishedAt.IsZero() {
			if !quiet {
				fmt.Printf("%s %s at %v\n", wfName, wf.Status.Phase, wf.Status.FinishedAt)
			}
			if wf.Status.Phase == wfv1.WorkflowFailed || wf.Status.Phase == wfv1.WorkflowError {
				return false
			}
			return true
		}
	}
}

watch.go

func WatchWorkflow(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, namespace string, workflow string, getArgs getFlags) {
	req := &workflowpkg.WatchWorkflowsRequest{
		Namespace: namespace,
		ListOptions: &metav1.ListOptions{
			FieldSelector:   util.GenerateFieldSelectorFromWorkflowName(workflow),
			ResourceVersion: "0",
		},
	}
	stream, err := serviceClient.WatchWorkflows(ctx, req)
	errors.CheckError(err)

	wfChan := make(chan *wfv1.Workflow)
	go func() {
		for {
			event, err := stream.Recv()
			if err == io.EOF {
				log.Debug("Re-establishing workflow watch")
				stream, err = serviceClient.WatchWorkflows(ctx, req)
				errors.CheckError(err)
				continue
			}
			errors.CheckError(err)
			if event == nil {
				continue
			}
			wfChan <- event.Object
		}
	}()

	var wf *wfv1.Workflow
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case newWf := <-wfChan:
			// If we get a new event, update our workflow
			if newWf == nil {
				return
			}
			wf = newWf
		case <-ticker.C:
			// If we don't, refresh the workflow screen every second
		case <-ctx.Done():
			// When the context gets canceled
			return
		}

		printWorkflowStatus(wf, getArgs)
		if wf != nil && !wf.Status.FinishedAt.IsZero() {
			return
		}
	}
}

func printWorkflowStatus(wf *wfv1.Workflow, getArgs getFlags) {
	if wf == nil {
		return
	}
	err := packer.DecompressWorkflow(wf)
	errors.CheckError(err)
	print("\033[H\033[2J")
	print("\033[0;0H")
	fmt.Print(PrintWorkflowHelper(wf, getArgs))
}

logs.go

func LogWorkflow(ctx context.Context, serviceClient workflowpkg.WorkflowServiceClient, namespace, workflow, podName, grep, selector string, logOptions *corev1.PodLogOptions) {
	// logs
	stream, err := serviceClient.WorkflowLogs(ctx, &workflowpkg.WorkflowLogRequest{
		Name:       workflow,
		Namespace:  namespace,
		PodName:    podName,
		LogOptions: logOptions,
		Selector:   selector,
		Grep:       grep,
	})
	errors.CheckError(err)

	// loop on log lines
	for {
		event, err := stream.Recv()
		if err == io.EOF {
			return
		}
		errors.CheckError(err)
		fmt.Println(ansiFormat(fmt.Sprintf("%s: %s", event.PodName, event.Content), ansiColorCode(event.PodName)))
	}
}

It would be necessary for get.go to be included for printWorkflowHelper and getFlags struct.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed with @sarabala1979: will rename package to common since util isn't meant for these workflow functions, will also move common.go into this package

auto-merge was automatically disabled March 22, 2022 21:31

Head branch was pushed to by a user without write access

cmd/argo/commands/watch.go Outdated Show resolved Hide resolved
Signed-off-by: Dillen Padhiar <[email protected]>
@dpadhiar dpadhiar requested a review from alexec March 22, 2022 23:05
Copy link
Contributor

@alexec alexec left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 🥇 good work!

@alexec alexec merged commit 8c77e89 into argoproj:master Mar 23, 2022
dpadhiar added a commit to dpadhiar/argo-workflows that referenced this pull request Mar 23, 2022
@sarabala1979 sarabala1979 mentioned this pull request Apr 14, 2022
85 tasks
@agilgur5 agilgur5 changed the title feat: add archive resubmit command to argo CLI. Fixes #7910 feat(cli): add archive resubmit command. Fixes #7910 Oct 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CLI: Add archive resubmit command
3 participants