From cbcb386efd896c116db1b968a8a9466d46b26aec Mon Sep 17 00:00:00 2001 From: Ilya Dmitrichenko Date: Mon, 1 Apr 2019 12:15:52 +0100 Subject: [PATCH] Rework stack management - more visibility into what task manager does - print clear descriptions of what is being done - enable addition of `--dry-run` to every command - cleaner main functions for create and delete commands - more general abstraction for tasks - expose all common operations through functions - remove special purpose task handler functions --- pkg/cfn/manager/api.go | 71 +++++---- pkg/cfn/manager/cluster.go | 17 +- pkg/cfn/manager/create_tasks.go | 49 ++++++ pkg/cfn/manager/delete_tasks.go | 87 ++++++++++ pkg/cfn/manager/deprecated.go | 125 ++++++++++----- pkg/cfn/manager/nodegroup.go | 42 +---- pkg/cfn/manager/tasks.go | 272 +++++++++++++++++++------------- pkg/ctl/create/cluster.go | 6 +- pkg/ctl/create/nodegroup.go | 4 +- pkg/ctl/delete/cluster.go | 101 ++++++------ pkg/ctl/delete/nodegroup.go | 21 +-- pkg/eks/eks.go | 17 -- 12 files changed, 491 insertions(+), 321 deletions(-) create mode 100644 pkg/cfn/manager/create_tasks.go create mode 100644 pkg/cfn/manager/delete_tasks.go diff --git a/pkg/cfn/manager/api.go b/pkg/cfn/manager/api.go index c51ebf8408..96d69cae34 100644 --- a/pkg/cfn/manager/api.go +++ b/pkg/cfn/manager/api.go @@ -33,7 +33,7 @@ type StackInfo struct { Template *string } -// ChangeSet represents a Cloudformation changeSet +// ChangeSet represents a CloudFormation ChangeSet type ChangeSet = cloudformation.DescribeChangeSetOutput // StackCollection stores the CloudFormation stack information @@ -62,7 +62,7 @@ func NewStackCollection(provider api.ClusterProvider, spec *api.ClusterConfig) * } } -// DoCreateStackRequest requests the creation of a cloudformation stack. 
+// DoCreateStackRequest requests the creation of a CloudFormation stack func (c *StackCollection) DoCreateStackRequest(i *Stack, templateBody []byte, tags, parameters map[string]string, withIAM bool, withNamedIAM bool) error { input := &cloudformation.CreateStackInput{ StackName: i.StackName, @@ -121,12 +121,14 @@ func (c *StackCollection) CreateStack(name string, stack builder.ResourceSet, ta return err } + logger.Info("deploying stack %q", name) + go c.waitUntilStackIsCreated(i, stack, errs) return nil } -// UpdateStack will update a cloudformation stack by creating and executing a ChangeSet. +// UpdateStack will update a CloudFormation stack by creating and executing a ChangeSet func (c *StackCollection) UpdateStack(stackName string, changeSetName string, description string, template []byte, parameters map[string]string) error { logger.Info(description) i := &Stack{StackName: &stackName} @@ -226,8 +228,8 @@ func defaultStackStatusFilter() []*string { ) } -// DeleteStack kills a stack by name without waiting for DELETED status -func (c *StackCollection) DeleteStack(name string, force bool) (*Stack, error) { +// DeleteStackByName sends a request to delete the stack +func (c *StackCollection) DeleteStackByName(name string) (*Stack, error) { i := &Stack{StackName: &name} s, err := c.DescribeStack(i) if err != nil { @@ -243,14 +245,32 @@ func (c *StackCollection) DeleteStack(name string, force bool) (*Stack, error) { } return nil, err } - if *s.StackStatus == cloudformation.StackStatusDeleteFailed && !force { - return nil, fmt.Errorf("stack %q previously couldn't be deleted", name) - } i.StackId = s.StackId + return c.DeleteStackBySpec(i) +} + +// WaitDeleteStackByName sends a request to delete the stack, and waits until status is DELETE_COMPLETE; +// any errors will be written to errs channel, assume completion when nil is written, do not expect +// more then one error value on the channel, it's closed immediately after it is written to +func (c *StackCollection) 
WaitDeleteStackByName(name string, errs chan error) error { + i, err := c.DeleteStackByName(name) + if err != nil { + return err + } + + logger.Info("waiting for stack %q to get deleted", *i.StackName) + + go c.waitUntilStackIsDeleted(i, errs) + + return nil +} + +// DeleteStackBySpec sends a request to delete the stack +func (c *StackCollection) DeleteStackBySpec(s *Stack) (*Stack, error) { for _, tag := range s.Tags { if *tag.Key == api.ClusterNameTag && *tag.Value == c.spec.Metadata.Name { input := &cloudformation.DeleteStackInput{ - StackName: i.StackId, + StackName: s.StackId, } if cfnRole := c.provider.CloudFormationRoleARN(); cfnRole != "" { @@ -258,10 +278,10 @@ func (c *StackCollection) DeleteStack(name string, force bool) (*Stack, error) { } if _, err := c.provider.CloudFormation().DeleteStack(input); err != nil { - return nil, errors.Wrapf(err, "not able to delete stack %q", name) + return nil, errors.Wrapf(err, "not able to delete stack %q", s.StackName) } - logger.Info("will delete stack %q", name) - return i, nil + logger.Info("will delete stack %q", *s.StackName) + return s, nil } } @@ -269,12 +289,11 @@ func (c *StackCollection) DeleteStack(name string, force bool) (*Stack, error) { fmt.Sprintf("%s:%s", api.ClusterNameTag, c.spec.Metadata.Name)) } -// WaitDeleteStack kills a stack by name and waits for DELETED status; -// any errors will be written to errs channel, when nil is written, -// assume completion, do not expect more then one error value on the -// channel, it's closed immediately after it is written to -func (c *StackCollection) WaitDeleteStack(name string, force bool, errs chan error) error { - i, err := c.DeleteStack(name, force) +// WaitDeleteStackBySpec sends a request to delete the stack, and waits until status is DELETE_COMPLETE; +// any errors will be written to errs channel, assume completion when nil is written, do not expect +// more then one error value on the channel, it's closed immediately after it is written to +func (c 
*StackCollection) WaitDeleteStackBySpec(s *Stack, errs chan error) error { + i, err := c.DeleteStackBySpec(s) if err != nil { return err } @@ -286,18 +305,6 @@ func (c *StackCollection) WaitDeleteStack(name string, force bool, errs chan err return nil } -// BlockingWaitDeleteStack kills a stack by name and waits for DELETED status -func (c *StackCollection) BlockingWaitDeleteStack(name string, force bool) error { - i, err := c.DeleteStack(name, force) - if err != nil { - return err - } - - logger.Info("waiting for stack %q to get deleted", *i.StackName) - - return c.doWaitUntilStackIsDeleted(i) -} - func fmtStacksRegexForCluster(name string) string { const ourStackRegexFmt = "^(eksctl|EKS)-%s-((cluster|nodegroup-.+)|(VPC|ServiceRole|ControlPlane|DefaultNodeGroup))$" return fmt.Sprintf(ourStackRegexFmt, name) @@ -315,7 +322,7 @@ func (c *StackCollection) DescribeStacks() ([]*Stack, error) { return stacks, nil } -// DescribeStackEvents describes the occurred stack events +// DescribeStackEvents describes the events that have occurred on the stack func (c *StackCollection) DescribeStackEvents(i *Stack) ([]*cloudformation.StackEvent, error) { input := &cloudformation.DescribeStackEventsInput{ StackName: i.StackName, @@ -410,7 +417,7 @@ func (c *StackCollection) doExecuteChangeSet(stackName string, changeSetName str return nil } -// DescribeStackChangeSet gets a cloudformation changeset. 
+// DescribeStackChangeSet describes a ChangeSet by name func (c *StackCollection) DescribeStackChangeSet(i *Stack, changeSetName string) (*ChangeSet, error) { input := &cloudformation.DescribeChangeSetInput{ StackName: i.StackName, diff --git a/pkg/cfn/manager/cluster.go b/pkg/cfn/manager/cluster.go index 9ef4b26a7d..04e64835b6 100644 --- a/pkg/cfn/manager/cluster.go +++ b/pkg/cfn/manager/cluster.go @@ -25,8 +25,8 @@ func (c *StackCollection) makeClusterStackName() string { return "eksctl-" + c.spec.Metadata.Name + "-cluster" } -// CreateCluster creates the cluster -func (c *StackCollection) CreateCluster(errs chan error, _ interface{}) error { +// createClusterTask creates the cluster +func (c *StackCollection) createClusterTask(errs chan error) error { name := c.makeClusterStackName() logger.Info("building cluster stack %q", name) stack := builder.NewClusterResourceSet(c.provider, c.spec) @@ -34,7 +34,7 @@ func (c *StackCollection) CreateCluster(errs chan error, _ interface{}) error { return err } - // Unlike with `CreateNodeGroup`, all tags are already set for the cluster stack + // Unlike with `createNodeGroupTask`, all tags are already set for the cluster stack return c.CreateStack(name, stack, nil, nil, errs) } @@ -56,17 +56,6 @@ func (c *StackCollection) DescribeClusterStack() (*Stack, error) { return nil, nil } -// DeleteCluster deletes the cluster -func (c *StackCollection) DeleteCluster(force bool) error { - _, err := c.DeleteStack(c.makeClusterStackName(), force) - return err -} - -// WaitDeleteCluster waits till the cluster is deleted -func (c *StackCollection) WaitDeleteCluster(force bool) error { - return c.BlockingWaitDeleteStack(c.makeClusterStackName(), force) -} - // AppendNewClusterStackResource will update cluster // stack with new resources in append-only way func (c *StackCollection) AppendNewClusterStackResource(dryRun bool) (bool, error) { diff --git a/pkg/cfn/manager/create_tasks.go b/pkg/cfn/manager/create_tasks.go new file mode 100644 
index 0000000000..eb03072e23 --- /dev/null +++ b/pkg/cfn/manager/create_tasks.go @@ -0,0 +1,49 @@ +package manager + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// CreateTasksForClusterWithNodeGroups defines all tasks required to create a cluster along +// with some nodegroups; see CreateAllNodeGroups for how onlyNodeGroupSubset works +func (c *StackCollection) CreateTasksForClusterWithNodeGroups(onlyNodeGroupSubset sets.String) *TaskSet { + tasks := &TaskSet{Parallel: false} + + tasks.Append( + &taskWithoutParams{ + info: fmt.Sprintf("create cluster control plane %q", c.spec.Metadata.Name), + call: c.createClusterTask, + }, + ) + + nodeGroupTasks := c.CreateTasksForNodeGroups(onlyNodeGroupSubset) + if nodeGroupTasks.Len() > 0 { + nodeGroupTasks.Sub = true + tasks.Append(nodeGroupTasks) + } + + return tasks +} + +// CreateTasksForNodeGroups defines tasks required to create all of the nodegroups if +// onlySubset is nil, otherwise just the tasks for nodegroups that are in onlySubset +// will be defined +func (c *StackCollection) CreateTasksForNodeGroups(onlySubset sets.String) *TaskSet { + tasks := &TaskSet{Parallel: true} + + for i := range c.spec.NodeGroups { + ng := c.spec.NodeGroups[i] + if onlySubset != nil && !onlySubset.Has(ng.Name) { + continue + } + tasks.Append(&taskWithNodeGroupSpec{ + info: fmt.Sprintf("create nodegroup %q", ng.Name), + nodeGroup: ng, + call: c.createNodeGroupTask, + }) + } + + return tasks +} diff --git a/pkg/cfn/manager/delete_tasks.go b/pkg/cfn/manager/delete_tasks.go new file mode 100644 index 0000000000..36321805ac --- /dev/null +++ b/pkg/cfn/manager/delete_tasks.go @@ -0,0 +1,87 @@ +package manager + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/service/cloudformation" + + "k8s.io/apimachinery/pkg/util/sets" +) + +// DeleteTasksForClusterWithNodeGroups defines tasks required to delete all the nodegroup +// stacks and the cluster +func (c *StackCollection) DeleteTasksForClusterWithNodeGroups(wait bool, cleanup 
func(chan error, string) error) (*TaskSet, error) { + tasks := &TaskSet{Parallel: false} + + nodeGroupTasks, err := c.DeleteTasksForNodeGroups(nil, true, cleanup) + if err != nil { + return nil, err + } + if nodeGroupTasks.Len() > 0 { + nodeGroupTasks.Sub = true + tasks.Append(nodeGroupTasks) + } + + clusterStack, err := c.DescribeClusterStack() + if err != nil { + return nil, err + } + + info := fmt.Sprintf("delete cluster control plane %q", c.spec.Metadata.Name) + if wait { + tasks.Append(&taskWithStackSpec{ + info: info, + stack: clusterStack, + call: c.WaitDeleteStackBySpec, + }) + } else { + tasks.Append(&asyncTaskWithStackSpec{ + info: info, + stack: clusterStack, + call: c.DeleteStackBySpec, + }) + } + + return tasks, nil +} + +// DeleteTasksForNodeGroups defines tasks required to delete all of the nodegroups if +// onlySubset is nil, otherwise just the tasks for nodegroups that are in onlySubset +// will be defined +func (c *StackCollection) DeleteTasksForNodeGroups(onlySubset sets.String, wait bool, cleanup func(chan error, string) error) (*TaskSet, error) { + nodeGroupStacks, err := c.DescribeNodeGroupStacks() + if err != nil { + return nil, err + } + + tasks := &TaskSet{Parallel: true} + + for _, s := range nodeGroupStacks { + name := getNodeGroupName(s) + if onlySubset != nil && !onlySubset.Has(name) { + continue + } + if *s.StackStatus == cloudformation.StackStatusDeleteFailed && cleanup != nil { + tasks.Append(&taskWithNameParam{ + info: fmt.Sprintf("cleanup for nodegroup %q", name), + call: cleanup, + }) + } + info := fmt.Sprintf("delete nodegroup %q", name) + if wait { + tasks.Append(&taskWithStackSpec{ + info: info, + stack: s, + call: c.WaitDeleteStackBySpec, + }) + } else { + tasks.Append(&asyncTaskWithStackSpec{ + info: info, + stack: s, + call: c.DeleteStackBySpec, + }) + } + } + + return tasks, nil +} diff --git a/pkg/cfn/manager/deprecated.go b/pkg/cfn/manager/deprecated.go index 6da72c0508..d27756d10d 100644 --- 
a/pkg/cfn/manager/deprecated.go +++ b/pkg/cfn/manager/deprecated.go @@ -1,57 +1,98 @@ package manager -// DeprecatedDeleteStackVPC deletes the VPC stack -func (c *StackCollection) DeprecatedDeleteStackVPC(wait bool) error { - var err error - stackName := "EKS-" + c.spec.Metadata.Name + "-VPC" - - if wait { - err = c.BlockingWaitDeleteStack(stackName, true) - } else { - _, err = c.DeleteStack(stackName, true) - } +import ( + "fmt" + "strings" - return err -} + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/eks" + "github.com/pkg/errors" + "github.com/weaveworks/eksctl/pkg/utils/waiters" +) -// DeprecatedDeleteStackServiceRole deletes the service role stack -func (c *StackCollection) DeprecatedDeleteStackServiceRole(wait bool) error { - var err error - stackName := "EKS-" + c.spec.Metadata.Name + "-ServiceRole" +func deprecastedStackSuffices() []string { + return []string{ + "DefaultNodeGroup", + "ControlPlane", + "ServiceRole", + "VPC", + } +} +func fmtDeprecatedStacksRegexForCluster(name string) string { + const ourStackRegexFmt = "^EKS-%s-(%s)$" + return fmt.Sprintf(ourStackRegexFmt, name, strings.Join(deprecastedStackSuffices(), "|")) +} - if wait { - err = c.BlockingWaitDeleteStack(stackName, true) - } else { - _, err = c.DeleteStack(stackName, true) +// DeleteTasksForDeprecatedStacks defines tasks required to delete all deprecated stacks +func (c *StackCollection) DeleteTasksForDeprecatedStacks() (*TaskSet, error) { + stacks, err := c.ListStacks(fmtDeprecatedStacksRegexForCluster(c.spec.Metadata.Name)) + if err != nil { + return nil, errors.Wrapf(err, "describing deprecated CloudFormation stacks for %q", c.spec.Metadata.Name) + } + if len(stacks) == 0 { + return nil, nil } - return err -} + deleteControlPlaneTask := &taskWithoutParams{ + info: fmt.Sprintf("delete control plane %q", c.spec.Metadata.Name), + call: func(errs chan error) error { + _, err := c.provider.EKS().DescribeCluster(&eks.DescribeClusterInput{ + Name: &c.spec.Metadata.Name, + }) + if err != nil 
{ + return err + } -// DeprecatedDeleteStackDefaultNodeGroup deletes the default node group stack -func (c *StackCollection) DeprecatedDeleteStackDefaultNodeGroup(wait bool) error { - var err error - stackName := "EKS-" + c.spec.Metadata.Name + "-DefaultNodeGroup" + _, err = c.provider.EKS().DeleteCluster(&eks.DeleteClusterInput{ + Name: &c.spec.Metadata.Name, + }) + if err != nil { + return err + } - if wait { - err = c.BlockingWaitDeleteStack(stackName, true) - } else { - _, err = c.DeleteStack(stackName, true) - } + newRequest := func() *request.Request { + input := &eks.DescribeClusterInput{ + Name: &c.spec.Metadata.Name, + } + req, _ := c.provider.EKS().DescribeClusterRequest(input) + return req + } - return err -} + msg := fmt.Sprintf("waiting for control plane %q to be deleted", c.spec.Metadata.Name) -// DeprecatedDeleteStackControlPlane deletes the control plane stack -func (c *StackCollection) DeprecatedDeleteStackControlPlane(wait bool) error { - var err error - stackName := "EKS-" + c.spec.Metadata.Name + "-ControlPlane" + acceptors := waiters.MakeAcceptors( + "Cluster.Status", + eks.ClusterStatusDeleting, + []string{ + eks.ClusterStatusFailed, + }, + ) - if wait { - err = c.BlockingWaitDeleteStack(stackName, true) - } else { - _, err = c.DeleteStack(stackName, true) + return waiters.Wait(c.spec.Metadata.Name, msg, acceptors, newRequest, c.provider.WaitTimeout(), nil) + }, } - return err + cpStackFound := false + for _, s := range stacks { + if strings.HasSuffix(*s.StackName, "-ControlPlane") { + cpStackFound = true + } + } + tasks := &TaskSet{} + + for _, suffix := range deprecastedStackSuffices() { + for _, s := range stacks { + if strings.HasSuffix(*s.StackName, "-"+suffix) { + if suffix == "-ControlPlane" && !cpStackFound { + tasks.Append(deleteControlPlaneTask) + } else { + tasks.Append(&taskWithStackSpec{ + stack: s, + call: c.WaitDeleteStackBySpec, + }) + } + } + } + } + return tasks, nil } diff --git a/pkg/cfn/manager/nodegroup.go 
b/pkg/cfn/manager/nodegroup.go index 1fa77f45bc..5d1afb2b0c 100644 --- a/pkg/cfn/manager/nodegroup.go +++ b/pkg/cfn/manager/nodegroup.go @@ -38,16 +38,15 @@ type NodeGroupSummary struct { CreationTime *time.Time } -// MakeNodeGroupStackName generates the name of the node group identified by its ID, isolated by the cluster this StackCollection operates on -func (c *StackCollection) MakeNodeGroupStackName(name string) string { +// makeNodeGroupStackName generates the name of the node group identified by its ID, isolated by the cluster this StackCollection operates on +func (c *StackCollection) makeNodeGroupStackName(name string) string { return fmt.Sprintf("eksctl-%s-nodegroup-%s", c.spec.Metadata.Name, name) } -// CreateNodeGroup creates the nodegroup -func (c *StackCollection) CreateNodeGroup(errs chan error, data interface{}) error { - ng := data.(*api.NodeGroup) - name := c.MakeNodeGroupStackName(ng.Name) - logger.Info("creating nodegroup stack %q", name) +// createNodeGroupTask creates the nodegroup +func (c *StackCollection) createNodeGroupTask(errs chan error, ng *api.NodeGroup) error { + name := c.makeNodeGroupStackName(ng.Name) + logger.Info("buildings nodegroup stack %q", name) stack := builder.NewNodeGroupResourceSet(c.provider, c.spec, c.makeClusterStackName(), ng) if err := stack.AddAllResources(); err != nil { return err @@ -127,38 +126,11 @@ func (c *StackCollection) DescribeNodeGroupStacksAndResources() (map[string]Stac return allResources, nil } -// DeleteNodeGroup deletes a nodegroup stack -func (c *StackCollection) DeleteNodeGroup(name string) error { - name = c.MakeNodeGroupStackName(name) - _, err := c.DeleteStack(name, false) - return err -} - -// WaitDeleteNodeGroup waits until the nodegroup is deleted, -// it calls WaitDeleteStack without force -func (c *StackCollection) WaitDeleteNodeGroup(errs chan error, data interface{}) error { - name := c.MakeNodeGroupStackName(data.(string)) - return c.WaitDeleteStack(name, false, errs) -} - -// 
WaitForceDeleteNodeGroup waits until the nodegroup is deleted, -// it calls WaitDeleteStack with force -func (c *StackCollection) WaitForceDeleteNodeGroup(errs chan error, data interface{}) error { - name := c.MakeNodeGroupStackName(data.(string)) - return c.WaitDeleteStack(name, true, errs) -} - -// BlockingWaitDeleteNodeGroup waits until the nodegroup is deleted -func (c *StackCollection) BlockingWaitDeleteNodeGroup(name string, force bool) error { - name = c.MakeNodeGroupStackName(name) - return c.BlockingWaitDeleteStack(name, force) -} - // ScaleNodeGroup will scale an existing nodegroup func (c *StackCollection) ScaleNodeGroup(ng *api.NodeGroup) error { clusterName := c.makeClusterStackName() c.spec.Status = &api.ClusterStatus{StackName: clusterName} - name := c.MakeNodeGroupStackName(ng.Name) + name := c.makeNodeGroupStackName(ng.Name) logger.Info("scaling nodegroup stack %q in cluster %s", name, clusterName) // Get current stack diff --git a/pkg/cfn/manager/tasks.go b/pkg/cfn/manager/tasks.go index 5d18fed5c8..802df3500b 100644 --- a/pkg/cfn/manager/tasks.go +++ b/pkg/cfn/manager/tasks.go @@ -1,158 +1,208 @@ package manager import ( + "fmt" + "strings" "sync" - "k8s.io/apimachinery/pkg/util/sets" - "github.com/kris-nova/logger" api "github.com/weaveworks/eksctl/pkg/apis/eksctl.io/v1alpha4" ) -type taskFunc func(chan error, interface{}) error +// Task is a common interface for the stack manager tasks +type Task interface { + Do(chan error) error + Describe() string +} -// Task has a function with an opaque payload -type Task struct { - Call taskFunc - Data interface{} +// TaskSet wraps a set of tasks +type TaskSet struct { + tasks []Task + Parallel bool + DryRun bool + Sub bool } -// Run a set of tests in parallel and wait for them to complete; -// passError should take any errors and do what it needs to in -// a given context, e.g. 
during serial CLI-driven execution one -// can keep errors in a slice, while in a daemon channel maybe -// more suitable -func Run(passError func(error), tasks ...Task) { - wg := &sync.WaitGroup{} - wg.Add(len(tasks)) - for t := range tasks { - go func(t int) { - defer wg.Done() - logger.Debug("task %d started", t) - errs := make(chan error) - if err := tasks[t].Call(errs, tasks[t].Data); err != nil { - passError(err) - return - } - if err := <-errs; err != nil { - passError(err) - return - } - logger.Debug("task %d returned without errors", t) - }(t) +// Append new tasks to the set +func (t *TaskSet) Append(task ...Task) { + t.tasks = append(t.tasks, task...) +} + +// Len returns number of tasks in the set +func (t *TaskSet) Len() int { + if t == nil { + return 0 } - logger.Debug("waiting for %d tasks to complete", len(tasks)) - wg.Wait() + return len(t.tasks) } -// RunSingleTask runs a task with a proper error handling -func (c *StackCollection) RunSingleTask(t Task) []error { - errs := []error{} - appendErr := func(err error) { - errs = append(errs, err) +// Describe the set +func (t *TaskSet) Describe() string { + descriptions := []string{} + for _, task := range t.tasks { + descriptions = append(descriptions, task.Describe()) } - if Run(appendErr, t); len(errs) > 0 { - return errs + mode := "sequential" + if t.Parallel { + mode = "parallel" } - return nil + count := len(descriptions) + var msg string + noun := "task" + if t.Sub { + noun = "sub-task" + } + switch count { + case 0: + msg = "no tasks" + case 1: + msg = fmt.Sprintf("1 %s: { %s }", noun, descriptions[0]) + if t.Sub { + msg = descriptions[0] // simple description for single sub-task + } + default: + noun += "s" + msg = fmt.Sprintf("%d %s %s: { %s }", count, mode, noun, strings.Join(descriptions, ", ")) + } + if t.DryRun { + return "(dry-run) " + msg + } + return msg } -// CreateClusterWithNodeGroups runs all tasks required to create -// the stacks (a cluster and one or more nodegroups); any errors 
-will be returned as a slice as soon as one of the tasks or group -of tasks is completed -func (c *StackCollection) CreateClusterWithNodeGroups(onlySubset sets.String) []error { - if errs := c.RunSingleTask(Task{c.CreateCluster, nil}); len(errs) > 0 { - return errs +// Do will run through the set in the background, it may return an error immediately, +// or eventually write to the errs channel; it will close the channel once all tasks +// are completed +func (t *TaskSet) Do(errs chan error) error { + if t.Len() == 0 || t.DryRun { + logger.Debug("no actual tasks") + close(errs) + return nil } - return c.CreateAllNodeGroups(onlySubset) + sendErr := func(err error) { + errs <- err + } + + if t.Parallel { + go doParallelTasks(sendErr, t.tasks) + } else { + go doSequentialTasks(sendErr, t.tasks) + } + return nil } -// CreateAllNodeGroups runs all tasks required to create the node groups; -// any errors will be returned as a slice as soon as one of the tasks -// or group of tasks is completed -func (c *StackCollection) CreateAllNodeGroups(onlySubset sets.String) []error { +// DoAllSync will run through the set in the foreground and return all the errors +// in a slice +func (t *TaskSet) DoAllSync() []error { + if t.Len() == 0 || t.DryRun { + logger.Debug("no actual tasks") + return nil + } + errs := []error{} appendErr := func(err error) { errs = append(errs, err) } - createAllNodeGroups := []Task{} - for i := range c.spec.NodeGroups { - ng := c.spec.NodeGroups[i] - if onlySubset != nil && !onlySubset.Has(ng.Name) { - continue - } - t := Task{ - Call: c.CreateNodeGroup, - Data: ng, - } - createAllNodeGroups = append(createAllNodeGroups, t) + if t.Parallel { + doParallelTasks(appendErr, t.tasks) + } else { + doSequentialTasks(appendErr, t.tasks) } - if Run(appendErr, createAllNodeGroups...); len(errs) > 0 { + if len(errs) > 0 { return errs } - return nil } -// CreateOneNodeGroup runs a task to create a single node groups; -// any errors will be returned as a slice as
soon as the tasks is -// completed -func (c *StackCollection) CreateOneNodeGroup(ng *api.NodeGroup) []error { - return c.RunSingleTask(Task{ - Call: c.CreateNodeGroup, - Data: ng, - }) +type taskWithoutParams struct { + info string + call func(chan error) error } -// DeleteAllNodeGroups deletes all nodegroups without waiting -func (c *StackCollection) DeleteAllNodeGroups(call taskFunc) []error { - nodeGroupStacks, err := c.DescribeNodeGroupStacks() - if err != nil { - return []error{err} - } +func (t *taskWithoutParams) Describe() string { return t.info } +func (t *taskWithoutParams) Do(errs chan error) error { + return t.call(errs) +} - errs := []error{} - for _, s := range nodeGroupStacks { - if err := c.DeleteNodeGroup(getNodeGroupName(s)); err != nil { - errs = append(errs, err) - } - } +type taskWithNameParam struct { + info string + name string + call func(chan error, string) error +} - return errs +func (t *taskWithNameParam) Describe() string { return t.info } +func (t *taskWithNameParam) Do(errs chan error) error { + return t.call(errs, t.name) } -// WaitDeleteAllNodeGroups runs all tasks required to delete all the nodegroup -// stacks and wait for all nodegroups to be deleted; any errors will be returned -// as a slice as soon as the group of tasks is completed -func (c *StackCollection) WaitDeleteAllNodeGroups(force bool) []error { - nodeGroupStacks, err := c.DescribeNodeGroupStacks() - if err != nil { - return []error{err} - } +type taskWithNodeGroupSpec struct { + info string + nodeGroup *api.NodeGroup + call func(chan error, *api.NodeGroup) error +} - errs := []error{} - appendErr := func(err error) { - errs = append(errs, err) - } +func (t *taskWithNodeGroupSpec) Describe() string { return t.info } +func (t *taskWithNodeGroupSpec) Do(errs chan error) error { + return t.call(errs, t.nodeGroup) +} - deleteAllNodeGroups := []Task{} - for i := range nodeGroupStacks { - t := Task{ - Call: c.WaitDeleteNodeGroup, - Data: 
getNodeGroupName(nodeGroupStacks[i]), - } - if force { - t.Call = c.WaitForceDeleteNodeGroup - } - deleteAllNodeGroups = append(deleteAllNodeGroups, t) +type taskWithStackSpec struct { + info string + stack *Stack + call func(*Stack, chan error) error +} + +func (t *taskWithStackSpec) Describe() string { return t.info } +func (t *taskWithStackSpec) Do(errs chan error) error { + return t.call(t.stack, errs) +} + +type asyncTaskWithStackSpec struct { + info string + stack *Stack + call func(*Stack) (*Stack, error) +} + +func (t *asyncTaskWithStackSpec) Describe() string { return t.info + " [async]" } +func (t *asyncTaskWithStackSpec) Do(errs chan error) error { + _, err := t.call(t.stack) + close(errs) + return err +} + +func doSingleTask(passError func(error), task Task) { + desc := task.Describe() + logger.Debug("started task: %s", desc) + errs := make(chan error) + if err := task.Do(errs); err != nil { + passError(err) + return } - if Run(appendErr, deleteAllNodeGroups...); len(errs) > 0 { - return errs + if err := <-errs; err != nil { + passError(err) + return } + logger.Debug("completed task: %s", desc) +} - return nil +func doParallelTasks(passError func(error), tasks []Task) { + wg := &sync.WaitGroup{} + wg.Add(len(tasks)) + for t := range tasks { + go func(t int) { + defer wg.Done() + doSingleTask(passError, tasks[t]) + }(t) + } + logger.Debug("waiting for %d parallel tasks to complete", len(tasks)) + wg.Wait() +} + +func doSequentialTasks(passError func(error), tasks []Task) { + for t := range tasks { + doSingleTask(passError, tasks[t]) + } } diff --git a/pkg/ctl/create/cluster.go b/pkg/ctl/create/cluster.go index deedd9a0da..d1c2b092be 100644 --- a/pkg/ctl/create/cluster.go +++ b/pkg/ctl/create/cluster.go @@ -292,9 +292,9 @@ func doCreateCluster(p *api.ProviderConfig, cfg *api.ClusterConfig, nameArg stri logger.Info("will create a CloudFormation stack for cluster itself and %d nodegroup stack(s)", ngCount) } logger.Info("if you encounter any issues, check 
CloudFormation console or try 'eksctl utils describe-stacks --region=%s --name=%s'", meta.Region, meta.Name) - errs := stackManager.CreateClusterWithNodeGroups(ngSubset) - // read any errors (it only gets non-nil errors) - if len(errs) > 0 { + tasks := stackManager.CreateTasksForClusterWithNodeGroups(ngSubset) + logger.Info(tasks.Describe()) + if errs := tasks.DoAllSync(); len(errs) > 0 { logger.Info("%d error(s) occurred and cluster hasn't been created properly, you may wish to check CloudFormation console", len(errs)) logger.Info("to cleanup resources, run 'eksctl delete cluster --region=%s --name=%s'", meta.Region, meta.Name) for _, err := range errs { diff --git a/pkg/ctl/create/nodegroup.go b/pkg/ctl/create/nodegroup.go index 598c2682ec..1d00d34f0b 100644 --- a/pkg/ctl/create/nodegroup.go +++ b/pkg/ctl/create/nodegroup.go @@ -156,7 +156,9 @@ func doCreateNodeGroups(p *api.ProviderConfig, cfg *api.ClusterConfig, nameArg s logger.Info("will create a CloudFormation stack for each of %d nodegroups in cluster %q", ngCount, cfg.Metadata.Name) } - errs := stackManager.CreateAllNodeGroups(ngSubset) + tasks := stackManager.CreateTasksForNodeGroups(ngSubset) + logger.Info(tasks.Describe()) + errs := tasks.DoAllSync() if len(errs) > 0 { logger.Info("%d error(s) occurred and nodegroups haven't been created properly, you may wish to check CloudFormation console", len(errs)) logger.Info("to cleanup resources, run 'eksctl delete nodegroup --region=%s --cluster=%s --name=' for each of the failed nodegroup", cfg.Metadata.Region, cfg.Metadata.Name) diff --git a/pkg/ctl/delete/cluster.go b/pkg/ctl/delete/cluster.go index fc90e95261..03430d58ad 100644 --- a/pkg/ctl/delete/cluster.go +++ b/pkg/ctl/delete/cluster.go @@ -3,9 +3,10 @@ package delete import ( "fmt" "os" - "strings" "github.com/pkg/errors" + "github.com/weaveworks/eksctl/pkg/cfn/manager" + "github.com/weaveworks/eksctl/pkg/utils/kubeconfig" "github.com/weaveworks/eksctl/pkg/vpc" "github.com/kris-nova/logger" @@ -15,7 
+16,6 @@ import ( "github.com/weaveworks/eksctl/pkg/ctl/cmdutils" "github.com/weaveworks/eksctl/pkg/eks" "github.com/weaveworks/eksctl/pkg/printers" - "github.com/weaveworks/eksctl/pkg/utils/kubeconfig" ) var ( @@ -52,6 +52,30 @@ func deleteClusterCmd(g *cmdutils.Grouping) *cobra.Command { return cmd } +func handleErrors(errs []error, subject string) error { + logger.Info("%d error(s) occurred while deleting %s", len(errs), subject) + for _, err := range errs { + logger.Critical("%s\n", err.Error()) + } + return fmt.Errorf("failed to delete %s", subject) +} + +func deleteDeprecatedStacks(stackManager *manager.StackCollection) (bool, error) { + tasks, err := stackManager.DeleteTasksForDeprecatedStacks() + if err != nil { + return true, err + } + if count := tasks.Len(); count > 0 { + logger.Info(tasks.Describe()) + if errs := tasks.DoAllSync(); len(errs) > 0 { + return true, handleErrors(errs, "deprecated stacks") + } + logger.Success("deleted all %s deperecated stacks", count) + return true, nil + } + return false, nil +} + func doDeleteCluster(p *api.ProviderConfig, cfg *api.ClusterConfig, nameArg string, cmd *cobra.Command) error { printer := printers.NewJSONPrinter() @@ -76,36 +100,21 @@ func doDeleteCluster(p *api.ProviderConfig, cfg *api.ClusterConfig, nameArg stri return err } - var deletedResources []string + stackManager := ctl.NewStackManager(cfg) - handleIfError := func(err error, name string) bool { + ctl.MaybeDeletePublicSSHKey(meta.Name) + + kubeconfig.MaybeDeleteConfig(meta) + + if hasDeprectatedStacks, err := deleteDeprecatedStacks(stackManager); hasDeprectatedStacks { if err != nil { - logger.Debug("continue despite error: %v", err) - return true + return err } - logger.Debug("deleted %q", name) - deletedResources = append(deletedResources, name) - return false + return nil } - // We can remove all 'DeprecatedDelete*' calls in 0.2.0 - - stackManager := ctl.NewStackManager(cfg) - { - tryDeleteAllNodeGroups := func(force bool) error { - errs := 
stackManager.WaitDeleteAllNodeGroups(force) - if len(errs) > 0 { - logger.Info("%d error(s) occurred while deleting nodegroup(s)", len(errs)) - for _, err := range errs { - logger.Critical("%s\n", err.Error()) - } - return fmt.Errorf("failed to delete nodegroup(s)") - } - return nil - } - if err := tryDeleteAllNodeGroups(false); err != nil { - logger.Info("will retry deleting nodegroup") + tasks, err := stackManager.DeleteTasksForClusterWithNodeGroups(wait, func(_ chan error, _ string) error { logger.Info("trying to cleanup dangling network interfaces") if err := ctl.GetClusterVPC(cfg); err != nil { return errors.Wrapf(err, "getting VPC configuration for cluster %q", cfg.Metadata.Name) @@ -113,38 +122,24 @@ func doDeleteCluster(p *api.ProviderConfig, cfg *api.ClusterConfig, nameArg stri if err := vpc.CleanupNetworkInterfaces(ctl.Provider, cfg); err != nil { return err } - if err := tryDeleteAllNodeGroups(true); err != nil { - return err - } - } - logger.Debug("all nodegroups were deleted") - } - - var clusterErr bool - if wait { - clusterErr = handleIfError(stackManager.WaitDeleteCluster(true), "cluster") - } else { - clusterErr = handleIfError(stackManager.DeleteCluster(true), "cluster") - } + return nil + }) - if clusterErr { - if handleIfError(ctl.DeprecatedDeleteControlPlane(meta), "control plane") { - handleIfError(stackManager.DeprecatedDeleteStackControlPlane(wait), "stack control plane (deprecated)") + if err != nil { + return err } - } - handleIfError(stackManager.DeprecatedDeleteStackServiceRole(wait), "service group (deprecated)") - handleIfError(stackManager.DeprecatedDeleteStackVPC(wait), "stack VPC (deprecated)") - handleIfError(stackManager.DeprecatedDeleteStackDefaultNodeGroup(wait), "default nodegroup (deprecated)") - - ctl.MaybeDeletePublicSSHKey(meta.Name) + if tasks.Len() == 0 { + logger.Warning("no cluster resources were found for %q", meta.Name) + return nil + } - kubeconfig.MaybeDeleteConfig(meta) + logger.Info(tasks.Describe()) + if errs := 
tasks.DoAllSync(); len(errs) > 0 { + return handleErrors(errs, "cluster with nodegroup(s)") + } - if len(deletedResources) == 0 { - logger.Warning("no EKS cluster resources were found for %q", meta.Name) - } else { - logger.Success("the following EKS cluster resource(s) for %q will be deleted: %s. If in doubt, check CloudFormation console", meta.Name, strings.Join(deletedResources, ", ")) + logger.Success("all cluster resources were deleted") } return nil diff --git a/pkg/ctl/delete/nodegroup.go b/pkg/ctl/delete/nodegroup.go index a0ee9c1c0e..42cef36272 100644 --- a/pkg/ctl/delete/nodegroup.go +++ b/pkg/ctl/delete/nodegroup.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/sets" api "github.com/weaveworks/eksctl/pkg/apis/eksctl.io/v1alpha4" "github.com/weaveworks/eksctl/pkg/authconfigmap" @@ -115,21 +116,15 @@ func doDeleteNodeGroup(p *api.ProviderConfig, cfg *api.ClusterConfig, ng *api.No logger.Info("deleting nodegroup %q in cluster %q", ng.Name, cfg.Metadata.Name) { - var ( - err error - verb string - ) - if wait { - err = stackManager.BlockingWaitDeleteNodeGroup(ng.Name, false) - verb = "was" - } else { - err = stackManager.DeleteNodeGroup(ng.Name) - verb = "will be" - } + tasks, err := stackManager.DeleteTasksForNodeGroups(sets.NewString(ng.Name), wait, nil) if err != nil { - return errors.Wrapf(err, "failed to delete nodegroup %q", ng.Name) + return err + } + logger.Info(tasks.Describe()) + if errs := tasks.DoAllSync(); len(errs) > 0 { + return handleErrors(errs, "nodegroup(s)") } - logger.Success("nodegroup %q %s deleted", ng.Name, verb) + logger.Success("all nodes in nodegroup %q will be deleted", ng.Name) } return nil diff --git a/pkg/eks/eks.go b/pkg/eks/eks.go index 031c41ea7b..9593fdd402 100644 --- a/pkg/eks/eks.go +++ b/pkg/eks/eks.go @@ -48,23 +48,6 @@ func (c *ClusterProvider) DescribeControlPlaneMustBeActive(cl *api.ClusterMeta) return cluster, nil } -// 
DeprecatedDeleteControlPlane deletes the control plane -func (c *ClusterProvider) DeprecatedDeleteControlPlane(cl *api.ClusterMeta) error { - cluster, err := c.DescribeControlPlane(cl) - if err != nil { - return errors.Wrap(err, "not able to get control plane for deletion") - } - - input := &awseks.DeleteClusterInput{ - Name: cluster.Name, - } - - if _, err := c.Provider.EKS().DeleteCluster(input); err != nil { - return errors.Wrap(err, "unable to delete cluster control plane") - } - return nil -} - // GetCredentials retrieves cluster endpoint and the certificate authority data func (c *ClusterProvider) GetCredentials(spec *api.ClusterConfig) error { // Check the cluster exists and is active