implements zalando-incubator#298 Support for target-type IP
This adds a new operational mode that registers the **Ingress pods as targets in the target groups directly**, instead of the current mode where the Ingress pods are accessed through a **HostPort**.

The core idea is based on the fact that pods in standard AWS EKS clusters running the AWS VPC CNI are first-class members of the VPC. Their IPs are directly reachable from the ALB/NLB target groups, just as the node IPs are, which means there is no necessity for the HostPort indirection.

There are several drivers for, and advantages of, accessing the pods directly instead of through a HostPort:

ref: https://kubernetes.io/docs/concepts/configuration/overview/#services

The biggest operational pain point has been that the update of node members in target groups is slower than the nodes are physically replaced, which ends up in a black hole of **no Ingress available for a while**. We regularly face downtime, especially when spot interruptions or ASG node rollings happen, because the ALB/NLB takes up to 2 minutes to reflect the group change. For smaller clusters this leads to no Skipper instance being registered and hence no target available to forward traffic to.
With this new mode the registration happens instantly and independently of ASGs: from scheduling a pod to it serving traffic from the ALB takes less than 10 seconds!

With HostPort there is an inherent dependency on available nodes to scale the Ingress.
Also, an Ingress pod cannot be replaced in place but requires termination first and rescheduling afterwards. For a certain time, which can be more than a minute, that node is offline as an Ingress.
With this mode, HostNetwork and HostPort become obsolete, which allows node-independent scaling of Skipper pods! Skipper becomes a regular deployment whose ReplicaSet size can be independent of the cluster size, which simplifies operations especially for smaller clusters. The custom HPA metric attached to node group size, which we used to counteract this deployment/daemonset hybrid combination, is obsolete now!

The core idea is event-based registration with Kubernetes using a pod `Informer` that receives immediate notifications about pod changes, which allows almost zero-delay updates of the target groups.
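
For illustration, a minimal sketch of such an event registration (assuming client-go of roughly this vintage; the namespace and label selector follow the controller's default `kube-system/application=skipper-ingress`, while the handler bodies are illustrative and not the controller's exact code):

```go
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// watchIngressPods pushes pod changes to the caller as soon as the API
// server reports them, instead of polling at a high rate.
func watchIngressPods(client kubernetes.Interface, stop <-chan struct{}) {
	factory := informers.NewSharedInformerFactoryWithOptions(
		client,
		30*time.Second, // resync interval, recovers from missed events
		informers.WithNamespace("kube-system"),
		informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.LabelSelector = "application=skipper-ingress"
		}),
	)
	factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			if pod.Status.PodIP != "" { // register once the VPC CNI assigned an IP
				fmt.Printf("New Pod %s IP:%s\n", pod.Name, pod.Status.PodIP)
			}
		},
		DeleteFunc: func(obj interface{}) {
			if pod, ok := obj.(*v1.Pod); ok {
				fmt.Printf("Deleted Pod %s IP:%s\n", pod.Name, pod.Status.PodIP)
			}
		},
	})
	factory.Start(stop)
}
```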

The registration happens as soon as the pod receives an IP from the AWS VPC. Hence the ALB/NLB health check starts probing already while the pod is being scheduled, so it serves at the earliest possible moment. Lab tests show pods serving ALB traffic well under 10 s after scheduling!

Deregistration is bound to the Kubernetes deletion event. That means the LB stays in sync with the cluster and stops sending traffic before the pod is actually terminated. This implements safe deregistration without traffic loss. Lab tests show no packet loss even under aggressive deployment scaling!

Since the IP-based TGs are now managed by this controller, their targets represent pods; thus all of them should show as healthy, and stale entries are cleaned up by this controller.

* client-go Informer: these high-level functions provide convenient access to Kubernetes event registration. Since event registration is the key to fast response, and is efficient compared to high-rate polling, using these standard factory methods stands to reason.

* successful transition of a TG from type `instance` to type `ip` and vice versa
* the controller registers pods that are discovered
* the controller deregisters pods that are in "terminating" status
* the controller recovers the desired state by "resyncing" if manual intervention on the TG happened
* it removes pods that are killed or dead

| access mode | HostNetwork | HostPort | Result |                      Notes                       |
| :---------: | :---------: | :------: | :----: | :----------------------------------------------: |
| `Instance`  |   `true`    |  `true`  |  `ok`  |                    status quo                    |
|    `IP`     |   `true`    |  `true`  |  `ok`  |       PodIP == HostIP --> limited benefits       |
|    `IP`     |   `false`   |  `true`  |  `ok`  | PodIP != HostIP --> limited scaling, node bound  |
|    `IP`     |   `true`    | `false`  | `N/A`  |     impossible, HostNetwork implies HostPort     |
|    `IP`     |   `false`   | `false`  |  `ok`  |        goal achieved: free scaling and HA        |

Example logs:
```
time="2021-12-17T15:36:36Z" level=info msg="Deleted Pod skipper-ingress-575548f66-k99vs IP:172.16.49.249"
time="2021-12-17T15:36:36Z" level=info msg="Deregistering CNI targets: [172.16.49.249]"
time="2021-12-17T15:36:37Z" level=info msg="New Pod skipper-ingress-575548f66-qff2q IP:172.16.60.193"
time="2021-12-17T15:36:40Z" level=info msg="Registering CNI targets: [172.16.60.193]"
```

* extended the service account with the required RBAC permissions to watch/list pods
* added an example of Skipper without HostPort and HostNetwork
Signed-off-by: Samuel Lang <[email protected]>
universam1 committed Jan 11, 2022
1 parent 949a0bc commit 211920e
Showing 23 changed files with 1,435 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -14,7 +14,7 @@ jobs:
- run: go get github.com/mattn/goveralls
env:
GO111MODULE: off
- run: curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
- run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANGCI_RELEASE}
env:
GOLANGCI_RELEASE: v1.32.1
- run: make test
23 changes: 23 additions & 0 deletions README.md
@@ -31,6 +31,7 @@ This information is used to manage AWS resources for each ingress objects of the
- Can be used in clusters created by [Kops](https://github.com/kubernetes/kops), see our [deployment guide for Kops](deploy/kops.md)
- [Support Multiple TLS Certificates per ALB (SNI)](https://aws.amazon.com/blogs/aws/new-application-load-balancer-sni/).
- Support for AWS WAF and WAFv2
- Support for AWS CNI pod direct access

## Upgrade

@@ -590,6 +591,28 @@ By default, the controller will expose both HTTP and HTTPS ports on the load bal
The controller used to have only the `--health-check-port` flag available, and would use the same port as health check and the target port.
Those ports are now configured individually. If you relied on this behavior, please include the `--target-port` in your configuration.

## AWS CNI Mode (experimental)

The default operation mode of the controller (`target-access-mode=HostPort`) is to link the target groups to the autoscaling group. The target group type is `instance`, requiring the ingress pods to be accessible through `HostNetwork` and `HostPort`.

In *AWS CNI Mode* (`target-access-mode=AWSCNI`) the controller actively manages the target group members. Since pods in AWS EKS clusters running the AWS VPC CNI are first-class members of the VPC, they can receive the traffic directly and are managed through a target group of type `ip`, which means there is no necessity for the HostPort indirection.

### Notes

- For security reasons, the HostPort requirement of the default mode might be of concern
- Direct management of the target group members is significantly faster than the ASG-linked mode, but it requires a running controller for updates. As of now, the controller is not prepared for a replicated high-availability setup.
- Registration and deregistration are synced with the pod lifecycle, hence a pod in terminating phase is deregistered from the target group before it is shut down.
- Ingress pods are not bound to nodes in CNI mode and the deployment can scale independently.

### Configuration options

| access mode | HostNetwork | HostPort | Notes |
| :---------: | :---------: | :------: | :---------------------------------------------: |
| `HostPort` | `true` | `true` | default setup |
| `AWSCNI` | `true` | `true` | PodIP == HostIP: limited scaling and host bound |
| `AWSCNI` | `false` | `true` | PodIP != HostIP: limited scaling and host bound |
| `AWSCNI` | `false` | `false` | free scaling, pod VPC CNI IP used |
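
A minimal sketch of a matching Skipper deployment in the free-scaling mode (illustrative only, not the exact manifest shipped with this change; the image and port are assumptions, the labels match the controller default `kube-system/application=skipper-ingress`), with no `hostNetwork` and no `hostPort`:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: skipper-ingress
  namespace: kube-system
spec:
  replicas: 3                     # scales independently of the node count
  selector:
    matchLabels:
      application: skipper-ingress
  template:
    metadata:
      labels:
        application: skipper-ingress
    spec:
      containers:
        - name: skipper-ingress
          image: registry.opensource.zalan.do/teapot/skipper:latest  # assumed image
          ports:
            - containerPort: 9999  # assumed port; no hostPort needed
```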

## Trying it out

The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully
Expand Down
99 changes: 91 additions & 8 deletions aws/adapter.go
@@ -74,6 +74,13 @@ type Adapter struct {
denyInternalRespBody string
denyInternalRespContentType string
denyInternalRespStatusCode int
TargetCNI *TargetCNIconfig
}

type TargetCNIconfig struct {
Enabled bool
TargetGroupARNs []string
TargetGroupCh chan []string
}

type manifest struct {
@@ -124,11 +131,14 @@ const (
DefaultNLBCrossZone = false
DefaultNLBHTTPEnabled = false

nameTag = "Name"
LoadBalancerTypeApplication = "application"
LoadBalancerTypeNetwork = "network"
IPAddressTypeIPV4 = "ipv4"
IPAddressTypeDualstack = "dualstack"
nameTag = "Name"
LoadBalancerTypeApplication = "application"
LoadBalancerTypeNetwork = "network"
IPAddressTypeIPV4 = "ipv4"
IPAddressTypeDualstack = "dualstack"
TargetAccessModeAWSCNI = "AWSCNI"
TargetAccessModeHostPort = "HostPort"
DefaultTargetCNILabelSelector = "kube-system/application=skipper-ingress"
)

var (
@@ -225,6 +235,10 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume
nlbCrossZone: DefaultNLBCrossZone,
nlbHTTPEnabled: DefaultNLBHTTPEnabled,
customFilter: DefaultCustomFilter,
TargetCNI: &TargetCNIconfig{
Enabled: false,
TargetGroupCh: make(chan []string, 10),
},
}

adapter.manifest, err = buildManifest(adapter, clusterID, vpcID)
@@ -432,6 +446,12 @@ func (a *Adapter) WithInternalDomains(domains []string) *Adapter {
return a
}

// WithTargetAccessMode returns the receiver adapter after defining the target access mode
func (a *Adapter) WithTargetAccessMode(t string) *Adapter {
a.TargetCNI.Enabled = t == TargetAccessModeAWSCNI
return a
}

// WithDenyInternalDomains returns the receiver adapter after setting
// the denyInternalDomains config.
func (a *Adapter) WithDenyInternalDomains(deny bool) *Adapter {
@@ -568,14 +588,26 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
return
}

// split the full list into relevant TG types
targetTypesARNs, err := categorizeTargetTypeInstance(a.elbv2, targetGroupARNs)
if err != nil {
problems.Add("failed to categorize Target Type Instance: %w", err)
return
}

// update the CNI TG list
if a.TargetCNI.Enabled {
a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp]
}

ownerTags := map[string]string{
clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned,
kubernetesCreatorTag: a.controllerID,
}

for _, asg := range a.TargetedAutoScalingGroups {
// This call is idempotent and safe to execute every time
if err := updateTargetGroupsForAutoScalingGroup(a.autoscaling, a.elbv2, targetGroupARNs, asg.name, ownerTags); err != nil {
if err := updateTargetGroupsForAutoScalingGroup(a.autoscaling, a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], asg.name, ownerTags); err != nil {
problems.Add("failed to update target groups for autoscaling group %q: %w", asg.name, err)
}
}
@@ -592,13 +624,13 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
runningSingleInstances := a.RunningSingleInstances()
if len(runningSingleInstances) != 0 {
// This call is idempotent too
if err := registerTargetsOnTargetGroups(a.elbv2, targetGroupARNs, runningSingleInstances); err != nil {
if err := registerTargetsOnTargetGroups(a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], runningSingleInstances); err != nil {
problems.Add("failed to register instances %q in target groups: %w", runningSingleInstances, err)
}
}
if len(a.obsoleteInstances) != 0 {
// Deregister instances from target groups and clean up list of obsolete instances
if err := deregisterTargetsOnTargetGroups(a.elbv2, targetGroupARNs, a.obsoleteInstances); err != nil {
if err := deregisterTargetsOnTargetGroups(a.elbv2, targetTypesARNs[elbv2.TargetTypeEnumInstance], a.obsoleteInstances); err != nil {
problems.Add("failed to deregister instances %q in target groups: %w", a.obsoleteInstances, err)
} else {
a.obsoleteInstances = make([]string, 0)
@@ -665,6 +697,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
http2: http2,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
denyInternalDomains: a.denyInternalDomains,
denyInternalDomainsResponse: denyResp{
body: a.denyInternalRespBody,
@@ -720,6 +753,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
http2: http2,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
denyInternalDomains: a.denyInternalDomains,
denyInternalDomainsResponse: denyResp{
body: a.denyInternalRespBody,
@@ -1009,3 +1043,52 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails

return nonTargetedASGs
}

// SetTargetsOnCNITargetGroups implements desired state for CNI target groups
// by polling the current list of targets thus creating a diff of what needs to be added and removed.
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []string) error {
for _, targetGroupARN := range a.TargetCNI.TargetGroupARNs {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN})
if err != nil {
log.Errorf("unable to describe target health %v", err)
// continue for processing of the rest of the target groups
continue
}
registeredInstances := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredInstances[i] = *target.Target.Id
}
toregister := difference(endpoints, registeredInstances)
if len(toregister) > 0 {
log.Info("Registering CNI targets: ", toregister)
err := registerTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toregister)
if err != nil {
return err
}
}
toderegister := difference(registeredInstances, endpoints)
if len(toderegister) > 0 {
log.Info("Deregistering CNI targets: ", toderegister)
err := deregisterTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toderegister)
if err != nil {
return err
}
}
}
return nil
}

// difference returns the elements in `a` that aren't in `b`.
func difference(a, b []string) []string {
mb := make(map[string]struct{}, len(b))
for _, x := range b {
mb[x] = struct{}{}
}
var diff []string
for _, x := range a {
if _, found := mb[x]; !found {
diff = append(diff, x)
}
}
return diff
}
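
For illustration, the diff-based sync above behaves like this (hypothetical values):

```go
// Hypothetical values to illustrate the desired-state diff:
endpoints := []string{"172.16.60.193", "172.16.49.249"} // desired: current pod IPs
registered := []string{"172.16.49.249", "172.16.1.1"}   // actual: targets in the TG

toregister := difference(endpoints, registered)   // ["172.16.60.193"] -> to be registered
toderegister := difference(registered, endpoints) // ["172.16.1.1"]    -> to be deregistered
_, _ = toregister, toderegister
```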
76 changes: 76 additions & 0 deletions aws/adapter_test.go
@@ -947,3 +947,79 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) {
require.Equal(t, uint(4), b.nlbHealthyThresholdCount)
})
}

func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}}
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
describeTargetHealth: &apiResponse{response: &thOut, err: nil},
registerTargets: R(mockDTOutput(), nil),
deregisterTargets: R(mockDTOutput(), nil),
},
}
a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}}

t.Run("adding a new endpoint", func(t *testing.T) {
a.TargetCNI.TargetGroupARNs = []string{"asg1"}

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}))
require.Equal(t, []*elbv2.RegisterTargetsInput{{
TargetGroupArn: aws.String("asg1"),
Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}},
}}, m.rtinputs)
require.Equal(t, []*elbv2.DeregisterTargetsInput(nil), m.dtinputs)
})

t.Run("two new endpoints, registers the new EPs only", func(t *testing.T) {
thOut = elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("1.1.1.1")}}},
}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}))
require.Equal(t, []*elbv2.TargetDescription{
{Id: aws.String("2.2.2.2")},
{Id: aws.String("3.3.3.3")},
}, m.rtinputs[0].Targets)
require.Equal(t, []*elbv2.DeregisterTargetsInput(nil), m.dtinputs)
})

t.Run("removing one endpoint, causing deregistration of it", func(t *testing.T) {
thOut = elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("1.1.1.1")}},
{Target: &elbv2.TargetDescription{Id: aws.String("2.2.2.2")}},
{Target: &elbv2.TargetDescription{Id: aws.String("3.3.3.3")}},
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}))
require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.rtinputs)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.dtinputs[0].Targets)
})

t.Run("restoring desired state after external manipulation, adding and removing one", func(t *testing.T) {
thOut = elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("1.1.1.1")}},
{Target: &elbv2.TargetDescription{Id: aws.String("2.2.2.2")}},
{Target: &elbv2.TargetDescription{Id: aws.String("4.4.4.4")}},
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}))
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.rtinputs[0].Targets)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.dtinputs[0].Targets)
})
}

func TestWithTargetAccessMode(t *testing.T) {
t.Run("WithTargetAccessMode enables AWS CNI mode", func(t *testing.T) {
a := &Adapter{TargetCNI: &TargetCNIconfig{Enabled: false}}
a = a.WithTargetAccessMode("AWSCNI")
require.True(t, a.TargetCNI.Enabled)
})
t.Run("WithTargetAccessMode disables AWS CNI mode", func(t *testing.T) {
a := &Adapter{TargetCNI: &TargetCNIconfig{Enabled: true}}
a = a.WithTargetAccessMode("HostPort")
require.False(t, a.TargetCNI.Enabled)
})
}
19 changes: 19 additions & 0 deletions aws/asg.go
@@ -260,6 +260,25 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er
return targetGroups, err
}

// map the target group slice into specific types such as instance, ip, etc
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) {
targetTypes := make(map[string][]string)
err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{},
func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool {
for _, tg := range resp.TargetGroups {
for _, v := range allTGARNs {
if v != aws.StringValue(tg.TargetGroupArn) {
continue
}
targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn))
}
}
return true
})
log.Debugf("categorized target group arns: %#v", targetTypes)
return targetTypes, err
}
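
As an illustration, for a mixed cluster the categorization could yield a map like the following (fabricated ARNs, shown for clarity only):

```go
// Hypothetical outcome of categorizeTargetTypeInstance for a mixed cluster:
targetTypes := map[string][]string{
	elbv2.TargetTypeEnumInstance: {"arn:aws:elasticloadbalancing:eu-central-1:123456789012:targetgroup/node-tg/1111"},
	elbv2.TargetTypeEnumIp:       {"arn:aws:elasticloadbalancing:eu-central-1:123456789012:targetgroup/pod-tg/2222"},
}
// The "instance" bucket is attached to ASGs as before, while the "ip"
// bucket is sent to the CNI channel for direct target management.
_ = targetTypes
```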

// tgHasTags returns true if the specified resource has the expected tags.
func tgHasTags(descs []*elbv2.TagDescription, arn string, tags map[string]string) bool {
for _, desc := range descs {