Skip to content

Commit

Permalink
feat: Advanced options for NLBs
Browse files Browse the repository at this point in the history
This implements two new annotations in an attempt to cover the use
case described in zalando-incubator#438.

The `zalando.org/aws-nlb-extra-listeners` annotation accepts a JSON
string that describes a list of extra listen/target ports to add to an
NLB. These will be routed to pods matching a specific label in the same
namespace as the ingress. As such, this depends on the AWS CNI mode
feature.

The `zalando.org/aws-nlb-cascade-http-to-alb` allows the NLB to
"cascade" HTTP(S) traffic to another managed ALB. This was added to
solve two issues while testing in a live cluster:

1. When using skipper, if it is set to only accept https traffic, it
   will reject requests from an NLB that is offloading the
   SSL and passing on http traffic, which is the current default
   configuration for NLBs. Some clusters or use cases may require
   end-to-end encryption.
2. When using SSL offloading and testing with Gitlab (the use case
   described in the original issue) it became confused as to the actual
   protocol and produced incorrect redirect links which broke
   authentication. This could likely be remedied in the application
   itself, however the issue is not present when using the ALB and
   redirects are handled there. Since the ALB is already present,
   routing the traffic directly to it is a simple, reasonable choice.
  • Loading branch information
jhuntwork committed Jul 26, 2022
1 parent 268c4b2 commit 7914a07
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 129 deletions.
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ Overview of configuration which can be set via Ingress annotations.
|`zalando.org/aws-load-balancer-ssl-policy`|`string`|`ELBSecurityPolicy-2016-08`|
|`zalando.org/aws-load-balancer-type`| `nlb` \| `alb`|`alb`|
|`zalando.org/aws-load-balancer-http2`| `true` \| `false`|`true`|
|[`zalando.org/aws-nlb-cascade-http-to-alb`](#cascading-to-an-alb)| `true` \| `false`|`false`|
|[`zalando.org/aws-nlb-extra-listeners`](#extra-listen-ports)|`string`|N/A|
|`zalando.org/aws-waf-web-acl-id` | `string` | N/A |
|`kubernetes.io/ingress.class`|`string`|N/A|

Expand Down Expand Up @@ -664,6 +666,37 @@ In *AWS CNI Mode* (`target-access-mode=AWSCNI`) the controller actively manages
| `AWSCNI` | `false` | `true` | PodIP != HostIP: limited scaling and host bound |
| `AWSCNI` | `false` | `false` | free scaling, pod VPC CNI IP used |

## Advanced Options for NLBs

### Extra Listen Ports

Some real world scenarios may require non-standard TCP or UDP ingresses. The `zalando.org/aws-nlb-extra-listeners`
annotation allows you to specify a list of additional listeners to add to your NLB. The value of the annotation should
be a valid JSON string of the following format.

```json
[
{
"protocol": "TCP",
"listenport": 22,
"targetport": 2222,
"podlabel": "application=ssh-service"
}
]
```

The `podlabel` value is used to register targets in the target group associated with the listener. This depends on the
AWS CNI Mode feature, where individual pods receive accessible IP addresses. The value is used to identify pods running
in the same namespace as the ingress that will receive traffic from the load balancer.

### Cascading to an ALB

If your usage of an NLB requires both HTTP(S) and extra listen ports, but you already have an ALB managed by
kube-ingress-aws-controller, you may wish to route the HTTP(S) traffic through the ALB for consistency. Some
applications may even depend on ALB features, for example the way SSL is offloaded or how redirects are handled.
The `zalando.org/aws-nlb-cascade-http-to-alb` allows the NLB to use TCP listeners on standard ports to forward
HTTP(S) traffic to any ALBs discovered and managed by kube-ingress-aws-controller.

## Trying it out

The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully
Expand Down
124 changes: 92 additions & 32 deletions aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,24 @@ type Adapter struct {
denyInternalRespContentType string
denyInternalRespStatusCode int
TargetCNI *TargetCNIconfig
CascadeCh chan []TargetGroupWithLabels
}

type TargetCNIconfig struct {
Enabled bool
TargetGroupCh chan []string
TargetGroupCh chan []TargetGroupWithLabels
}

type TargetGroupWithLabels struct {
ARN string
PodNamespace string
PodLabel string
}
type CNIEndpoint struct {
IPAddress string
Namespace string
Podlabel string
}
type manifest struct {
securityGroup *securityGroupDetails
instance *instanceDetails
Expand Down Expand Up @@ -235,7 +246,7 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume
customFilter: DefaultCustomFilter,
TargetCNI: &TargetCNIconfig{
Enabled: false,
TargetGroupCh: make(chan []string, 10),
TargetGroupCh: make(chan []TargetGroupWithLabels, 10),
},
}

Expand Down Expand Up @@ -592,8 +603,33 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp]
}

// run through any target groups with ALB targets and register all ALBs
for _, tg := range targetTypesARNs[elbv2.TargetTypeEnumAlb] {
albARNs := make([]string, 0, len(stacks))
for _, stack := range stacks {
if stack.LoadBalancerType == LoadBalancerTypeApplication {
albARNs = append(albARNs, stack.loadbalancerARN)
}
}
registeredTargets, err := a.getRegisteredTargets(tg.ARN)
if err != nil {
problems.Add("failed to get existing targets: %w", err)
}
if err := a.registerAndDeregister(albARNs, registeredTargets, tg.ARN); err != nil {
problems.Add("failed to update target registration %w", err)
}
}

// remove the IP TGs from the list keeping all other TGs including problematic #127 and nonexistent #436
targetGroupARNs := difference(allTargetGroupARNs, targetTypesARNs[elbv2.TargetTypeEnumIp])
var targetGroupARNs []string
for targetType, tgList := range targetTypesARNs {
if targetType == elbv2.TargetTypeEnumIp || targetType == elbv2.TargetTypeEnumAlb {
continue
}
for _, tg := range tgList {
targetGroupARNs = append(targetGroupARNs, tg.ARN)
}
}

ownerTags := map[string]string{
clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned,
Expand Down Expand Up @@ -639,7 +675,7 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
// All the required resources (listeners and target group) are created in a
// transactional fashion.
// Failure to create the stack causes it to be deleted automatically.
func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) {
func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener, cascade bool) (string, error) {
certARNs := make(map[string]time.Time, len(certificateARNs))
for _, arn := range certificateARNs {
certARNs[arn] = time.Time{}
Expand Down Expand Up @@ -673,7 +709,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
nlbHealthyThresholdCount: a.nlbHealthyThresholdCount,
targetPort: a.targetPort,
targetHTTPS: a.targetHTTPS,
httpDisabled: a.httpDisabled(loadBalancerType),
httpDisabled: a.httpDisabled(loadBalancerType, cascade),
httpTargetPort: a.httpTargetPort(loadBalancerType),
timeoutInMinutes: uint(a.creationTimeout.Minutes()),
stackTerminationProtection: a.stackTerminationProtection,
Expand All @@ -690,6 +726,8 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
httpRedirectToHTTPS: a.httpRedirectToHTTPS,
nlbCrossZone: a.nlbCrossZone,
http2: http2,
extraListeners: extraListeners,
cascade: cascade,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
Expand All @@ -704,7 +742,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
return createStack(a.cloudformation, spec)
}

func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) {
func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener, cascade bool) (string, error) {
if _, ok := SSLPolicies[sslPolicy]; !ok {
return "", fmt.Errorf("invalid SSLPolicy '%s' defined", sslPolicy)
}
Expand All @@ -729,7 +767,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
nlbHealthyThresholdCount: a.nlbHealthyThresholdCount,
targetPort: a.targetPort,
targetHTTPS: a.targetHTTPS,
httpDisabled: a.httpDisabled(loadBalancerType),
httpDisabled: a.httpDisabled(loadBalancerType, cascade),
httpTargetPort: a.httpTargetPort(loadBalancerType),
timeoutInMinutes: uint(a.creationTimeout.Minutes()),
stackTerminationProtection: a.stackTerminationProtection,
Expand All @@ -746,6 +784,8 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
httpRedirectToHTTPS: a.httpRedirectToHTTPS,
nlbCrossZone: a.nlbCrossZone,
http2: http2,
extraListeners: extraListeners,
cascade: cascade,
tags: a.stackTags,
internalDomains: a.internalDomains,
targetAccessModeCNI: a.TargetCNI.Enabled,
Expand All @@ -769,8 +809,8 @@ func (a *Adapter) httpTargetPort(loadBalancerType string) uint {
return a.targetPort
}

func (a *Adapter) httpDisabled(loadBalancerType string) bool {
if loadBalancerType == LoadBalancerTypeNetwork {
func (a *Adapter) httpDisabled(loadBalancerType string, cascade bool) bool {
if loadBalancerType == LoadBalancerTypeNetwork && !cascade {
return !a.nlbHTTPEnabled
}
return false
Expand Down Expand Up @@ -1039,36 +1079,56 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails
return nonTargetedASGs
}

func (a *Adapter) getRegisteredTargets(tgARN string) ([]string, error) {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &tgARN})
if err != nil {
log.Errorf("unable to describe target health %v", err)
return []string{}, err
}
registeredTargets := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredTargets[i] = *target.Target.Id
}
return registeredTargets, nil
}

func (a *Adapter) registerAndDeregister(new []string, old []string, tgARN string) error {
toRegister := difference(new, old)
if len(toRegister) > 0 {
log.Info("Registering CNI targets: ", toRegister)
err := registerTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toRegister)
if err != nil {
return err
}
}
toDeregister := difference(old, new)
if len(toDeregister) > 0 {
log.Info("Deregistering CNI targets: ", toDeregister)
err := deregisterTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toDeregister)
if err != nil {
return err
}
}
return nil
}

// SetTargetsOnCNITargetGroups implements desired state for CNI target groups
// by polling the current list of targets thus creating a diff of what needs to be added and removed.
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints, cniTargetGroupARNs []string) error {
log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroupARNs)
for _, targetGroupARN := range cniTargetGroupARNs {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN})
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []CNIEndpoint, cniTargetGroups []TargetGroupWithLabels) error {
log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroups)
for _, targetGroup := range cniTargetGroups {
registeredTargets, err := a.getRegisteredTargets(targetGroup.ARN)
if err != nil {
log.Errorf("unable to describe target health %v", err)
// continue for processing of the rest of the target groups
continue
}
registeredInstances := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredInstances[i] = *target.Target.Id
}
toRegister := difference(endpoints, registeredInstances)
if len(toRegister) > 0 {
log.Info("Registering CNI targets: ", toRegister)
err := registerTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toRegister)
if err != nil {
return err
var matchingEndpoints []string
for _, endpoint := range endpoints {
if endpoint.Podlabel == targetGroup.PodLabel && endpoint.Namespace == targetGroup.PodNamespace {
matchingEndpoints = append(matchingEndpoints, endpoint.IPAddress)
}
}
toDeregister := difference(registeredInstances, endpoints)
if len(toDeregister) > 0 {
log.Info("Deregistering CNI targets: ", toDeregister)
err := deregisterTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toDeregister)
if err != nil {
return err
}
if err := a.registerAndDeregister(matchingEndpoints, registeredTargets, targetGroup.ARN); err != nil {
return err
}
}
return nil
Expand Down
12 changes: 7 additions & 5 deletions aws/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) {
}

func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
tgARNs := []string{"asg1"}
tgARNs := []TargetGroupWithLabels{{ARN: "asg1"}}
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}}
m := &mockElbv2Client{
outputs: elbv2MockOutputs{
Expand All @@ -961,7 +961,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}}

t.Run("adding a new endpoint", func(t *testing.T) {
require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}}, tgARNs))
require.Equal(t, []*elbv2.RegisterTargetsInput{{
TargetGroupArn: aws.String("asg1"),
Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}},
Expand All @@ -975,7 +975,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups(
[]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.TargetDescription{
{Id: aws.String("2.2.2.2")},
{Id: aws.String("3.3.3.3")},
Expand All @@ -991,7 +992,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.rtinputs)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.dtinputs[0].Targets)
})
Expand All @@ -1004,7 +1005,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}}
m.rtinputs, m.dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups(
[]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.rtinputs[0].Targets)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.dtinputs[0].Targets)
})
Expand Down
29 changes: 26 additions & 3 deletions aws/asg.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,39 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er
}

// map the target group slice into specific types such as instance, ip, etc
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) {
targetTypes := make(map[string][]string)
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]TargetGroupWithLabels, error) {
targetTypes := make(map[string][]TargetGroupWithLabels)
err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{},
func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool {
for _, tg := range resp.TargetGroups {
for _, v := range allTGARNs {
if v != aws.StringValue(tg.TargetGroupArn) {
continue
}
targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn))
var podlabel, podnamespace string
log.Debugf("Looking for tags on %s", aws.StringValue(tg.TargetGroupArn))
out, err := elbv2svc.DescribeTags(&elbv2.DescribeTagsInput{ResourceArns: []*string{tg.TargetGroupArn}})
if err != nil {
log.Errorf("cannot describe tags on target group: %v", err)
} else {
for _, desc := range out.TagDescriptions {
for _, tag := range desc.Tags {
switch aws.StringValue(tag.Key) {
case podLabelTag:
podlabel = aws.StringValue(tag.Value)
case podNamespaceTag:
podnamespace = aws.StringValue(tag.Value)
}
}
}
}
log.Debugf("Adding tg with label: '%s' in namespace: '%s'", podlabel, podnamespace)
targetTypes[aws.StringValue(tg.TargetType)] = append(
targetTypes[aws.StringValue(tg.TargetType)],
TargetGroupWithLabels{
ARN: aws.StringValue(tg.TargetGroupArn),
PodLabel: podlabel,
PodNamespace: podnamespace})
}
}
return true
Expand Down
Loading

0 comments on commit 7914a07

Please sign in to comment.