Honor Service ExternalTrafficPolicy
If a Service's ExternalTrafficPolicy is set to Local, the LoadBalancer pods
send traffic to the NodePort on their local node instead of the
ClusterIP.

Signed-off-by: Brad Davidson <[email protected]>
brandond committed Jan 19, 2023
1 parent 7bbcac9 commit cc544c0
Showing 8 changed files with 150 additions and 92 deletions.
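For orientation (not part of the commit): the new behavior is keyed off a field users set on their Services. A minimal sketch of a Service that takes the new path — the name, namespace, and selector here are made up:

```go
package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical Service. With Type: LoadBalancer and
	// ExternalTrafficPolicy: Local, the svclb pods created for it
	// will forward to the NodePort on their own node.
	svc := core.Service{
		ObjectMeta: meta.ObjectMeta{Name: "whoami", Namespace: "default"},
		Spec: core.ServiceSpec{
			Type:                  core.ServiceTypeLoadBalancer,
			ExternalTrafficPolicy: core.ServiceExternalTrafficPolicyTypeLocal,
			Selector:              map[string]string{"app": "whoami"},
			Ports: []core.ServicePort{
				{Name: "http", Protocol: core.ProtocolTCP, Port: 80},
			},
		},
	}
	fmt.Printf("%s/%s: policy=%s\n", svc.Namespace, svc.Name, svc.Spec.ExternalTrafficPolicy)
}
```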
14 changes: 7 additions & 7 deletions go.mod
@@ -122,15 +122,15 @@ require (
github.com/otiai10/copy v1.7.0
github.com/pkg/errors v0.9.1
github.com/rancher/dynamiclistener v0.3.5
-github.com/rancher/lasso v0.0.0-20210616224652-fc3ebd901c08
+github.com/rancher/lasso v0.0.0-20221227210133-6ea88ca2fbcc
github.com/rancher/remotedialer v0.2.6-0.20220624190122-ea57207bf2b8
github.com/rancher/wharfie v0.5.3
-github.com/rancher/wrangler v1.0.0
+github.com/rancher/wrangler v1.0.1-0.20230112175341-ce552e665720
github.com/robfig/cron/v3 v3.0.1
github.com/rootless-containers/rootlesskit v1.0.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/pflag v1.0.5
-github.com/stretchr/testify v1.8.0
+github.com/stretchr/testify v1.8.1
github.com/urfave/cli v1.22.9
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/yl2chen/cidranger v1.0.2
@@ -340,7 +340,7 @@ require (
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/cobra v1.6.0 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
-github.com/stretchr/objx v0.4.0 // indirect
+github.com/stretchr/objx v0.5.0 // indirect
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/tmccombs/hcl2json v0.3.3 // indirect
@@ -393,17 +393,17 @@ require (
gopkg.in/warnings.v0 v0.1.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.2.2 // indirect
-k8s.io/apiextensions-apiserver v0.24.0 // indirect
+k8s.io/apiextensions-apiserver v0.25.4 // indirect
k8s.io/cli-runtime v0.22.2 // indirect
k8s.io/cluster-bootstrap v0.0.0 // indirect
-k8s.io/code-generator v0.24.0 // indirect
+k8s.io/code-generator v0.25.4 // indirect
k8s.io/controller-manager v0.25.4 // indirect
k8s.io/csi-translation-lib v0.0.0 // indirect
k8s.io/dynamic-resource-allocation v0.0.0 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kms v0.0.0 // indirect
-k8s.io/kube-aggregator v0.24.0 // indirect
+k8s.io/kube-aggregator v0.25.4 // indirect
k8s.io/kube-controller-manager v0.0.0 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
k8s.io/kube-proxy v0.0.0 // indirect
35 changes: 8 additions & 27 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions manifests/ccm.yaml
@@ -67,6 +67,14 @@ rules:
- daemonsets
verbs:
- "*"
+- apiGroups:
+- "discovery.k8s.io"
+resources:
+- endpointslices
+verbs:
+- get
+- list
+- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
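The new rule grants the cloud controller read access to EndpointSlices, which the ServiceLB controller starts consuming below. As a sketch of the kind of call this authorizes — plain client-go here, though the controller itself goes through wrangler caches; the kubeconfig path, namespace, and service name are placeholders:

```go
package main

import (
	"context"
	"fmt"

	discovery "k8s.io/api/discovery/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// EndpointSlices are tied back to their Service by the
	// kubernetes.io/service-name label (discovery.LabelServiceName),
	// the same label the controller selects on.
	slices, err := client.DiscoveryV1().EndpointSlices("default").List(context.Background(),
		meta.ListOptions{LabelSelector: discovery.LabelServiceName + "=my-service"})
	if err != nil {
		panic(err)
	}
	for _, eps := range slices.Items {
		fmt.Println(eps.Name, "endpoints:", len(eps.Endpoints))
	}
}
```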
9 changes: 7 additions & 2 deletions pkg/cloudprovider/cloudprovider.go
@@ -12,6 +12,8 @@ import (
appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/generated/controllers/discovery"
discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/start"
"github.com/sirupsen/logrus"
@@ -41,6 +43,7 @@ type k3s struct {

processor apply.Apply
daemonsetCache appsclient.DaemonSetCache
+endpointsCache discoveryclient.EndpointSliceCache
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
workqueue workqueue.RateLimitingInterface
@@ -89,21 +92,23 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st

lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
+lbDiscFactory := discovery.NewFactoryFromConfigOrDie(config)

processor, err := apply.NewForConfig(config)
if err != nil {
logrus.Fatalf("Failed to create apply processor for %s: %v", controllerName, err)
}
k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet())
k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache()
+k.endpointsCache = lbDiscFactory.Discovery().V1().EndpointSlice().Cache()
k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

-if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil {
+if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil {
logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err)
}

-if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil {
+if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory, lbDiscFactory); err != nil {
logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err)
}
} else {
163 changes: 111 additions & 52 deletions pkg/cloudprovider/servicelb.go
@@ -12,11 +12,13 @@ import (
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/condition"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
+discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
+discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -48,9 +50,11 @@ const (
func (k *k3s) Register(ctx context.Context,
nodes coreclient.NodeController,
pods coreclient.PodController,
+endpointslices discoveryclient.EndpointSliceController,
) error {
nodes.OnChange(ctx, controllerName, k.onChangeNode)
pods.OnChange(ctx, controllerName, k.onChangePod)
+endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice)

if err := k.createServiceLBNamespace(ctx); err != nil {
return err
@@ -135,6 +139,22 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
return node, nil
}

+// onChangeEndpointSlice handles changes to EndpointSlices. This is used to ensure that LoadBalancer
+// addresses only list Nodes with ready Pods, when their ExternalTrafficPolicy is set to Local.
+func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) {
+if eps == nil {
+return nil, nil
+}
+
+serviceName, ok := eps.Labels[discovery.LabelServiceName]
+if !ok {
+return eps, nil
+}
+
+k.workqueue.Add(eps.Namespace + "/" + serviceName)
+return eps, nil
+}
+
// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
@@ -219,16 +239,37 @@ func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
-pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{
+var readyNodes map[string]bool
+
+if servicehelper.RequestsOnlyLocalTraffic(svc) {
+readyNodes = map[string]bool{}
+eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{
+discovery.LabelServiceName: svc.Name,
+}))
+if err != nil {
+return nil, err
+}
+
+for _, ep := range eps {
+for _, endpoint := range ep.Endpoints {
+isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod"
+isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
+if isPod && isReady && endpoint.NodeName != nil {
+readyNodes[*endpoint.NodeName] = true
+}
+}
+}
+}
+
+pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
}))
-
if err != nil {
return nil, err
}

-expectedIPs, err := k.podIPs(pods, svc)
+expectedIPs, err := k.podIPs(pods, svc, readyNodes)
if err != nil {
return nil, err
}
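getStatus gates the EndpointSlice filtering on servicehelper.RequestsOnlyLocalTraffic from k8s.io/cloud-provider/service/helpers. To make the gating condition concrete, a rough, unofficial equivalent of that helper — a sketch, not the commit's code:

```go
package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
)

// requestsOnlyLocalTraffic approximates the upstream helper: true only
// for NodePort/LoadBalancer Services whose ExternalTrafficPolicy is Local.
func requestsOnlyLocalTraffic(svc *core.Service) bool {
	if svc.Spec.Type != core.ServiceTypeLoadBalancer &&
		svc.Spec.Type != core.ServiceTypeNodePort {
		return false
	}
	return svc.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal
}

func main() {
	svc := &core.Service{Spec: core.ServiceSpec{
		Type:                  core.ServiceTypeLoadBalancer,
		ExternalTrafficPolicy: core.ServiceExternalTrafficPolicyTypeLocal,
	}}
	fmt.Println(requestsOnlyLocalTraffic(svc)) // true
}
```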
@@ -267,7 +308,7 @@ func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.Loa
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
-func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
+func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) {
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
@@ -280,6 +321,9 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
if !Ready.IsTrue(pod) {
continue
}
+if readyNodes != nil && !readyNodes[pod.Spec.NodeName] {
+continue
+}

node, err := k.nodeCache.Get(pod.Spec.NodeName)
if apierrors.IsNotFound(err) {
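One subtlety in the new check: readyNodes is nil when the policy is Cluster (no filtering) and non-nil — possibly empty — when it is Local. The explicit nil test is what distinguishes the two, since reading from a nil map is legal in Go and returns the zero value. A small illustration:

```go
package main

import "fmt"

func main() {
	var cluster map[string]bool // nil: ExternalTrafficPolicy Cluster, filtering disabled
	local := map[string]bool{}  // non-nil but empty: Local policy, no ready endpoints anywhere

	for _, readyNodes := range []map[string]bool{cluster, local} {
		// Same shape as the check in podIPs: skip the pod's node unless
		// filtering is off (nil) or the node has a ready endpoint.
		skip := readyNodes != nil && !readyNodes["node-1"]
		fmt.Println("skip node-1:", skip) // nil: false (keep); empty: true (skip)
	}
}
```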
@@ -405,17 +449,27 @@ func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)
-
+localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
if err != nil {
return nil, err
}

+var sysctls []core.Sysctl
+for _, ipFamily := range svc.Spec.IPFamilies {
+switch ipFamily {
+case core.IPv4Protocol:
+sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
+case core.IPv6Protocol:
+sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
+}
+}
+
ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: k.LBNamespace,
-Labels: map[string]string{
+Labels: labels.Set{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
@@ -427,13 +481,13 @@
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
-MatchLabels: map[string]string{
+MatchLabels: labels.Set{
"app": name,
},
},
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
-Labels: map[string]string{
+Labels: labels.Set{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
@@ -442,6 +496,25 @@
Spec: core.PodSpec{
ServiceAccountName: "svclb",
AutomountServiceAccountToken: utilpointer.Bool(false),
+SecurityContext: &core.PodSecurityContext{
+Sysctls: sysctls,
+},
+Tolerations: []core.Toleration{
+{
+Key: "node-role.kubernetes.io/master",
+Operator: "Exists",
+Effect: "NoSchedule",
+},
+{
+Key: "node-role.kubernetes.io/control-plane",
+Operator: "Exists",
+Effect: "NoSchedule",
+},
+{
+Key: "CriticalAddonsOnly",
+Operator: "Exists",
+},
+},
},
},
UpdateStrategy: apps.DaemonSetUpdateStrategy{
@@ -453,18 +526,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
},
}

-var sysctls []core.Sysctl
-for _, ipFamily := range svc.Spec.IPFamilies {
-switch ipFamily {
-case core.IPv4Protocol:
-sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
-case core.IPv6Protocol:
-sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
-}
-}
-
-ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls}
-
for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
Expand Down Expand Up @@ -492,14 +553,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
Name: "DEST_PROTO",
Value: string(port.Protocol),
},
-{
-Name: "DEST_PORT",
-Value: strconv.Itoa(int(port.Port)),
-},
-{
-Name: "DEST_IPS",
-Value: strings.Join(svc.Spec.ClusterIPs, " "),
-},
},
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
@@ -510,31 +563,36 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
},
}

-ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
-}
-
-// Add toleration to noderole.kubernetes.io/master=*:NoSchedule
-masterToleration := core.Toleration{
-Key: "node-role.kubernetes.io/master",
-Operator: "Exists",
-Effect: "NoSchedule",
-}
-ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
-
-// Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule
-controlPlaneToleration := core.Toleration{
-Key: "node-role.kubernetes.io/control-plane",
-Operator: "Exists",
-Effect: "NoSchedule",
-}
-ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
+if localTraffic {
+container.Env = append(container.Env,
+core.EnvVar{
+Name: "DEST_PORT",
+Value: strconv.Itoa(int(port.NodePort)),
+},
+core.EnvVar{
+Name: "DEST_IPS",
+ValueFrom: &core.EnvVarSource{
+FieldRef: &core.ObjectFieldSelector{
+FieldPath: "status.hostIP",
+},
+},
+},
+)
+} else {
+container.Env = append(container.Env,
+core.EnvVar{
+Name: "DEST_PORT",
+Value: strconv.Itoa(int(port.Port)),
+},
+core.EnvVar{
+Name: "DEST_IPS",
+Value: strings.Join(svc.Spec.ClusterIPs, " "),
+},
+)
+}

-// Add toleration to CriticalAddonsOnly
-criticalAddonsOnlyToleration := core.Toleration{
-Key: "CriticalAddonsOnly",
-Operator: "Exists",
+ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
}
-ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)

// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
enableNodeSelector, err := k.nodeHasDaemonSetLabel()
@@ -551,6 +609,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
}
ds.Labels[nodeSelectorLabel] = "true"
}

return ds, nil
}
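To make the DEST_PORT/DEST_IPS branch concrete, a toy sketch with hypothetical values (Service port 80, allocated NodePort 31234): with ExternalTrafficPolicy: Local the env points klipper-lb at the local node's NodePort; with Cluster it points at the ClusterIPs on the Service port. A simplification, not the commit's code:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// destEnv condenses the branch above. In the real DaemonSet the Local
// case wires DEST_IPS to the downward-API field status.hostIP, so each
// svclb pod resolves it to its own node's address at runtime.
func destEnv(localTraffic bool, port, nodePort int32, clusterIPs []string) (destPort, destIPs string) {
	if localTraffic {
		return strconv.Itoa(int(nodePort)), "<status.hostIP of the local node>"
	}
	return strconv.Itoa(int(port)), strings.Join(clusterIPs, " ")
}

func main() {
	fmt.Println(destEnv(true, 80, 31234, []string{"10.43.0.10"}))  // 31234, node-local IP
	fmt.Println(destEnv(false, 80, 31234, []string{"10.43.0.10"})) // 80, 10.43.0.10
}
```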

@@ -563,7 +622,7 @@ func (k *k3s) updateDaemonSets() error {
return err
}

-nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
+nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
if err != nil {
return err
