Skip to content

Commit

Permalink
Merge pull request #3755 from rishinair11/add_multiple_args
Browse files Browse the repository at this point in the history
Allow multiple arguments for "flux suspend/resume"
  • Loading branch information
stefanprodan authored Jun 29, 2023
2 parents 2fe86a4 + 3580d4f commit 25af5d2
Show file tree
Hide file tree
Showing 33 changed files with 317 additions and 78 deletions.
15 changes: 15 additions & 0 deletions cmd/flux/kustomization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,26 @@ func TestKustomizationFromGit(t *testing.T) {
"testdata/kustomization/suspend_kustomization_from_git.golden",
tmpl,
},
{
"suspend kustomization tkfg foo tkfg bar",
"testdata/kustomization/suspend_kustomization_from_git_multiple_args.golden",
tmpl,
},
{
"resume kustomization tkfg foo --wait",
"testdata/kustomization/resume_kustomization_from_git_multiple_args_wait.golden",
tmpl,
},
{
"resume kustomization tkfg",
"testdata/kustomization/resume_kustomization_from_git.golden",
tmpl,
},
{
"resume kustomization tkfg tkfg",
"testdata/kustomization/resume_kustomization_from_git_multiple_args.golden",
tmpl,
},
{
"delete kustomization tkfg --silent",
"testdata/kustomization/delete_kustomization_from_git.golden",
Expand Down
6 changes: 6 additions & 0 deletions cmd/flux/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ func executeTemplate(content string, templateValues map[string]string) (string,
// Run the command and return the captured output.
func executeCommand(cmd string) (string, error) {
defer resetCmdArgs()
defer func() {
// need to set this explicitly because apparently its value isn't changed
// in subsequent executions which causes tests to fail that rely on the value
// of "Changed".
resumeCmd.PersistentFlags().Lookup("wait").Changed = false
}()
args, err := shellwords.Parse(cmd)
if err != nil {
return "", err
Expand Down
2 changes: 1 addition & 1 deletion cmd/flux/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type copyable interface {
deepCopyClientObject() client.Object
}

// listAdapater is the analogue to adapter, but for lists; the
// listAdapter is the analogue to adapter, but for lists; the
// controller runtime distinguishes between methods dealing with
// objects and lists.
type listAdapter interface {
Expand Down
179 changes: 149 additions & 30 deletions cmd/flux/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package main
import (
"context"
"fmt"
"sort"
"sync"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -59,15 +61,22 @@ type resumable interface {

type resumeCommand struct {
apiType
object resumable
list listResumable
client client.WithWatch
list listResumable
namespace string
shouldReconcile bool
}

type listResumable interface {
listAdapter
resumeItem(i int) resumable
}

type reconcileResponse struct {
resumable
err error
}

func (resume resumeCommand) run(cmd *cobra.Command, args []string) error {
if len(args) < 1 && !resumeArgs.all {
return fmt.Errorf("%s name is required", resume.humanKind)
Expand All @@ -80,52 +89,162 @@ func (resume resumeCommand) run(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
resume.client = kubeClient
resume.namespace = *kubeconfigArgs.Namespace

var listOpts []client.ListOption
listOpts = append(listOpts, client.InNamespace(*kubeconfigArgs.Namespace))
if len(args) > 0 {
listOpts = append(listOpts, client.MatchingFields{
"metadata.name": args[0],
})
}
// require waiting for the object(s) if the user has not provided the --wait flag and gave exactly
// one object to resume. This is necessary to maintain backwards compatibility with prior versions
// of this command. Otherwise just follow the value of the --wait flag (including its default).
resume.shouldReconcile = !resumeCmd.PersistentFlags().Changed("wait") && len(args) == 1 || resumeArgs.wait

err = kubeClient.List(ctx, resume.list.asClientList(), listOpts...)
resumables, err := resume.getPatchedResumables(ctx, args)
if err != nil {
return err
}

var wg sync.WaitGroup
wg.Add(len(resumables))

resultChan := make(chan reconcileResponse, len(resumables))
for _, r := range resumables {
go func(res resumable) {
defer wg.Done()
resultChan <- resume.reconcile(ctx, res)
}(r)
}

go func() {
defer close(resultChan)
wg.Wait()
}()

reconcileResps := make([]reconcileResponse, 0, len(resumables))
for c := range resultChan {
reconcileResps = append(reconcileResps, c)
}

resume.printMessage(reconcileResps)

return nil
}

// getPatchedResumables returns a list of the given resumable objects that have been patched to be resumed.
// If the args slice is empty, it patches all resumable objects in the given namespace.
func (resume *resumeCommand) getPatchedResumables(ctx context.Context, args []string) ([]resumable, error) {
if len(args) < 1 {
objs, err := resume.patch(ctx, []client.ListOption{
client.InNamespace(resume.namespace),
})
if err != nil {
return nil, fmt.Errorf("failed patching objects: %w", err)
}

return objs, nil
}

var resumables []resumable
processed := make(map[string]struct{}, len(args))
for _, arg := range args {
if _, has := processed[arg]; has {
continue // skip object that user might have provided more than once
}
processed[arg] = struct{}{}

objs, err := resume.patch(ctx, []client.ListOption{
client.InNamespace(resume.namespace),
client.MatchingFields{
"metadata.name": arg,
},
})
if err != nil {
return nil, err
}

resumables = append(resumables, objs...)
}

return resumables, nil
}

// Patches resumable objects by setting their status to unsuspended.
// Returns a slice of resumables that have been patched and any error encountered during patching.
func (resume resumeCommand) patch(ctx context.Context, listOpts []client.ListOption) ([]resumable, error) {
if err := resume.client.List(ctx, resume.list.asClientList(), listOpts...); err != nil {
return nil, err
}

if resume.list.len() == 0 {
logger.Failuref("no %s objects found in %s namespace", resume.kind, *kubeconfigArgs.Namespace)
return nil
logger.Failuref("no %s objects found in %s namespace", resume.kind, resume.namespace)
return nil, nil
}

var resumables []resumable

for i := 0; i < resume.list.len(); i++ {
logger.Actionf("resuming %s %s in %s namespace", resume.humanKind, resume.list.resumeItem(i).asClientObject().GetName(), *kubeconfigArgs.Namespace)
obj := resume.list.resumeItem(i)
logger.Actionf("resuming %s %s in %s namespace", resume.humanKind, obj.asClientObject().GetName(), resume.namespace)

patch := client.MergeFrom(obj.deepCopyClientObject())
obj.setUnsuspended()
if err := kubeClient.Patch(ctx, obj.asClientObject(), patch); err != nil {
return err
if err := resume.client.Patch(ctx, obj.asClientObject(), patch); err != nil {
return nil, err
}

resumables = append(resumables, obj)

logger.Successf("%s resumed", resume.humanKind)
}

return resumables, nil
}

// Waits for resumable object to be reconciled and returns the object and any error encountered while waiting.
// Returns an empty reconcileResponse, if shouldReconcile is false.
func (resume resumeCommand) reconcile(ctx context.Context, res resumable) reconcileResponse {
if !resume.shouldReconcile {
return reconcileResponse{}
}

if resumeArgs.wait || !resumeArgs.all {
namespacedName := types.NamespacedName{
Name: resume.list.resumeItem(i).asClientObject().GetName(),
Namespace: *kubeconfigArgs.Namespace,
}

logger.Waitingf("waiting for %s reconciliation", resume.kind)
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isReady(ctx, kubeClient, namespacedName, resume.list.resumeItem(i))); err != nil {
logger.Failuref(err.Error())
continue
}
logger.Successf("%s reconciliation completed", resume.kind)
logger.Successf(resume.list.resumeItem(i).successMessage())
namespacedName := types.NamespacedName{
Name: res.asClientObject().GetName(),
Namespace: resume.namespace,
}

logger.Waitingf("waiting for %s reconciliation", resume.kind)

if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isReady(ctx, resume.client, namespacedName, res)); err != nil {
return reconcileResponse{
resumable: res,
err: err,
}
}

return nil
return reconcileResponse{
resumable: res,
err: nil,
}
}

// Sorts the given reconcileResponses by resumable name and prints the success/error message for each response.
func (resume resumeCommand) printMessage(responses []reconcileResponse) {
sort.Slice(responses, func(i, j int) bool {
r1, r2 := responses[i], responses[j]
if r1.resumable == nil || r2.resumable == nil {
return false
}
return r1.asClientObject().GetName() <= r2.asClientObject().GetName()
})

// Print success/error message.
for _, r := range responses {
if r.resumable == nil {
continue
}
if r.err != nil {
logger.Failuref(r.err.Error())
}
logger.Successf("%s %s reconciliation completed", resume.kind, r.asClientObject().GetName())
logger.Successf(r.successMessage())
}
}
6 changes: 4 additions & 2 deletions cmd/flux/resume_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ var resumeAlertCmd = &cobra.Command{
Long: `The resume command marks a previously suspended Alert resource for reconciliation and waits for it to
finish the apply.`,
Example: ` # Resume reconciliation for an existing Alert
flux resume alert main`,
flux resume alert main
# Resume reconciliation for multiple Alerts
flux resume alert main-1 main-2`,
ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.AlertKind)),
RunE: resumeCommand{
apiType: alertType,
object: alertAdapter{&notificationv1.Alert{}},
list: &alertListAdapter{&notificationv1.AlertList{}},
}.run,
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/flux/resume_helmrelease.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ var resumeHrCmd = &cobra.Command{
Long: `The resume command marks a previously suspended HelmRelease resource for reconciliation and waits for it to
finish the apply.`,
Example: ` # Resume reconciliation for an existing Helm release
flux resume hr podinfo`,
flux resume hr podinfo
# Resume reconciliation for multiple Helm releases
flux resume hr podinfo-1 podinfo-2`,
ValidArgsFunction: resourceNamesCompletionFunc(helmv2.GroupVersion.WithKind(helmv2.HelmReleaseKind)),
RunE: resumeCommand{
apiType: helmReleaseType,
object: helmReleaseAdapter{&helmv2.HelmRelease{}},
list: helmReleaseListAdapter{&helmv2.HelmReleaseList{}},
}.run,
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/flux/resume_image_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ var resumeImageRepositoryCmd = &cobra.Command{
Short: "Resume a suspended ImageRepository",
Long: `The resume command marks a previously suspended ImageRepository resource for reconciliation and waits for it to finish.`,
Example: ` # Resume reconciliation for an existing ImageRepository
flux resume image repository alpine`,
flux resume image repository alpine
# Resume reconciliation for multiple ImageRepositories
flux resume image repository alpine-1 alpine-2`,
ValidArgsFunction: resourceNamesCompletionFunc(imagev1.GroupVersion.WithKind(imagev1.ImageRepositoryKind)),
RunE: resumeCommand{
apiType: imageRepositoryType,
object: imageRepositoryAdapter{&imagev1.ImageRepository{}},
list: imageRepositoryListAdapter{&imagev1.ImageRepositoryList{}},
}.run,
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/flux/resume_image_updateauto.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ var resumeImageUpdateCmd = &cobra.Command{
Short: "Resume a suspended ImageUpdateAutomation",
Long: `The resume command marks a previously suspended ImageUpdateAutomation resource for reconciliation and waits for it to finish.`,
Example: ` # Resume reconciliation for an existing ImageUpdateAutomation
flux resume image update latest-images`,
flux resume image update latest-images
# Resume reconciliation for multiple ImageUpdateAutomations
flux resume image update latest-images-1 latest-images-2`,
ValidArgsFunction: resourceNamesCompletionFunc(autov1.GroupVersion.WithKind(autov1.ImageUpdateAutomationKind)),
RunE: resumeCommand{
apiType: imageUpdateAutomationType,
object: imageUpdateAutomationAdapter{&autov1.ImageUpdateAutomation{}},
list: imageUpdateAutomationListAdapter{&autov1.ImageUpdateAutomationList{}},
}.run,
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/flux/resume_kustomization.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ var resumeKsCmd = &cobra.Command{
Long: `The resume command marks a previously suspended Kustomization resource for reconciliation and waits for it to
finish the apply.`,
Example: ` # Resume reconciliation for an existing Kustomization
flux resume ks podinfo`,
flux resume ks podinfo
# Resume reconciliation for multiple Kustomizations
flux resume ks podinfo-1 podinfo-2`,
ValidArgsFunction: resourceNamesCompletionFunc(kustomizev1.GroupVersion.WithKind(kustomizev1.KustomizationKind)),
RunE: resumeCommand{
apiType: kustomizationType,
object: kustomizationAdapter{&kustomizev1.Kustomization{}},
list: kustomizationListAdapter{&kustomizev1.KustomizationList{}},
}.run,
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/flux/resume_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ var resumeReceiverCmd = &cobra.Command{
Long: `The resume command marks a previously suspended Receiver resource for reconciliation and waits for it to
finish the apply.`,
Example: ` # Resume reconciliation for an existing Receiver
flux resume receiver main`,
flux resume receiver main
# Resume reconciliation for multiple Receivers
flux resume receiver main-1 main-2`,
ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ReceiverKind)),
RunE: resumeCommand{
apiType: receiverType,
object: receiverAdapter{&notificationv1.Receiver{}},
list: receiverListAdapter{&notificationv1.ReceiverList{}},
}.run,
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/flux/resume_source_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ var resumeSourceBucketCmd = &cobra.Command{
Short: "Resume a suspended Bucket",
Long: `The resume command marks a previously suspended Bucket resource for reconciliation and waits for it to finish.`,
Example: ` # Resume reconciliation for an existing Bucket
flux resume source bucket podinfo`,
flux resume source bucket podinfo
# Resume reconciliation for multiple Buckets
flux resume source bucket podinfo-1 podinfo-2`,
ValidArgsFunction: resourceNamesCompletionFunc(sourcev1.GroupVersion.WithKind(sourcev1.BucketKind)),
RunE: resumeCommand{
apiType: bucketType,
object: bucketAdapter{&sourcev1.Bucket{}},
list: bucketListAdapter{&sourcev1.BucketList{}},
}.run,
}
Expand Down
Loading

0 comments on commit 25af5d2

Please sign in to comment.