Honor Service ExternalTrafficPolicy
Signed-off-by: Brad Davidson <[email protected]>
brandond committed Jan 26, 2023
1 parent 8a39f2f commit 15f89bf
Showing 4 changed files with 129 additions and 57 deletions.
8 changes: 8 additions & 0 deletions manifests/ccm.yaml
@@ -67,6 +67,14 @@ rules:
- daemonsets
verbs:
- "*"
+- apiGroups:
+- "discovery.k8s.io"
+resources:
+- endpointslices
+verbs:
+- get
+- list
+- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
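The new rule grants the cloud controller read-only access to EndpointSlices, which it needs in order to work out which Nodes actually host ready endpoints for a Service. As a rough sketch of what that permission allows (not part of this commit, which reads through wrangler caches instead; the kubeconfig path and service name below are placeholders):

    package main

    import (
    	"context"
    	"fmt"

    	discovery "k8s.io/api/discovery/v1"
    	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    )

    func main() {
    	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    	if err != nil {
    		panic(err)
    	}
    	client := kubernetes.NewForConfigOrDie(config)

    	// EndpointSlices point back at their Service via the well-known
    	// kubernetes.io/service-name label (discovery.LabelServiceName).
    	slices, err := client.DiscoveryV1().EndpointSlices("default").List(context.TODO(), meta.ListOptions{
    		LabelSelector: discovery.LabelServiceName + "=my-service",
    	})
    	if err != nil {
    		panic(err)
    	}
    	for _, slice := range slices.Items {
    		fmt.Printf("%s: %d endpoints\n", slice.Name, len(slice.Endpoints))
    	}
    }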
9 changes: 7 additions & 2 deletions pkg/cloudprovider/cloudprovider.go
@@ -12,6 +12,8 @@ import (
appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/generated/controllers/discovery"
discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/start"
"github.com/sirupsen/logrus"
@@ -41,6 +43,7 @@ type k3s struct {

processor apply.Apply
daemonsetCache appsclient.DaemonSetCache
+endpointsCache discoveryclient.EndpointSliceCache
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
workqueue workqueue.RateLimitingInterface
@@ -89,21 +92,23 @@ func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, st

lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
+lbDiscFactory := discovery.NewFactoryFromConfigOrDie(config)

processor, err := apply.NewForConfig(config)
if err != nil {
logrus.Fatalf("Failed to create apply processor for %s: %v", controllerName, err)
}
k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet())
k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache()
+k.endpointsCache = lbDiscFactory.Discovery().V1().EndpointSlice().Cache()
k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
k.workqueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

-if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil {
+if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod(), lbDiscFactory.Discovery().V1().EndpointSlice()); err != nil {
logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err)
}

-if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil {
+if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory, lbDiscFactory); err != nil {
logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err)
}
} else {
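Two details worth noting in Initialize: the discovery factory is deliberately not scoped to the ServiceLB namespace (unlike the core and apps factories), since EndpointSlices live alongside their Services in arbitrary namespaces; and all handlers feed the same rate-limited work queue created just above. A minimal sketch of that worker pattern, assuming a process function standing in for the real sync logic in servicelb.go:

    package main

    import (
    	"fmt"

    	"k8s.io/client-go/util/workqueue"
    )

    func process(key string) error {
    	fmt.Println("syncing", key)
    	return nil
    }

    func main() {
    	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    	queue.Add("default/my-service") // handlers enqueue namespace/name keys
    	queue.ShutDown()                // sketch only; the real queue runs until shutdown

    	for {
    		key, shutdown := queue.Get()
    		if shutdown {
    			return
    		}
    		if err := process(key.(string)); err != nil {
    			queue.AddRateLimited(key) // requeue with backoff on failure
    		} else {
    			queue.Forget(key) // clear backoff tracking on success
    		}
    		queue.Done(key)
    	}
    }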
163 changes: 111 additions & 52 deletions pkg/cloudprovider/servicelb.go
@@ -12,11 +12,13 @@ import (
"github.com/k3s-io/k3s/pkg/version"
"github.com/rancher/wrangler/pkg/condition"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
+discoveryclient "github.com/rancher/wrangler/pkg/generated/controllers/discovery/v1"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/sirupsen/logrus"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -48,9 +50,11 @@ const (
func (k *k3s) Register(ctx context.Context,
nodes coreclient.NodeController,
pods coreclient.PodController,
+endpointslices discoveryclient.EndpointSliceController,
) error {
nodes.OnChange(ctx, controllerName, k.onChangeNode)
pods.OnChange(ctx, controllerName, k.onChangePod)
+endpointslices.OnChange(ctx, controllerName, k.onChangeEndpointSlice)

if err := k.createServiceLBNamespace(ctx); err != nil {
return err
@@ -135,6 +139,22 @@ func (k *k3s) onChangeNode(key string, node *core.Node) (*core.Node, error) {
return node, nil
}

+// onChangeEndpointSlice handles changes to EndpointSlices. It ensures that LoadBalancer
+// addresses only list Nodes with ready Pods when the Service's ExternalTrafficPolicy is Local.
+func (k *k3s) onChangeEndpointSlice(key string, eps *discovery.EndpointSlice) (*discovery.EndpointSlice, error) {
+if eps == nil {
+return nil, nil
+}
+
+serviceName, ok := eps.Labels[discovery.LabelServiceName]
+if !ok {
+return eps, nil
+}
+
+k.workqueue.Add(eps.Namespace + "/" + serviceName)
+return eps, nil
+}

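The key enqueued above is the conventional namespace/name form, which the queue's consumer can split back apart with the standard client-go helper. A trivial sketch (assumed usage, not code from this commit):

    package main

    import (
    	"fmt"

    	"k8s.io/client-go/tools/cache"
    )

    func main() {
    	key := "kube-system/traefik" // mirrors eps.Namespace + "/" + serviceName
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(namespace, name) // kube-system traefik
    }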
// runWorker dequeues Service changes from the work queue
// We run a lightweight work queue to handle service updates. We don't need the full overhead
// of a wrangler service controller and shared informer cache, but we do want to run changes
@@ -219,16 +239,37 @@ func (k *k3s) getDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
// getStatus returns a LoadBalancerStatus listing ingress IPs for all ready pods
// matching the selected service.
func (k *k3s) getStatus(svc *core.Service) (*core.LoadBalancerStatus, error) {
-pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(map[string]string{
+var readyNodes map[string]bool
+
+if servicehelper.RequestsOnlyLocalTraffic(svc) {
+readyNodes = map[string]bool{}
+eps, err := k.endpointsCache.List(svc.Namespace, labels.SelectorFromSet(labels.Set{
+discovery.LabelServiceName: svc.Name,
+}))
+if err != nil {
+return nil, err
+}
+
+for _, ep := range eps {
+for _, endpoint := range ep.Endpoints {
+isPod := endpoint.TargetRef != nil && endpoint.TargetRef.Kind == "Pod"
+isReady := endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
+if isPod && isReady && endpoint.NodeName != nil {
+readyNodes[*endpoint.NodeName] = true
+}
+}
+}
+}
+
+pods, err := k.podCache.List(k.LBNamespace, labels.SelectorFromSet(labels.Set{
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
}))

if err != nil {
return nil, err
}

-expectedIPs, err := k.podIPs(pods, svc)
+expectedIPs, err := k.podIPs(pods, svc, readyNodes)
if err != nil {
return nil, err
}
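servicehelper here refers to the upstream k8s.io/cloud-provider/service/helpers package. With externalTrafficPolicy: Local, kube-proxy delivers external traffic only to endpoints on the node that received it, so advertising a node with no ready endpoint would blackhole connections; getStatus therefore trims the ingress list to nodes that appear in a ready, Pod-backed EndpointSlice entry. The helper check amounts to roughly the following (a sketch, not the actual implementation):

    package main

    import (
    	"fmt"

    	core "k8s.io/api/core/v1"
    )

    // requestsOnlyLocalTraffic approximates servicehelper.RequestsOnlyLocalTraffic:
    // the Local policy only applies to externally reachable Service types.
    func requestsOnlyLocalTraffic(svc *core.Service) bool {
    	if svc.Spec.Type != core.ServiceTypeLoadBalancer && svc.Spec.Type != core.ServiceTypeNodePort {
    		return false
    	}
    	return svc.Spec.ExternalTrafficPolicy == core.ServiceExternalTrafficPolicyTypeLocal
    }

    func main() {
    	svc := &core.Service{}
    	svc.Spec.Type = core.ServiceTypeLoadBalancer
    	svc.Spec.ExternalTrafficPolicy = core.ServiceExternalTrafficPolicyTypeLocal
    	fmt.Println(requestsOnlyLocalTraffic(svc)) // true
    }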
@@ -267,7 +308,7 @@ func (k *k3s) patchStatus(svc *core.Service, previousStatus, newStatus *core.Loa
// podIPs returns a list of IPs for Nodes hosting ServiceLB Pods.
// If at least one node has External IPs available, only external IPs are returned.
// If no nodes have External IPs set, the Internal IPs of all nodes running pods are returned.
-func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
+func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service, readyNodes map[string]bool) ([]string, error) {
// Go doesn't have sets so we stuff things into a map of bools and then get lists of keys
// to determine the unique set of IPs in use by pods.
extIPs := map[string]bool{}
@@ -280,6 +321,9 @@ func (k *k3s) podIPs(pods []*core.Pod, svc *core.Service) ([]string, error) {
if !Ready.IsTrue(pod) {
continue
}
+if readyNodes != nil && !readyNodes[pod.Spec.NodeName] {
+continue
+}

node, err := k.nodeCache.Get(pod.Spec.NodeName)
if apierrors.IsNotFound(err) {
@@ -405,17 +449,27 @@ func (k *k3s) deleteDaemonSet(ctx context.Context, svc *core.Service) error {
func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
name := generateName(svc)
oneInt := intstr.FromInt(1)

+localTraffic := servicehelper.RequestsOnlyLocalTraffic(svc)
sourceRanges, err := servicehelper.GetLoadBalancerSourceRanges(svc)
if err != nil {
return nil, err
}

+var sysctls []core.Sysctl
+for _, ipFamily := range svc.Spec.IPFamilies {
+switch ipFamily {
+case core.IPv4Protocol:
+sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
+case core.IPv6Protocol:
+sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
+}
+}

ds := &apps.DaemonSet{
ObjectMeta: meta.ObjectMeta{
Name: name,
Namespace: k.LBNamespace,
-Labels: map[string]string{
+Labels: labels.Set{
nodeSelectorLabel: "false",
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
@@ -427,13 +481,13 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
},
Spec: apps.DaemonSetSpec{
Selector: &meta.LabelSelector{
-MatchLabels: map[string]string{
+MatchLabels: labels.Set{
"app": name,
},
},
Template: core.PodTemplateSpec{
ObjectMeta: meta.ObjectMeta{
-Labels: map[string]string{
+Labels: labels.Set{
"app": name,
svcNameLabel: svc.Name,
svcNamespaceLabel: svc.Namespace,
@@ -442,6 +496,25 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
Spec: core.PodSpec{
ServiceAccountName: "svclb",
AutomountServiceAccountToken: utilpointer.Bool(false),
+SecurityContext: &core.PodSecurityContext{
+Sysctls: sysctls,
+},
+Tolerations: []core.Toleration{
+{
+Key: "node-role.kubernetes.io/master",
+Operator: "Exists",
+Effect: "NoSchedule",
+},
+{
+Key: "node-role.kubernetes.io/control-plane",
+Operator: "Exists",
+Effect: "NoSchedule",
+},
+{
+Key: "CriticalAddonsOnly",
+Operator: "Exists",
+},
+},
},
},
UpdateStrategy: apps.DaemonSetUpdateStrategy{
@@ -453,18 +526,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
},
}

-var sysctls []core.Sysctl
-for _, ipFamily := range svc.Spec.IPFamilies {
-switch ipFamily {
-case core.IPv4Protocol:
-sysctls = append(sysctls, core.Sysctl{Name: "net.ipv4.ip_forward", Value: "1"})
-case core.IPv6Protocol:
-sysctls = append(sysctls, core.Sysctl{Name: "net.ipv6.conf.all.forwarding", Value: "1"})
-}
-}
-
-ds.Spec.Template.Spec.SecurityContext = &core.PodSecurityContext{Sysctls: sysctls}

for _, port := range svc.Spec.Ports {
portName := fmt.Sprintf("lb-%s-%d", strings.ToLower(string(port.Protocol)), port.Port)
container := core.Container{
@@ -492,14 +553,6 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
Name: "DEST_PROTO",
Value: string(port.Protocol),
},
-{
-Name: "DEST_PORT",
-Value: strconv.Itoa(int(port.Port)),
-},
-{
-Name: "DEST_IPS",
-Value: strings.Join(svc.Spec.ClusterIPs, " "),
-},
},
SecurityContext: &core.SecurityContext{
Capabilities: &core.Capabilities{
@@ -510,31 +563,36 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
},
}

-ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
-}
-
-// Add toleration to noderole.kubernetes.io/master=*:NoSchedule
-masterToleration := core.Toleration{
-Key: "node-role.kubernetes.io/master",
-Operator: "Exists",
-Effect: "NoSchedule",
-}
-ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, masterToleration)
-
-// Add toleration to noderole.kubernetes.io/control-plane=*:NoSchedule
-controlPlaneToleration := core.Toleration{
-Key: "node-role.kubernetes.io/control-plane",
-Operator: "Exists",
-Effect: "NoSchedule",
-}
-ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, controlPlaneToleration)
+if localTraffic {
+container.Env = append(container.Env,
+core.EnvVar{
+Name: "DEST_PORT",
+Value: strconv.Itoa(int(port.NodePort)),
+},
+core.EnvVar{
+Name: "DEST_IPS",
+ValueFrom: &core.EnvVarSource{
+FieldRef: &core.ObjectFieldSelector{
+FieldPath: "status.hostIP",
+},
+},
+},
+)
+} else {
+container.Env = append(container.Env,
+core.EnvVar{
+Name: "DEST_PORT",
+Value: strconv.Itoa(int(port.Port)),
+},
+core.EnvVar{
+Name: "DEST_IPS",
+Value: strings.Join(svc.Spec.ClusterIPs, " "),
+},
+)
+}

-// Add toleration to CriticalAddonsOnly
-criticalAddonsOnlyToleration := core.Toleration{
-Key: "CriticalAddonsOnly",
-Operator: "Exists",
+ds.Spec.Template.Spec.Containers = append(ds.Spec.Template.Spec.Containers, container)
}
-ds.Spec.Template.Spec.Tolerations = append(ds.Spec.Template.Spec.Tolerations, criticalAddonsOnlyToleration)

// Add node selector only if label "svccontroller.k3s.cattle.io/enablelb" exists on the nodes
enableNodeSelector, err := k.nodeHasDaemonSetLabel()
@@ -551,6 +609,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
}
ds.Labels[nodeSelectorLabel] = "true"
}

return ds, nil
}
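The env branch above is the heart of the change: in the default Cluster mode the svclb pod forwards to the Service's ClusterIPs on the service port, while in Local mode it forwards to its own node (status.hostIP, resolved per pod via the downward API) on the NodePort, so the node's kube-proxy keeps the hop local and the client source IP survives. A small sketch of the resulting environment for both modes, with illustrative service values:

    package main

    import (
    	"fmt"
    	"strconv"

    	core "k8s.io/api/core/v1"
    )

    func main() {
    	// Illustrative values: Service port 80, NodePort 30080, ClusterIP 10.43.0.10.
    	port := core.ServicePort{Protocol: core.ProtocolTCP, Port: 80, NodePort: 30080}
    	clusterIPs := "10.43.0.10"

    	for _, local := range []bool{false, true} {
    		destPort, destIPs := strconv.Itoa(int(port.Port)), clusterIPs
    		if local {
    			// With ExternalTrafficPolicy=Local, each DaemonSet pod targets its
    			// own node's address on the NodePort instead of the ClusterIP.
    			destPort, destIPs = strconv.Itoa(int(port.NodePort)), "<status.hostIP>"
    		}
    		fmt.Printf("local=%v DEST_PROTO=%s DEST_PORT=%s DEST_IPS=%s\n", local, port.Protocol, destPort, destIPs)
    	}
    }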

Expand All @@ -563,7 +622,7 @@ func (k *k3s) updateDaemonSets() error {
return err
}

-nodeSelector := labels.SelectorFromSet(map[string]string{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
+nodeSelector := labels.SelectorFromSet(labels.Set{nodeSelectorLabel: fmt.Sprintf("%t", !enableNodeSelector)})
daemonsets, err := k.daemonsetCache.List(k.LBNamespace, nodeSelector)
if err != nil {
return err
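A side note on the recurring map[string]string to labels.Set swaps in this file: labels.Set is just a named map[string]string type from k8s.io/apimachinery, so the change is cosmetic, but it makes selector construction read directly. A quick demonstration (the label key is assumed to match k3s's nodeSelectorLabel constant):

    package main

    import (
    	"fmt"

    	"k8s.io/apimachinery/pkg/labels"
    )

    func main() {
    	sel := labels.SelectorFromSet(labels.Set{"svccontroller.k3s.cattle.io/nodeselector": "false"})
    	fmt.Println(sel.String())                                                                  // svccontroller.k3s.cattle.io/nodeselector=false
    	fmt.Println(sel.Matches(labels.Set{"svccontroller.k3s.cattle.io/nodeselector": "false"})) // true
    }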
6 changes: 3 additions & 3 deletions pkg/daemons/control/server.go
@@ -371,9 +371,9 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
func checkForCloudControllerPrivileges(ctx context.Context, runtime *config.ControlRuntime, timeout time.Duration) error {
return util.WaitForRBACReady(ctx, runtime.KubeConfigAdmin, timeout, authorizationv1.ResourceAttributes{
Namespace: metav1.NamespaceSystem,
-Verb: "*",
-Resource: "daemonsets",
-Group: "apps",
+Verb: "watch",
+Resource: "endpointslices",
+Group: "discovery.k8s.io",
}, version.Program+"-cloud-controller-manager")
}

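The startup probe now waits for the most recently granted permission (watch on endpointslices) instead of the older daemonsets rule, so the cloud controller will not start against a stale ClusterRole that predates this commit. A check like this typically boils down to a SubjectAccessReview; a sketch under assumed values (the kubeconfig path is a placeholder, and the username follows the version.Program+"-cloud-controller-manager" pattern above):

    package main

    import (
    	"context"
    	"fmt"

    	authorizationv1 "k8s.io/api/authorization/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    )

    func main() {
    	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/admin.kubeconfig")
    	if err != nil {
    		panic(err)
    	}
    	client := kubernetes.NewForConfigOrDie(config)

    	sar := &authorizationv1.SubjectAccessReview{
    		Spec: authorizationv1.SubjectAccessReviewSpec{
    			User: "k3s-cloud-controller-manager",
    			ResourceAttributes: &authorizationv1.ResourceAttributes{
    				Namespace: metav1.NamespaceSystem,
    				Verb:      "watch",
    				Resource:  "endpointslices",
    				Group:     "discovery.k8s.io",
    			},
    		},
    	}
    	resp, err := client.AuthorizationV1().SubjectAccessReviews().Create(context.TODO(), sar, metav1.CreateOptions{})
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("allowed:", resp.Status.Allowed)
    }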
