From 289884b551d1d7165bb7df84bd0d4827e8b0cbf6 Mon Sep 17 00:00:00 2001 From: Dmitry Zbarski Date: Wed, 24 Jan 2018 10:54:28 +0200 Subject: [PATCH] Multiple Auto Scaling Groups and Instances not in Auto Scaling Group management (#118) * Add support for multiple AutoScaling groups #116 * Add suport for instances that are not part of Auto Scaling Group #116 * Document multiple ASGs and single instances features. * Skip target group update if there are no single instances #116 * When fetching nodes list from Kubernetes, filter out master nodes #116 * Minor fixes to documentation, coding style and replacement of some loops with calls to AWS API. * Fetch instances from EC2 instead of fetching them from Kubernetes * Aggregate calls to DescribeAutoScaleGroups when fetching multiple ASGs information * Remove Auto Scaling Group detection during start up to allow starting on instance not in ASG * Update documentation to match that instances are fetched from EC2 and not Kubernetes * Remove redundant kubernetes code that lists nodes * Change default filter to use tag kubernetes.io/cluster/ instead of KubernetesCluster * Update documentation for default filters * Minor fixes to documentation and code readability * Use DescribeInstancesPages in getInstancesDetailsWithFilters to fetch all instances * When fetching list of ASGs, do not send duplicate ASG names in call to getAutoScalingGroupsByName * Rename CUSTOM_TAG_FILTER env variable to CUSTOM_FILTERS and clarify documentation about it * Change pointer receiver to value receiver in a few places * Fix typo in README * Rename constant DIP_SPLIT_SIZE to dipSplitSize --- README.md | 37 ++- aws/adapter.go | 238 ++++++++++++++--- aws/adapter_test.go | 533 +++++++++++++++++++++++++++++++++++++ aws/asg.go | 34 +++ aws/asg_test.go | 83 ++++++ aws/ec2.go | 44 ++- aws/ec2_test.go | 87 +++++- aws/ec2mock_test.go | 43 ++- aws/elbv2.go | 53 ++++ aws/elbv2_test.go | 206 ++++++++++++++ aws/elbv2mock_test.go | 42 +++ controller.go | 2 +- kubernetes/adapter.go | 17 +- kubernetes/adapter_test.go | 2 +- worker.go | 10 + 15 files changed, 1370 insertions(+), 61 deletions(-) create mode 100644 aws/elbv2.go create mode 100644 aws/elbv2_test.go create mode 100644 aws/elbv2mock_test.go diff --git a/README.md b/README.md index a99db039..08f250e7 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,8 @@ This information is used to manage AWS resources for each ingress objects of the - Automatic forwarding of requests to all Worker Nodes, even with auto scaling - Automatic cleanup of unnecessary managed resources - Support for internet-facing and internal load balancers +- Support for multiple Auto Scaling Groups +- Support for instances that are not part of Auto Scaling Group - Can be used in clusters created by [Kops](https://github.com/kubernetes/kops), see our [deployment guide for Kops](deploy/kops.md) ## Upgrade @@ -87,21 +89,42 @@ This is achieved using AWS CloudFormation. For more details check our [CloudForm The controller *will not* manage the security groups required to allow access from the Internet to the load balancers. It assumes that their lifecycle is external to the controller itself. -### Discovery +During startup phase EC2 filters are constructed as follows: -On startup, the controller discovers the AWS resources required for the controller operations: +* If `CUSTOM_FILTERS` environment variable is set, it is used to generate filters that are later used + to fetch instances from EC2. +* If `CUSTOM_FILTERS` environment variable is not set or could not be parsed, then default + filters are `tag:kubernetes.io/cluster/=owned tag-key=k8s.io/role/node` where `` + is determined from EC2 tags of instance on which Ingress Controller pod is started. + +`CUSTOM_FILTERS` is a list of filters separated by spaces. Each filter has a form of `name=value` where name is one +of names that are recognized by the EC2 API (you can find list [here](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstances.html)) +and value is value of a filter. For example: + +* `tag-key=test` will filter instances that has tag named `test`. +* `vpc-id=vpc-1a2b3c4d` will filter instances that belong to specific VPC. +* Default filter `tag:kubernetes.io/cluster/=owned tag-key=k8s.io/role/node` filters instances + that has tag `kubernetes.io/cluster/` with value `owned` and have tag named `tag-key=k8s.io/role/node`. -1. The AutoScalingGroup +Every poll cycle EC2 is queried with filters that were constructed during startup. +Each new discovered instance is scanned for Auto Scaling Group tag. Each Target +Group created by this Ingress controller is then added to each known Auto Scaling Group. +Each Auto Scaling Group information is fetched only once when first node of it is discovered for first time. +If instance does not belong to Auto Scaling Group (does not have `aws:autoscaling:groupName` tag) it is stored in separate list of +Single Instances. On each cycle instances on this list are registered as targets in all Target Groups managed by this controller. +If call to get instances from EC2 did not return previously known Single Instance, it is deregistered from Target Group and removed from list of Single Instances. +Call to deregister instances is aggregated so that maximum 1 call to deregister is issued in poll cycle. - Simple lookup of the Auto Scaling Group that name matches the `aws:autoscaling:groupName` tag from the - EC2 instance running the controller. +### Discovery + +On startup, the controller discovers the AWS resources required for the controller operations: -2. The Security Group +1. The Security Group Lookup of the `kubernetes.io/cluster/` tag of the Security Group matching the clusterID for the controller node and `kubernetes:application` matching the value `kube-ingress-aws-controller` or as fallback for `", ErrMissingTag } + +// UpdateAutoScalingGroupsAndInstances updates list of known ASGs and EC2 instances. +func (a *Adapter) UpdateAutoScalingGroupsAndInstances() error { + var err error + a.ec2Details, err = getInstancesDetailsWithFilters(a.ec2, a.manifest.filters) + if err != nil { + return err + } + + newSingleInstances := make(map[string]*instanceDetails) + for instanceID, details := range a.singleInstances { + if _, ok := a.ec2Details[instanceID]; !ok { + // Instance does not exist on EC2 anymore, add it to list of obsolete instances + a.obsoleteInstances = append(a.obsoleteInstances, instanceID) + } else { + // Instance exists, so keep it in the list of single instances + newSingleInstances[instanceID] = details + } + } + a.singleInstances = newSingleInstances + + // update ASGs (create new map to get rid of deleted ASGs) + newAutoScalingGroups := make(map[string]*autoScalingGroupDetails) + autoScalingGroupsToFetchMap := make(map[string]bool) + for instanceID, details := range a.ec2Details { + asgName, err := getAutoScalingGroupName(details.tags) + if err != nil { + // Instance is not in ASG, save in single instances list. + a.singleInstances[instanceID] = details + continue + } + if _, ok := newAutoScalingGroups[asgName]; !ok { + if _, ok := a.autoScalingGroups[asgName]; ok { + newAutoScalingGroups[asgName] = a.autoScalingGroups[asgName] + } else { + // Save ASGs that have to be loaded to load all of them in one API call + autoScalingGroupsToFetchMap[asgName] = true + } + } + } + + autoScalingGroupsToFetch := make([]string, 0, len(autoScalingGroupsToFetchMap)) + for asgName := range autoScalingGroupsToFetchMap { + autoScalingGroupsToFetch = append(autoScalingGroupsToFetch, asgName) + } + + if len(autoScalingGroupsToFetch) != 0 { + fetchedAutoScalingGroups, err := getAutoScalingGroupsByName(a.autoscaling, autoScalingGroupsToFetch) + if err != nil { + log.Printf("failed fetching Auto Scaling Groups details: %v", err) + } else { + for name, asg := range fetchedAutoScalingGroups { + newAutoScalingGroups[name] = asg + } + } + } + + a.autoScalingGroups = newAutoScalingGroups + return nil +} + +// Create EC2 filter that will be used to filter instances when calling DescribeInstances +// later on each cycle. Filter is based on value of customTagFilterEnvVarName environment +// veriable. If it is undefined or could not be parsed, default filter is returned which +// filters on kubernetesClusterTag tag value and kubernetesNodeRoleTag existance. +func parseFilters(clusterId string) []*ec2.Filter { + if filter, ok := os.LookupEnv(customTagFilterEnvVarName); ok { + terms := strings.Fields(filter) + filters := make([]*ec2.Filter, len(terms)) + for i, term := range terms { + parts := strings.Split(term, "=") + if len(parts) != 2 { + log.Printf("failed parsing %s, falling back to default", customTagFilterEnvVarName) + return generateDefaultFilters(clusterId) + } + filters[i] = &ec2.Filter{ + Name: aws.String(parts[0]), + Values: aws.StringSlice(strings.Split(parts[1], ",")), + } + } + return filters + } + return generateDefaultFilters(clusterId) +} + +// Generate default EC2 filter for usage with ECs DescribeInstances call based on EC2 tags +// of instance where Ingress Controller pod was started. +func generateDefaultFilters(clusterId string) []*ec2.Filter { + return []*ec2.Filter{ + { + Name: aws.String("tag:" + clusterIDTagPrefix + clusterId), + Values: []*string{aws.String(resourceLifecycleOwned)}, + }, + { + Name: aws.String("tag-key"), + Values: []*string{aws.String(kubernetesNodeRoleTag)}, + }, + } +} diff --git a/aws/adapter_test.go b/aws/adapter_test.go index aa4b6a98..4ff9071a 100644 --- a/aws/adapter_test.go +++ b/aws/adapter_test.go @@ -1,12 +1,545 @@ package aws import ( + "fmt" + "os" + "reflect" "sort" "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/elbv2" ) +func TesGenerateDefaultFilters(tt *testing.T) { + for _, test := range []struct { + name string + clusterId string + }{ + { + "empty-cluster-id", + "", + }, + { + "set1", + "test1", + }, + { + "set2", + "test2+test2", + }, + { + "set3", + "= = ", + }, + } { + tt.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + tt.Log(test.name) + filters := generateDefaultFilters(test.clusterId) + if len(filters) != 2 { + t.Errorf("generateDefaultFilters returned %d filters instead of 2", len(filters)) + } + if aws.StringValue(filters[0].Name) != "tag:"+clusterIDTagPrefix+test.clusterId { + t.Errorf("generateDefaultFilters first filter has wrong name %s", aws.StringValue(filters[0].Name)) + } + if len(filters[0].Values) != 1 { + t.Errorf("generateDefaultFilters first filter has %d values instead of 1", len(filters[0].Values)) + } + if aws.StringValue(filters[0].Values[0]) != resourceLifecycleOwned { + t.Errorf("generateDefaultFilters first filter has wrong value %s", aws.StringValue(filters[0].Values[0])) + } + if aws.StringValue(filters[1].Name) != "tag-key" { + t.Errorf("generateDefaultFilters second filter has wrong name %s", aws.StringValue(filters[1].Name)) + } + if len(filters[1].Values) != 1 { + t.Errorf("generateDefaultFilters second filter has %d values instead of 1", len(filters[1].Values)) + } + if aws.StringValue(filters[1].Values[0]) != kubernetesNodeRoleTag { + t.Errorf("generateDefaultFilters second filter has wrong value %s", aws.StringValue(filters[1].Values[0])) + } + }) + } +} + +func TestParseFilters(tt *testing.T) { + for _, test := range []struct { + name string + customFilter *string + clusterId string + expectedFilters []*ec2.Filter + }{ + { + "no-custom-filter", + nil, + "cluster", + []*ec2.Filter{ + { + Name: aws.String("tag:" + clusterIDTagPrefix + "cluster"), + Values: aws.StringSlice([]string{resourceLifecycleOwned}), + }, + { + Name: aws.String("tag-key"), + Values: aws.StringSlice([]string{kubernetesNodeRoleTag}), + }, + }, + }, + { + "custom-filter1", + aws.String("tag:Test=test"), + "cluster", + []*ec2.Filter{ + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test"}), + }, + }, + }, + { + "custom-filter2", + aws.String("tag:Test=test vpc-id=id1,id2"), + "cluster", + []*ec2.Filter{ + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test"}), + }, + { + Name: aws.String("vpc-id"), + Values: aws.StringSlice([]string{"id1", "id2"}), + }, + }, + }, + { + "custom-filter3", + aws.String("tag:Test=test tag:Test=test1,test2 tag-key=key1,key2,key3"), + "cluster", + []*ec2.Filter{ + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test"}), + }, + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test1", "test2"}), + }, + { + Name: aws.String("tag-key"), + Values: aws.StringSlice([]string{"key1", "key2", "key3"}), + }, + }, + }, + { + "illegal1", + aws.String("test"), + "cluster", + []*ec2.Filter{ + { + Name: aws.String("tag:" + clusterIDTagPrefix + "cluster"), + Values: aws.StringSlice([]string{resourceLifecycleOwned}), + }, + { + Name: aws.String("tag-key"), + Values: aws.StringSlice([]string{kubernetesNodeRoleTag}), + }, + }, + }, + } { + tt.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + tt.Log(test.name) + if test.customFilter != nil { + os.Setenv(customTagFilterEnvVarName, *test.customFilter) + } else { + os.Unsetenv(customTagFilterEnvVarName) + } + output := parseFilters(test.clusterId) + if !reflect.DeepEqual(test.expectedFilters, output) { + t.Errorf("unexpected result. wanted %q, got %q", test.expectedFilters, output) + } + }) + } +} + +func TestFiltersString(tt *testing.T) { + for _, test := range []struct { + name string + filters []*ec2.Filter + str string + }{ + { + "test1", + []*ec2.Filter{ + { + Name: aws.String("tag:" + clusterIDTagPrefix + "cluster"), + Values: aws.StringSlice([]string{resourceLifecycleOwned}), + }, + { + Name: aws.String("tag-key"), + Values: aws.StringSlice([]string{kubernetesNodeRoleTag}), + }, + }, + "tag:" + clusterIDTagPrefix + "cluster=" + resourceLifecycleOwned + " tag-key=" + kubernetesNodeRoleTag, + }, + { + "test2", + []*ec2.Filter{ + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test"}), + }, + }, + "tag:Test=test", + }, + { + "custom-filter2", + []*ec2.Filter{ + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test"}), + }, + { + Name: aws.String("vpc-id"), + Values: aws.StringSlice([]string{"id1", "id2"}), + }, + }, + "tag:Test=test vpc-id=id1,id2", + }, + { + "custom-filter3", + []*ec2.Filter{ + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test"}), + }, + { + Name: aws.String("tag:Test"), + Values: aws.StringSlice([]string{"test1", "test2"}), + }, + { + Name: aws.String("tag-key"), + Values: aws.StringSlice([]string{"key1", "key2", "key3"}), + }, + }, + "tag:Test=test tag:Test=test1,test2 tag-key=key1,key2,key3", + }, + } { + tt.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + tt.Log(test.name) + a := Adapter{manifest: &manifest{filters: test.filters}} + if test.str != a.FiltersString() { + t.Errorf("filter string validation failure. wanted %q, got %q", test.str, a.FiltersString()) + } + }) + } +} + +func TestUpdateAutoScalingGroupsAndInstances(tt *testing.T) { + a := Adapter{ + ec2Details: map[string]*instanceDetails{}, + autoScalingGroups: make(map[string]*autoScalingGroupDetails), + singleInstances: make(map[string]*instanceDetails), + obsoleteInstances: make([]string, 0), + manifest: &manifest{}, + } + for _, test := range []struct { + name string + ec2responses ec2MockOutputs + asgresponses autoscalingMockOutputs + cacheSize int + wantAsgs []string + wantSingleInstances []string + wantRunningSingleInstances []string + wantObsoleteInstances []string + wantError bool + }{ + { + "initial", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{"asg1": {"foo": "bar"}}), nil), + }, + 2, + []string{"asg1"}, + []string{}, + []string{}, + []string{}, + false, + }, + { + "add-node-same-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: 0}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{}, + 3, + []string{"asg1"}, + []string{}, + []string{}, + []string{}, + false, + }, + { + "add-node-second-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{"asg2": {"foo": "baz"}}), nil), + }, + 4, + []string{"asg1", "asg2"}, + []string{}, + []string{}, + []string{}, + false, + }, + { + "add-another-node-second-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{}, + 5, + []string{"asg1", "asg2"}, + []string{}, + []string{}, + []string{}, + false, + }, + { + "add-node-third-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + testInstance{id: "baz1", tags: tags{"aws:autoscaling:groupName": "asg3"}, privateIp: "3.1.1.1", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{"asg3": {"foo": "baz"}}), nil), + }, + 6, + []string{"asg1", "asg2", "asg3"}, + []string{}, + []string{}, + []string{}, + false, + }, + { + "add-node-without-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + testInstance{id: "baz1", tags: tags{"aws:autoscaling:groupName": "asg3"}, privateIp: "3.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "sgl1", tags: tags{"Name": "node1"}, privateIp: "0.1.1.1", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{}, + 7, + []string{"asg1", "asg2", "asg3"}, + []string{"sgl1"}, + []string{"sgl1"}, + []string{}, + false, + }, + { + "add-stopped-node-without-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + testInstance{id: "baz1", tags: tags{"aws:autoscaling:groupName": "asg3"}, privateIp: "3.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "sgl1", tags: tags{"Name": "node1"}, privateIp: "0.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "sgl2", tags: tags{"Name": "node2"}, privateIp: "0.1.1.2", vpcId: "1", state: stoppedState}, + )}, + autoscalingMockOutputs{}, + 8, + []string{"asg1", "asg2", "asg3"}, + []string{"sgl1", "sgl2"}, + []string{"sgl1"}, + []string{}, + false, + }, + { + "remove-third-asg-node-and-stopped-instance", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + testInstance{id: "sgl1", tags: tags{"Name": "node1"}, privateIp: "0.1.1.1", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{}, + 6, + []string{"asg1", "asg2"}, + []string{"sgl1"}, + []string{"sgl1"}, + []string{"sgl2"}, + false, + }, + { + "error-fetching-instance", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + dummyErr, + testInstance{}, + )}, + autoscalingMockOutputs{}, + 6, + []string{"asg1", "asg2"}, + []string{"sgl1"}, + []string{"sgl1"}, + []string{"sgl2"}, + true, + }, + { + "error-fetching-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + testInstance{id: "sgl1", tags: tags{"Name": "node1"}, privateIp: "0.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "fail", tags: tags{"aws:autoscaling:groupName": "none"}, privateIp: "0.2.2.2", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{}), dummyErr), + }, + 7, + []string{"asg1", "asg2"}, + []string{"sgl1"}, + []string{"sgl1"}, + []string{"sgl2"}, + false, + }, + { + "add-back-third-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + testInstance{id: "bar1", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "bar2", tags: tags{"aws:autoscaling:groupName": "asg2"}, privateIp: "2.2.2.2", vpcId: "1", state: runningState}, + testInstance{id: "baz1", tags: tags{"aws:autoscaling:groupName": "asg3"}, privateIp: "3.1.1.1", vpcId: "1", state: runningState}, + testInstance{id: "sgl1", tags: tags{"Name": "node1"}, privateIp: "0.1.1.1", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{"asg3": {"foo": "baz"}}), nil), + }, + 7, + []string{"asg1", "asg2", "asg3"}, + []string{"sgl1"}, + []string{"sgl1"}, + []string{"sgl2"}, + false, + }, + { + "remove-all-except-first-asg", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "foo1", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.4", vpcId: "1", state: runningState}, + testInstance{id: "foo2", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.5", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{}), nil), + }, + 3, + []string{"asg1"}, + []string{}, + []string{}, + []string{"sgl2", "sgl1"}, + false, + }, + { + "add-remove-simultaneously", + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo0", tags: tags{"aws:autoscaling:groupName": "asg1"}, privateIp: "1.2.3.3", vpcId: "1", state: runningState}, + testInstance{id: "sgl1", tags: tags{"Name": "node1"}, privateIp: "0.1.1.1", vpcId: "1", state: runningState}, + )}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{}), nil), + }, + 2, + []string{"asg1"}, + []string{"sgl1"}, + []string{"sgl1"}, + []string{"sgl2", "sgl1"}, + false, + }, + } { + tt.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + a.ec2 = &mockEc2Client{outputs: test.ec2responses} + a.autoscaling = &mockAutoScalingClient{outputs: test.asgresponses} + err := a.UpdateAutoScalingGroupsAndInstances() + if test.wantError && err == nil { + t.Errorf("expected error, got nothing") + } + if !test.wantError && err != nil { + t.Errorf("unexpected error '%s'", err) + } + if !test.wantError && err == nil { + adapterSingleIds := a.SingleInstances() + adapterRunningIds := a.RunningSingleInstances() + adapterObsoleteIds := a.ObsoleteSingleInstances() + asgs := a.AutoScalingGroupNames() + sort.Strings(adapterSingleIds) + sort.Strings(adapterRunningIds) + sort.Strings(adapterObsoleteIds) + sort.Strings(asgs) + sort.Strings(test.wantSingleInstances) + sort.Strings(test.wantRunningSingleInstances) + sort.Strings(test.wantObsoleteInstances) + sort.Strings(test.wantAsgs) + if !reflect.DeepEqual(test.wantSingleInstances, adapterSingleIds) { + t.Errorf("unexpected singleInstances result. wanted %#v, got %#v", test.wantSingleInstances, adapterSingleIds) + } + if !reflect.DeepEqual(test.wantRunningSingleInstances, adapterRunningIds) { + t.Errorf("unexpected runningInstances result. wanted %#v, got %#v", test.wantRunningSingleInstances, adapterRunningIds) + } + if !reflect.DeepEqual(test.wantObsoleteInstances, adapterObsoleteIds) { + t.Errorf("unexpected obsoleteInstances result. wanted %#v, got %#v", test.wantObsoleteInstances, adapterObsoleteIds) + } + if !reflect.DeepEqual(test.wantAsgs, asgs) { + t.Errorf("unexpected autoScalingGroups result. wanted %+v, got %+v", test.wantAsgs, asgs) + } + if a.CachedInstances() != test.cacheSize { + t.Errorf("wrong cache size. wanted %d, got %d", test.cacheSize, a.CachedInstances()) + } + } + }) + } +} + func TestFindLBSubnets(tt *testing.T) { for _, test := range []struct { name string diff --git a/aws/asg.go b/aws/asg.go index bda75b29..fceb8cc7 100644 --- a/aws/asg.go +++ b/aws/asg.go @@ -48,6 +48,40 @@ func getAutoScalingGroupByName(service autoscalingiface.AutoScalingAPI, autoScal return nil, fmt.Errorf("auto scaling group %q not found", autoScalingGroupName) } +func getAutoScalingGroupsByName(service autoscalingiface.AutoScalingAPI, autoScalingGroupNames []string) (map[string]*autoScalingGroupDetails, error) { + params := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice(autoScalingGroupNames), + } + resp, err := service.DescribeAutoScalingGroups(params) + if err != nil { + return nil, err + } + + result := make(map[string]*autoScalingGroupDetails) + for _, g := range resp.AutoScalingGroups { + name := aws.StringValue(g.AutoScalingGroupName) + tags := make(map[string]string) + for _, td := range g.Tags { + tags[aws.StringValue(td.Key)] = aws.StringValue(td.Value) + } + result[name] = &autoScalingGroupDetails{ + name: name, + arn: aws.StringValue(g.AutoScalingGroupARN), + launchConfigurationName: aws.StringValue(g.LaunchConfigurationName), + targetGroups: aws.StringValueSlice(g.TargetGroupARNs), + tags: tags, + } + } + + for _, name := range autoScalingGroupNames { + if _, ok := result[name]; !ok { + return nil, fmt.Errorf("auto scaling group %q not found", name) + } + } + + return result, nil +} + func attachTargetGroupsToAutoScalingGroup(svc autoscalingiface.AutoScalingAPI, targetGroupARNs []string, autoScalingGroupName string) error { params := &autoscaling.AttachLoadBalancerTargetGroupsInput{ AutoScalingGroupName: aws.String(autoScalingGroupName), diff --git a/aws/asg_test.go b/aws/asg_test.go index 77e6303a..52f014fc 100644 --- a/aws/asg_test.go +++ b/aws/asg_test.go @@ -94,6 +94,89 @@ func TestGetAutoScalingGroupByName(t *testing.T) { } } +func TestGetAutoScalingGroupsByName(t *testing.T) { + for _, test := range []struct { + name string + givenNames []string + responses autoscalingMockOutputs + want map[string]*autoScalingGroupDetails + wantError bool + }{ + { + "success-call-single-asg", + []string{"foo"}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{"foo": {"bar": "baz"}}), nil), + }, + map[string]*autoScalingGroupDetails{ + "foo": mockAutoScalingGroupDetails("foo", map[string]string{"bar": "baz"}), + }, + false, + }, + { + "success-call-multiple-asg", + []string{"a", "d"}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{ + "a": {"b": "c"}, + "d": {"e": "f"}, + }), nil), + }, + map[string]*autoScalingGroupDetails{ + "a": mockAutoScalingGroupDetails("a", map[string]string{"b": "c"}), + "d": mockAutoScalingGroupDetails("d", map[string]string{"e": "f"}), + }, + false, + }, + { + "fail-to-match-single-asg", + []string{"miss"}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{"foo": {"bar": "baz"}}), nil), + }, + nil, + true, + }, + { + "fail-to-match-multiple-asg", + []string{"miss", "miss2"}, + autoscalingMockOutputs{ + describeAutoScalingGroups: R(mockDASGOutput(map[string]asgtags{ + "a": {"b": "c"}, + "d": {"e": "f"}, + }), nil), + }, + nil, + true, + }, + { + "autoscaling-api-failure", + []string{"dontcare"}, + autoscalingMockOutputs{describeAutoScalingGroups: R(nil, dummyErr)}, + nil, + true, + }, + } { + t.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + mockSvc := &mockAutoScalingClient{outputs: test.responses} + got, err := getAutoScalingGroupsByName(mockSvc, test.givenNames) + + if test.wantError { + if err == nil { + t.Error("wanted an error but call seemed to have succeeded") + } + } else { + if err != nil { + t.Fatal("unexpected error", err) + } + if !reflect.DeepEqual(test.want, got) { + t.Errorf("unexpected result. wanted %v, got %v", test.want, got) + } + } + }) + } +} + func TestAttach(t *testing.T) { for _, test := range []struct { name string diff --git a/aws/ec2.go b/aws/ec2.go index a4a23053..b3e89d15 100644 --- a/aws/ec2.go +++ b/aws/ec2.go @@ -19,8 +19,10 @@ const ( kubernetesCreatorValue = "kube-ingress-aws-controller" autoScalingGroupNameTag = "aws:autoscaling:groupName" runningState = 16 // See https://github.com/aws/aws-sdk-go/blob/master/service/ec2/api.go, type InstanceState + stoppedState = 80 // See https://github.com/aws/aws-sdk-go/blob/master/service/ec2/api.go, type InstanceState elbRoleTagName = "kubernetes.io/role/elb" internalELBRoleTagName = "kubernetes.io/role/internal-elb" + kubernetesNodeRoleTag = "k8s.io/role/node" ) type securityGroupDetails struct { @@ -29,9 +31,11 @@ type securityGroupDetails struct { } type instanceDetails struct { - id string - vpcID string - tags map[string]string + id string + ip string + vpcID string + tags map[string]string + running bool } func (id *instanceDetails) clusterID() string { @@ -101,12 +105,40 @@ func getInstanceDetails(ec2Service ec2iface.EC2API, instanceID string) (*instanc } return &instanceDetails{ - id: aws.StringValue(i.InstanceId), - vpcID: aws.StringValue(i.VpcId), - tags: convertEc2Tags(i.Tags), + id: aws.StringValue(i.InstanceId), + ip: aws.StringValue(i.PrivateIpAddress), + vpcID: aws.StringValue(i.VpcId), + tags: convertEc2Tags(i.Tags), + running: aws.Int64Value(i.State.Code)&0xff == runningState, }, nil } +func getInstancesDetailsWithFilters(ec2Service ec2iface.EC2API, filters []*ec2.Filter) (map[string]*instanceDetails, error) { + params := &ec2.DescribeInstancesInput{ + Filters: filters, + } + result := make(map[string]*instanceDetails) + err := ec2Service.DescribeInstancesPages(params, func(resp *ec2.DescribeInstancesOutput, lastPage bool) bool { + for _, reservation := range resp.Reservations { + for _, instance := range reservation.Instances { + result[aws.StringValue(instance.InstanceId)] = &instanceDetails{ + id: aws.StringValue(instance.InstanceId), + ip: aws.StringValue(instance.PrivateIpAddress), + vpcID: aws.StringValue(instance.VpcId), + tags: convertEc2Tags(instance.Tags), + running: aws.Int64Value(instance.State.Code)&0xff == runningState, + } + } + } + return true + }) + if err != nil { + return nil, fmt.Errorf("failed getting instance list from EC2: %v", err) + } + + return result, nil +} + func findFirstRunningInstance(resp *ec2.DescribeInstancesOutput) (*ec2.Instance, error) { for _, reservation := range resp.Reservations { for _, instance := range reservation.Instances { diff --git a/aws/ec2_test.go b/aws/ec2_test.go index e713de32..bcde4d0c 100644 --- a/aws/ec2_test.go +++ b/aws/ec2_test.go @@ -4,6 +4,9 @@ import ( "fmt" "reflect" "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" ) func TestGetAutoScalingName(t *testing.T) { @@ -125,7 +128,7 @@ func TestGetInstanceDetails(t *testing.T) { ec2MockOutputs{describeInstances: R(mockDIOutput( testInstance{id: "foo", tags: tags{"bar": "baz"}, state: runningState}, ), nil)}, - &instanceDetails{id: "foo", tags: map[string]string{"bar": "baz"}}, + &instanceDetails{id: "foo", tags: map[string]string{"bar": "baz"}, running: true}, false, }, { @@ -221,6 +224,88 @@ func TestGetSubnets(t *testing.T) { } } +func TestGetInstancesDetailsWithFilters(t *testing.T) { + for _, test := range []struct { + name string + input []*ec2.Filter + responses ec2MockOutputs + want map[string]*instanceDetails + wantError bool + }{ + { + "success-call", + []*ec2.Filter{ + { + Name: aws.String("tag:KubernetesCluster"), + Values: []*string{ + aws.String("kube1"), + }, + }, + }, + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo1", tags: tags{"bar": "baz"}, privateIp: "1.2.3.4", vpcId: "1", state: 16}, + testInstance{id: "foo2", tags: tags{"bar": "baz"}, privateIp: "1.2.3.5", vpcId: "1", state: 32}, + testInstance{id: "foo3", tags: tags{"aaa": "zzz"}, privateIp: "1.2.3.6", vpcId: "1", state: 80}, + )}, + map[string]*instanceDetails{ + "foo1": &instanceDetails{id: "foo1", tags: map[string]string{"bar": "baz"}, ip: "1.2.3.4", vpcID: "1", running: true}, + "foo2": &instanceDetails{id: "foo2", tags: map[string]string{"bar": "baz"}, ip: "1.2.3.5", vpcID: "1", running: false}, + "foo3": &instanceDetails{id: "foo3", tags: map[string]string{"aaa": "zzz"}, ip: "1.2.3.6", vpcID: "1", running: false}, + }, + false, + }, + { + "success-empty-filters", + []*ec2.Filter{}, + ec2MockOutputs{describeInstancesPages: mockDIPOutput( + nil, + testInstance{id: "foo1", tags: tags{"bar": "baz"}, privateIp: "1.2.3.4", vpcId: "1", state: 16}, + testInstance{id: "foo3", tags: tags{"aaa": "zzz"}, privateIp: "1.2.3.6", vpcId: "1", state: 80}, + )}, + map[string]*instanceDetails{ + "foo1": &instanceDetails{id: "foo1", tags: map[string]string{"bar": "baz"}, ip: "1.2.3.4", vpcID: "1", running: true}, + "foo3": &instanceDetails{id: "foo3", tags: map[string]string{"aaa": "zzz"}, ip: "1.2.3.6", vpcID: "1", running: false}, + }, + false, + }, + { + "success-empty-response", + []*ec2.Filter{ + { + Name: aws.String("vpc-id"), + Values: []*string{ + aws.String("some-vpc"), + }, + }, + }, + ec2MockOutputs{describeInstancesPages: mockDIPOutput(nil)}, + map[string]*instanceDetails{}, + false, + }, + { + "aws-api-fail", + []*ec2.Filter{ + { + Name: aws.String("tag-key"), + Values: []*string{ + aws.String("key1"), + }, + }, + }, + ec2MockOutputs{describeInstancesPages: mockDIPOutput(dummyErr, testInstance{})}, + nil, + true, + }, + } { + t.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + ec2 := &mockEc2Client{outputs: test.responses} + got, err := getInstancesDetailsWithFilters(ec2, test.input) + assertResultAndError(t, test.want, got, test.wantError, err) + }) + } +} + func assertResultAndError(t *testing.T, want, got interface{}, wantError bool, err error) { if wantError { if err == nil { diff --git a/aws/ec2mock_test.go b/aws/ec2mock_test.go index 4dee52f6..15b77b1e 100644 --- a/aws/ec2mock_test.go +++ b/aws/ec2mock_test.go @@ -6,9 +6,12 @@ import ( "github.com/aws/aws-sdk-go/service/ec2/ec2iface" ) +const dipSplitSize = 2 + type ec2MockOutputs struct { describeSecurityGroups *apiResponse describeInstances *apiResponse + describeInstancesPages []*apiResponse describeSubnets *apiResponse describeRouteTables *apiResponse } @@ -32,6 +35,18 @@ func (m *mockEc2Client) DescribeInstances(*ec2.DescribeInstancesInput) (*ec2.Des return nil, m.outputs.describeInstances.err } +func (m *mockEc2Client) DescribeInstancesPages(params *ec2.DescribeInstancesInput, f func(*ec2.DescribeInstancesOutput, bool) bool) error { + for _, resp := range m.outputs.describeInstancesPages { + if out, ok := resp.response.(*ec2.DescribeInstancesOutput); ok { + f(out, true) + } + } + if len(m.outputs.describeInstancesPages) != 0 { + return m.outputs.describeInstancesPages[0].err + } + return nil +} + func (m *mockEc2Client) DescribeSubnets(*ec2.DescribeSubnetsInput) (*ec2.DescribeSubnetsOutput, error) { if out, ok := m.outputs.describeSubnets.response.(*ec2.DescribeSubnetsOutput); ok { return out, m.outputs.describeSubnets.err @@ -59,9 +74,11 @@ func mockDSGOutput(sgs map[string]string) *ec2.DescribeSecurityGroupsOutput { } type testInstance struct { - id string - tags tags - state int64 + id string + tags tags + privateIp string + vpcId string + state int64 } func mockDIOutput(mockedInstances ...testInstance) *ec2.DescribeInstancesOutput { @@ -72,15 +89,29 @@ func mockDIOutput(mockedInstances ...testInstance) *ec2.DescribeInstancesOutput tags = append(tags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)}) } instance := &ec2.Instance{ - InstanceId: aws.String(i.id), - Tags: tags, - State: &ec2.InstanceState{Code: aws.Int64(i.state)}, + InstanceId: aws.String(i.id), + Tags: tags, + State: &ec2.InstanceState{Code: aws.Int64(i.state)}, + PrivateIpAddress: aws.String(i.privateIp), + VpcId: aws.String(i.vpcId), } instances = append(instances, instance) } return &ec2.DescribeInstancesOutput{Reservations: []*ec2.Reservation{{Instances: instances}}} } +func mockDIPOutput(e error, mockedInstances ...testInstance) []*apiResponse { + pages := len(mockedInstances) / dipSplitSize + result := make([]*apiResponse, pages, pages+1) + for i := 0; i < pages; i++ { + result[i] = R(mockDIOutput(mockedInstances[i*dipSplitSize:(i+1)*dipSplitSize]...), e) + } + if len(mockedInstances)%dipSplitSize != 0 { + result = append(result, R(mockDIOutput(mockedInstances[pages*dipSplitSize:]...), e)) + } + return result +} + type testSubnet struct { id string az string diff --git a/aws/elbv2.go b/aws/elbv2.go new file mode 100644 index 00000000..c8808e05 --- /dev/null +++ b/aws/elbv2.go @@ -0,0 +1,53 @@ +package aws + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/elbv2" + "github.com/aws/aws-sdk-go/service/elbv2/elbv2iface" +) + +func registerTargetsOnTargetGroups(svc elbv2iface.ELBV2API, targetGroupARNs []string, instances []string) error { + targets := make([]*elbv2.TargetDescription, len(instances)) + for i, instance := range instances { + targets[i] = &elbv2.TargetDescription{ + Id: aws.String(instance), + } + } + + for _, targetGroupARN := range targetGroupARNs { + input := &elbv2.RegisterTargetsInput{ + TargetGroupArn: aws.String(targetGroupARN), + Targets: targets, + } + + _, err := svc.RegisterTargets(input) + if err != nil { + return fmt.Errorf("unable to register instances %q in target group %s: %v", instances, targetGroupARN, err) + } + } + return nil +} + +func deregisterTargetsOnTargetGroups(svc elbv2iface.ELBV2API, targetGroupARNs []string, instances []string) error { + targets := make([]*elbv2.TargetDescription, len(instances)) + for i, instance := range instances { + targets[i] = &elbv2.TargetDescription{ + Id: aws.String(instance), + } + } + + for _, targetGroupARN := range targetGroupARNs { + input := &elbv2.DeregisterTargetsInput{ + TargetGroupArn: aws.String(targetGroupARN), + Targets: targets, + } + + _, err := svc.DeregisterTargets(input) + if err != nil { + return fmt.Errorf("unable to deregister instances %q in target group %s: %v", instances, targetGroupARN, err) + } + } + return nil +} diff --git a/aws/elbv2_test.go b/aws/elbv2_test.go new file mode 100644 index 00000000..85ec61b9 --- /dev/null +++ b/aws/elbv2_test.go @@ -0,0 +1,206 @@ +package aws + +import ( + "fmt" + "reflect" + "sort" + "testing" + + "github.com/aws/aws-sdk-go/aws" +) + +type registerTargetsOnTargetGroupsInputTest struct { + targetGroupARNs []string + instances []string +} + +type deregisterTargetsOnTargetGroupsInputTest struct { + targetGroupARNs []string + instances []string +} + +func TestRegisterTargetsOnTargetGroups(t *testing.T) { + outputsSuccess := elbv2MockOutputs{ + registerTargets: R(mockRTOutput(), nil), + } + outputsError := elbv2MockOutputs{ + registerTargets: R(mockRTOutput(), dummyErr), + } + + for _, test := range []struct { + name string + input registerTargetsOnTargetGroupsInputTest + outputs elbv2MockOutputs + wantError bool + }{ + { + "single-target-group", + registerTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1"}, + instances: []string{"i1", "i2"}, + }, + outputsSuccess, + false, + }, + { + "multiple-target-groups", + registerTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1", "asg2"}, + instances: []string{"i1", "i2"}, + }, + outputsSuccess, + false, + }, + { + "empty-input-no-error", + registerTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{}, + instances: []string{}, + }, + outputsSuccess, + false, + }, + { + "error1", + registerTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1"}, + instances: []string{"i1", "i2"}, + }, + outputsError, + true, + }, + { + "error2", + registerTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1", "asg2"}, + instances: []string{"i1", "i2"}, + }, + outputsError, + true, + }, + } { + t.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + svc := &mockElbv2Client{outputs: test.outputs} + err := registerTargetsOnTargetGroups(svc, test.input.targetGroupARNs, test.input.instances) + if test.wantError && err == nil { + t.Fatalf("expected error, got nothing") + } + if !test.wantError && err != nil { + t.Fatalf("unexpected error - %q", err) + } + if !test.wantError { + sort.Strings(test.input.targetGroupARNs) + sort.Strings(test.input.instances) + rtTargetsGroupARNs := make([]string, 0, len(test.input.targetGroupARNs)) + for _, input := range svc.rtinputs { + rtTargetsGroupARNs = append(rtTargetsGroupARNs, aws.StringValue(input.TargetGroupArn)) + rtInstances := make([]string, len(input.Targets)) + for j, tgt := range input.Targets { + rtInstances[j] = aws.StringValue(tgt.Id) + } + sort.Strings(rtInstances) + if !reflect.DeepEqual(rtInstances, test.input.instances) { + t.Errorf("unexpected set of registered instances. expected: %q, got: %q", test.input.instances, rtInstances) + } + } + sort.Strings(rtTargetsGroupARNs) + if !reflect.DeepEqual(rtTargetsGroupARNs, test.input.targetGroupARNs) { + t.Errorf("unexpected set of targetGroupARNs. expected: %q, got: %q", test.input.targetGroupARNs, rtTargetsGroupARNs) + } + } + }) + } +} + +func TestDeregisterTargetsOnTargetGroups(t *testing.T) { + outputsSuccess := elbv2MockOutputs{ + deregisterTargets: R(mockDTOutput(), nil), + } + outputsError := elbv2MockOutputs{ + deregisterTargets: R(mockDTOutput(), dummyErr), + } + + for _, test := range []struct { + name string + input deregisterTargetsOnTargetGroupsInputTest + outputs elbv2MockOutputs + wantError bool + }{ + { + "single-target-group", + deregisterTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1"}, + instances: []string{"i1", "i2"}, + }, + outputsSuccess, + false, + }, + { + "multiple-target-groups", + deregisterTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1", "asg2"}, + instances: []string{"i1", "i2"}, + }, + outputsSuccess, + false, + }, + { + "empty-input-no-error", + deregisterTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{}, + instances: []string{}, + }, + outputsSuccess, + false, + }, + { + "error1", + deregisterTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1"}, + instances: []string{"i1", "i2"}, + }, + outputsError, + true, + }, + { + "error2", + deregisterTargetsOnTargetGroupsInputTest{ + targetGroupARNs: []string{"asg1", "asg2"}, + instances: []string{"i1", "i2"}, + }, + outputsError, + true, + }, + } { + t.Run(fmt.Sprintf("%v", test.name), func(t *testing.T) { + svc := &mockElbv2Client{outputs: test.outputs} + err := deregisterTargetsOnTargetGroups(svc, test.input.targetGroupARNs, test.input.instances) + if test.wantError && err == nil { + t.Fatalf("expected error, got nothing") + } + if !test.wantError && err != nil { + t.Fatalf("unexpected error - %q", err) + } + if !test.wantError && err == nil { + sort.Strings(test.input.targetGroupARNs) + sort.Strings(test.input.instances) + dtTargetsGroupARNs := make([]string, 0, len(test.input.targetGroupARNs)) + for _, input := range svc.dtinputs { + dtTargetsGroupARNs = append(dtTargetsGroupARNs, aws.StringValue(input.TargetGroupArn)) + dtInstances := make([]string, len(input.Targets)) + for j, tgt := range input.Targets { + dtInstances[j] = aws.StringValue(tgt.Id) + } + sort.Strings(dtInstances) + if !reflect.DeepEqual(dtInstances, test.input.instances) { + t.Errorf("unexpected set of registered instances. expected: %q, got: %q", test.input.instances, dtInstances) + } + } + sort.Strings(dtTargetsGroupARNs) + if !reflect.DeepEqual(dtTargetsGroupARNs, test.input.targetGroupARNs) { + t.Errorf("unexpected set of targetGroupARNs. expected: %q, got: %q", test.input.targetGroupARNs, dtTargetsGroupARNs) + } + } + }) + } +} diff --git a/aws/elbv2mock_test.go b/aws/elbv2mock_test.go new file mode 100644 index 00000000..0dd49d89 --- /dev/null +++ b/aws/elbv2mock_test.go @@ -0,0 +1,42 @@ +package aws + +import ( + "github.com/aws/aws-sdk-go/service/elbv2" + "github.com/aws/aws-sdk-go/service/elbv2/elbv2iface" +) + +type elbv2MockOutputs struct { + registerTargets *apiResponse + deregisterTargets *apiResponse +} + +type mockElbv2Client struct { + elbv2iface.ELBV2API + outputs elbv2MockOutputs + rtinputs []*elbv2.RegisterTargetsInput + dtinputs []*elbv2.DeregisterTargetsInput +} + +func (m *mockElbv2Client) RegisterTargets(in *elbv2.RegisterTargetsInput) (*elbv2.RegisterTargetsOutput, error) { + m.rtinputs = append(m.rtinputs, in) + if out, ok := m.outputs.registerTargets.response.(*elbv2.RegisterTargetsOutput); ok { + return out, m.outputs.registerTargets.err + } + return nil, m.outputs.registerTargets.err +} + +func mockRTOutput() *elbv2.RegisterTargetsOutput { + return &elbv2.RegisterTargetsOutput{} +} + +func (m *mockElbv2Client) DeregisterTargets(in *elbv2.DeregisterTargetsInput) (*elbv2.DeregisterTargetsOutput, error) { + m.dtinputs = append(m.dtinputs, in) + if out, ok := m.outputs.deregisterTargets.response.(*elbv2.DeregisterTargetsOutput); ok { + return out, m.outputs.deregisterTargets.err + } + return nil, m.outputs.deregisterTargets.err +} + +func mockDTOutput() *elbv2.DeregisterTargetsOutput { + return &elbv2.DeregisterTargetsOutput{} +} diff --git a/controller.go b/controller.go index 0e732c94..e94e32fe 100644 --- a/controller.go +++ b/controller.go @@ -161,10 +161,10 @@ func main() { log.Printf("\tCluster ID: %s", awsAdapter.ClusterID()) log.Printf("\tvpc id: %s", awsAdapter.VpcID()) log.Printf("\tinstance id: %s", awsAdapter.InstanceID()) - log.Printf("\tauto scaling group name: %s", awsAdapter.AutoScalingGroupName()) log.Printf("\tsecurity group id: %s", awsAdapter.SecurityGroupID()) log.Printf("\tinternal subnet ids: %s", awsAdapter.FindLBSubnets(elbv2.LoadBalancerSchemeEnumInternal)) log.Printf("\tpublic subnet ids: %s", awsAdapter.FindLBSubnets(elbv2.LoadBalancerSchemeEnumInternetFacing)) + log.Printf("\tEC2 filters: %s", awsAdapter.FiltersString()) go serveMetrics(metricsAddress) quitCH := make(chan struct{}) diff --git a/kubernetes/adapter.go b/kubernetes/adapter.go index 8945016b..cf017b68 100644 --- a/kubernetes/adapter.go +++ b/kubernetes/adapter.go @@ -3,7 +3,8 @@ package kubernetes import ( "errors" "fmt" - "github.com/aws/aws-sdk-go/service/elbv2" + + "github.com/aws/aws-sdk-go/service/elbv2" ) type Adapter struct { @@ -59,7 +60,7 @@ func (i *Ingress) Hostname() string { // Scheme returns the scheme associated with the ingress func (i *Ingress) Scheme() string { - return i.scheme + return i.scheme } // CertHostname returns the DNS hostname associated with the ingress @@ -75,7 +76,7 @@ func (i *Ingress) SetCertificateARN(arn string) { // SetScheme sets Ingress.scheme to the scheme as specified. func (i *Ingress) SetScheme(scheme string) { - i.scheme = scheme + i.scheme = scheme } func newIngressFromKube(kubeIngress *ingress) *Ingress { @@ -96,10 +97,10 @@ func newIngressFromKube(kubeIngress *ingress) *Ingress { // Set schema to default if annotation value is not valid switch kubeIngress.getAnnotationsString(ingressSchemeAnnotation, "") { - case elbv2.LoadBalancerSchemeEnumInternal: - scheme = elbv2.LoadBalancerSchemeEnumInternal - default: - scheme = elbv2.LoadBalancerSchemeEnumInternetFacing + case elbv2.LoadBalancerSchemeEnumInternal: + scheme = elbv2.LoadBalancerSchemeEnumInternal + default: + scheme = elbv2.LoadBalancerSchemeEnumInternetFacing } certDomain := kubeIngress.getAnnotationsString(ingressCertificateDomainAnnotation, "") @@ -124,7 +125,7 @@ func newIngressForKube(i *Ingress) *ingress { Name: i.name, Annotations: map[string]interface{}{ ingressCertificateARNAnnotation: i.certificateARN, - ingressSchemeAnnotation: i.scheme, + ingressSchemeAnnotation: i.scheme, }, }, Status: ingressStatus{ diff --git a/kubernetes/adapter_test.go b/kubernetes/adapter_test.go index 2ecf473b..affc978c 100644 --- a/kubernetes/adapter_test.go +++ b/kubernetes/adapter_test.go @@ -25,7 +25,7 @@ func TestMappingRoundtrip(t *testing.T) { Name: "foo", Annotations: map[string]interface{}{ ingressCertificateARNAnnotation: "zbr", - ingressSchemeAnnotation: "internal", + ingressSchemeAnnotation: "internal", }, } kubeStatus := ingressStatus{ diff --git a/worker.go b/worker.go index 0474a158..750879e9 100644 --- a/worker.go +++ b/worker.go @@ -118,6 +118,16 @@ func doWork(certsProvider certs.CertificatesProvider, awsAdapter *aws.Adapter, k } log.Printf("Found %d stacks", len(stacks)) + err = awsAdapter.UpdateAutoScalingGroupsAndInstances() + if err != nil { + return fmt.Errorf("doWork failed to get instances from EC2: %v", err) + } + + awsAdapter.UpdateTargetGroupsAndAutoScalingGroups(stacks) + log.Printf("Found %d auto scaling groups", len(awsAdapter.AutoScalingGroupNames())) + log.Printf("Found %d single instances", len(awsAdapter.SingleInstances())) + log.Printf("Found %d EC2 instances", awsAdapter.CachedInstances()) + model := buildManagedModel(certsProvider, ingresses, stacks) log.Printf("Have %d models", len(model)) for _, managedItem := range model {