From 90a336b3558a42dfbce90e21e9ccd27c1836a802 Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Wed, 7 Feb 2018 22:56:10 +0100 Subject: [PATCH] Update stacks when needed --- aws/cf.go | 24 +++++--- aws/cf_test.go | 162 ++++++++++++++++++++++++++++++++++++++++++++----- controller.go | 7 +-- worker.go | 42 +++++-------- 4 files changed, 176 insertions(+), 59 deletions(-) diff --git a/aws/cf.go b/aws/cf.go index 6deed3e6..d79524a2 100644 --- a/aws/cf.go +++ b/aws/cf.go @@ -21,6 +21,7 @@ const ( // Stack is a simple wrapper around a CloudFormation Stack. type Stack struct { name string + status string dnsName string scheme string targetGroupARN string @@ -48,6 +49,14 @@ func (s *Stack) TargetGroupARN() string { return s.targetGroupARN } +// IsComplete returns true if the stack status is a complete state. +func (s *Stack) IsComplete() bool { + if s == nil { + return false + } + return isComplete(s.status) +} + // IsDeleteInProgress returns true if the stack has already a tag // deleteScheduled. func (s *Stack) IsDeleteInProgress() bool { @@ -290,10 +299,8 @@ func getCFStackByName(svc cloudformationiface.CloudFormationAPI, stackName strin var stack *cloudformation.Stack for _, s := range resp.Stacks { - if isComplete(s.StackStatus) { - stack = s - break - } + stack = s + break } if stack == nil { return nil, ErrLoadBalancerStackNotReady @@ -314,14 +321,15 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { scheme: parameters[parameterLoadBalancerSchemeParameter], certificateARN: tags[certificateARNTag], tags: tags, + status: aws.StringValue(stack.StackStatus), } } // isComplete returns false by design on all other status, because // updateIngress will ignore not completed stacks. // Stack can never be in rollback state by design. -func isComplete(stackStatus *string) bool { - switch aws.StringValue(stackStatus) { +func isComplete(stackStatus string) bool { + switch stackStatus { case cloudformation.StackStatusCreateComplete: return true case cloudformation.StackStatusUpdateComplete: @@ -336,10 +344,6 @@ func findManagedStacks(svc cloudformationiface.CloudFormationAPI, clusterID stri err := svc.DescribeStacksPages(&cloudformation.DescribeStacksInput{}, func(page *cloudformation.DescribeStacksOutput, lastPage bool) bool { for _, s := range page.Stacks { - if !isComplete(s.StackStatus) { - continue - } - if isManagedStack(s.Tags, clusterID) { stacks = append(stacks, mapToManagedStack(s)) } diff --git a/aws/cf_test.go b/aws/cf_test.go index f636df41..f2a16d29 100644 --- a/aws/cf_test.go +++ b/aws/cf_test.go @@ -106,7 +106,7 @@ func TestStackReadiness(t *testing.T) { {"dummy-status", false}, } { t.Run(ti.given, func(t *testing.T) { - got := isComplete(aws.String(ti.given)) + got := isComplete(ti.given) if ti.want != got { t.Errorf("unexpected result. wanted %+v, got %+v", ti.want, got) } @@ -195,7 +195,7 @@ func TestConvertStackParameters(t *testing.T) { } -func TestFindingManagedStacks(t *testing.T) { +func TestFindManagedStacks(t *testing.T) { for _, ti := range []struct { name string given cfMockOutputs @@ -203,13 +203,14 @@ func TestFindingManagedStacks(t *testing.T) { wantErr bool }{ { - "successful-call", - cfMockOutputs{ + name: "successful-call", + given: cfMockOutputs{ describeStackPages: R(nil, nil), describeStacks: R(&cloudformation.DescribeStacksOutput{ Stacks: []*cloudformation.Stack{ { - StackName: aws.String("managed-stack-not-ready"), + StackName: aws.String("managed-stack-not-ready"), + StackStatus: aws.String(cloudformation.StackStatusUpdateInProgress), Tags: []*cloudformation.Tag{ cfTag(kubernetesCreatorTag, kubernetesCreatorValue), cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned), @@ -234,7 +235,8 @@ func TestFindingManagedStacks(t *testing.T) { }, }, { - StackName: aws.String("managed-stack-not-ready"), + StackName: aws.String("managed-stack-not-ready"), + StackStatus: aws.String(cloudformation.StackStatusUpdateInProgress), Tags: []*cloudformation.Tag{ cfTag(kubernetesCreatorTag, kubernetesCreatorValue), cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned), @@ -262,7 +264,19 @@ func TestFindingManagedStacks(t *testing.T) { }, }, nil), }, - []*Stack{ + want: []*Stack{ + { + name: "managed-stack-not-ready", + dnsName: "example-notready.com", + certificateARN: "cert-arn", + targetGroupARN: "tg-arn", + tags: map[string]string{ + kubernetesCreatorTag: kubernetesCreatorValue, + clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned, + certificateARNTag: "cert-arn", + }, + status: cloudformation.StackStatusUpdateInProgress, + }, { name: "managed-stack", dnsName: "example.com", @@ -273,13 +287,22 @@ func TestFindingManagedStacks(t *testing.T) { clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned, certificateARNTag: "cert-arn", }, + status: cloudformation.StackStatusCreateComplete, + }, + { + name: "managed-stack-not-ready", + tags: map[string]string{ + kubernetesCreatorTag: kubernetesCreatorValue, + clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned, + }, + status: cloudformation.StackStatusUpdateInProgress, }, }, - false, + wantErr: false, }, { - "no-ready-stacks", - cfMockOutputs{ + name: "no-ready-stacks", + given: cfMockOutputs{ describeStackPages: R(nil, nil), describeStacks: R(&cloudformation.DescribeStacksOutput{ Stacks: []*cloudformation.Stack{ @@ -310,8 +333,29 @@ func TestFindingManagedStacks(t *testing.T) { }, }, nil), }, - []*Stack{}, - true, + want: []*Stack{ + { + name: "managed-stack-not-ready", + dnsName: "example-notready.com", + targetGroupARN: "tg-arn", + tags: map[string]string{ + kubernetesCreatorTag: kubernetesCreatorValue, + clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned, + }, + status: cloudformation.StackStatusReviewInProgress, + }, + { + name: "managed-stack", + dnsName: "example.com", + targetGroupARN: "tg-arn", + tags: map[string]string{ + kubernetesCreatorTag: kubernetesCreatorValue, + clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned, + }, + status: cloudformation.StackStatusRollbackComplete, + }, + }, + wantErr: false, }, { "failed-paging", @@ -343,18 +387,94 @@ func TestFindingManagedStacks(t *testing.T) { t.Errorf("unexpected result. wanted %+v, got %+v", ti.want, got) } } + }) + } +} +func TestGetStack(t *testing.T) { + for _, ti := range []struct { + name string + given cfMockOutputs + want *Stack + wantErr bool + }{ + { + name: "successful-call", + given: cfMockOutputs{ + describeStackPages: R(nil, nil), + describeStacks: R(&cloudformation.DescribeStacksOutput{ + Stacks: []*cloudformation.Stack{ + { + StackName: aws.String("managed-stack"), + StackStatus: aws.String(cloudformation.StackStatusCreateComplete), + Tags: []*cloudformation.Tag{ + cfTag(kubernetesCreatorTag, kubernetesCreatorValue), + cfTag(clusterIDTagPrefix+"test-cluster", resourceLifecycleOwned), + cfTag(certificateARNTag, "cert-arn"), + }, + Outputs: []*cloudformation.Output{ + {OutputKey: aws.String(outputLoadBalancerDNSName), OutputValue: aws.String("example.com")}, + {OutputKey: aws.String(outputTargetGroupARN), OutputValue: aws.String("tg-arn")}, + }, + }, + }, + }, nil), + }, + want: &Stack{ + name: "managed-stack", + dnsName: "example.com", + certificateARN: "cert-arn", + targetGroupARN: "tg-arn", + tags: map[string]string{ + kubernetesCreatorTag: kubernetesCreatorValue, + clusterIDTagPrefix + "test-cluster": resourceLifecycleOwned, + certificateARNTag: "cert-arn", + }, + status: cloudformation.StackStatusCreateComplete, + }, + wantErr: false, + }, + { + name: "no-ready-stacks", + given: cfMockOutputs{ + describeStackPages: R(nil, nil), + describeStacks: R(&cloudformation.DescribeStacksOutput{ + Stacks: []*cloudformation.Stack{}, + }, nil), + }, + want: nil, + wantErr: true, + }, + { + "failed-paging", + cfMockOutputs{ + describeStackPages: R(nil, dummyErr), + describeStacks: R(&cloudformation.DescribeStacksOutput{}, nil), + }, + nil, + true, + }, + { + "failed-describe-page", + cfMockOutputs{ + describeStacks: R(nil, dummyErr), + }, + nil, + true, + }, + } { + t.Run(ti.name, func(t *testing.T) { + c := &mockCloudFormationClient{outputs: ti.given} s, err := getStack(c, "dontcare") if err != nil { if !ti.wantErr { t.Error("unexpected error", err) } } else { - if !reflect.DeepEqual(ti.want[0], s) { - t.Errorf("unexpected result. wanted %+v, got %+v", ti.want[0], got) + if !reflect.DeepEqual(ti.want, s) { + t.Errorf("unexpected result. wanted %+v, got %+v", ti.want, s) } } - }) } } @@ -478,3 +598,15 @@ func TestDeleteTime(t *testing.T) { } } + +func TestIsComplete(t *testing.T) { + var s *Stack + if s.IsComplete() { + t.Errorf("expected the stack to not be complete") + } + + s = &Stack{status: cloudformation.StackStatusCreateComplete} + if !s.IsComplete() { + t.Errorf("expected the stack to be complete") + } +} diff --git a/controller.go b/controller.go index e94e32fe..5cd42730 100644 --- a/controller.go +++ b/controller.go @@ -28,7 +28,6 @@ var ( healthCheckPort uint healthcheckInterval time.Duration metricsAddress string - updateStackInterval time.Duration ) func loadSettings() error { @@ -37,7 +36,6 @@ func loadSettings() error { "server base url. If empty will try to use the configuration from the running cluster, else it will use InsecureConfig, that does not use encryption or authentication (use case to develop with kubectl proxy).") flag.DurationVar(&pollingInterval, "polling-interval", 30*time.Second, "sets the polling interval for "+ "ingress resources. The flag accepts a value acceptable to time.ParseDuration") - flag.DurationVar(&updateStackInterval, "update-stack-interval", 1*time.Hour, "sets the interval for update AWS ALB stack resources, which can fix migrations, if you add for example one subnet to your VPC your ALB has not interface in that. An update stack will add these interfaces automatically. The flag accepts a value acceptable to time.ParseDuration") flag.StringVar(&cfCustomTemplate, "cf-custom-template", "", "filename for a custom cloud formation template to use instead of the built in") flag.DurationVar(&creationTimeout, "creation-timeout", aws.DefaultCreationTimeout, @@ -64,9 +62,6 @@ func loadSettings() error { if err := loadDurationFromEnv("POLLING_INTERVAL", &pollingInterval); err != nil { return err } - if err := loadDurationFromEnv("UPDATE_STACK_INTERVAL", &updateStackInterval); err != nil { - return err - } if err := loadDurationFromEnv("CREATION_TIMEOUT", &creationTimeout); err != nil { return err @@ -168,7 +163,7 @@ func main() { go serveMetrics(metricsAddress) quitCH := make(chan struct{}) - go startPolling(quitCH, certificatesProvider, awsAdapter, kubeAdapter, pollingInterval, updateStackInterval) + go startPolling(quitCH, certificatesProvider, awsAdapter, kubeAdapter, pollingInterval) <-quitCH log.Printf("terminating %s", os.Args[0]) diff --git a/worker.go b/worker.go index 750879e9..996c6863 100644 --- a/worker.go +++ b/worker.go @@ -23,10 +23,12 @@ import ( type managedItem struct { ingresses []*kubernetes.Ingress stack *aws.Stack + Update bool } const ( ready int = iota + update missing marktodelete orphan @@ -46,6 +48,9 @@ func (item *managedItem) Status() int { if len(item.ingresses) != 0 && item.stack == nil { return missing } + if item.Update && item.stack.IsComplete() { + return update + } return ready } @@ -55,9 +60,8 @@ func waitForTerminationSignals(signals ...os.Signal) chan os.Signal { return c } -func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, pollingInterval, updateStackInterval time.Duration) { +func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, pollingInterval time.Duration) { items := make(chan *managedItem, maxTargetGroupSupported) - go updateStacks(awsAdapter, updateStackInterval, items) for { log.Printf("Start polling sleep %s", pollingInterval) select { @@ -72,30 +76,6 @@ func startPolling(quitCH chan struct{}, certsProvider certs.CertificatesProvider } } -func updateStacks(awsAdapter *aws.Adapter, interval time.Duration, items <-chan *managedItem) { - for { - itemsMap := map[string]*managedItem{} - done := make(chan struct{}) - go func() { - for { - select { - case item := <-items: - if _, ok := itemsMap[item.stack.CertificateARN()]; !ok { - itemsMap[item.stack.CertificateARN()] = item - } - case <-done: - return - } - } - }() - time.Sleep(interval) - done <- struct{}{} - for _, item := range itemsMap { - updateStack(awsAdapter, item) - } - } -} - func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, kubeAdapter *kubernetes.Adapter, items chan<- *managedItem) error { defer func() error { if r := recover(); r != nil { @@ -140,7 +120,9 @@ func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, k createStack(awsAdapter, managedItem) updateIngress(kubeAdapter, managedItem) case ready: - items <- managedItem + updateIngress(kubeAdapter, managedItem) + case update: + updateStack(awsAdapter, managedItem) updateIngress(kubeAdapter, managedItem) } } @@ -175,8 +157,12 @@ func buildManagedModel(certsProvider certs.CertificatesProvider, ingresses []*ku } if item, ok := model[certificateARN+"/"+ingress.Scheme()]; ok { item.ingresses = append(item.ingresses, ingress) + item.Update = true } else { - model[certificateARN+"/"+ingress.Scheme()] = &managedItem{ingresses: []*kubernetes.Ingress{ingress}} + model[certificateARN+"/"+ingress.Scheme()] = &managedItem{ + ingresses: []*kubernetes.Ingress{ingress}, + Update: true, + } } } return model