Skip to content

Commit

Permalink
Move servicelb into cloudprovider
Browse files Browse the repository at this point in the history
Uses existing cloudprovider loadbalancer service hook instead of running our own bespoke controller

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Sep 27, 2022
1 parent 53c268d commit 3c8f3e0
Show file tree
Hide file tree
Showing 14 changed files with 401 additions and 338 deletions.
17 changes: 16 additions & 1 deletion manifests/ccm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ rules:
resources:
- nodes
verbs:
- '*'
- "*"
- apiGroups:
- ""
resources:
- nodes/status
- services/status
verbs:
- patch
- apiGroups:
- ""
resources:
- services
- pods
verbs:
- list
- patch
Expand Down Expand Up @@ -65,6 +67,19 @@ rules:
- list
- watch
- update
- apiGroups:
- ""
resources:
- namespaces
verbs:
- create
- get
- apiGroups:
- apps
resources:
- daemonsets
verbs:
- "*"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down
7 changes: 3 additions & 4 deletions pkg/cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.DataDir = cfg.DataDir
serverConfig.ControlConfig.KubeConfigOutput = cfg.KubeConfigOutput
serverConfig.ControlConfig.KubeConfigMode = cfg.KubeConfigMode
serverConfig.Rootless = cfg.Rootless
serverConfig.ServiceLBNamespace = cfg.ServiceLBNamespace
serverConfig.ControlConfig.Rootless = cfg.Rootless
serverConfig.ControlConfig.ServiceLBNamespace = cfg.ServiceLBNamespace
serverConfig.ControlConfig.SANs = cfg.TLSSan
serverConfig.ControlConfig.BindAddress = cfg.BindAddress
serverConfig.ControlConfig.SupervisorPort = cfg.SupervisorPort
Expand Down Expand Up @@ -359,7 +359,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
}
}
if serverConfig.ControlConfig.Skips["servicelb"] {
serverConfig.DisableServiceLB = true
serverConfig.ControlConfig.DisableServiceLB = true
}

if serverConfig.ControlConfig.DisableCCM {
Expand Down Expand Up @@ -479,7 +479,6 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
agentConfig.ServerURL = url
agentConfig.Token = token
agentConfig.DisableLoadBalancer = !serverConfig.ControlConfig.DisableAPIServer
agentConfig.DisableServiceLB = serverConfig.DisableServiceLB
agentConfig.ETCDAgent = serverConfig.ControlConfig.DisableAPIServer
agentConfig.ClusterReset = serverConfig.ControlConfig.ClusterReset
agentConfig.Rootless = cfg.Rootless
Expand Down
106 changes: 91 additions & 15 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,123 @@
package cloudprovider

import (
"encoding/json"
"io"
"io/ioutil"

"github.com/k3s-io/k3s/pkg/util"
"github.com/k3s-io/k3s/pkg/version"
"k8s.io/client-go/informers"
informercorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/generated/controllers/apps"
appsclient "github.com/rancher/wrangler/pkg/generated/controllers/apps/v1"
"github.com/rancher/wrangler/pkg/generated/controllers/core"
coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1"
"github.com/rancher/wrangler/pkg/generic"
"github.com/rancher/wrangler/pkg/start"
"github.com/sirupsen/logrus"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
cloudprovider "k8s.io/cloud-provider"
)

// Config describes externally-configurable cloud provider configuration.
// This is normally unmarshalled from a JSON config file.
type Config struct {
LBEnabled bool `json:"lbEnabled"`
LBImage string `json:"lbImage"`
LBNamespace string `json:"lbNamespace"`
Rootless bool `json:"rootless"`
}

type k3s struct {
nodeInformer informercorev1.NodeInformer
nodeInformerHasSynced cache.InformerSynced
Config

client kubernetes.Interface
recorder record.EventRecorder

processor apply.Apply
daemonsetCache appsclient.DaemonSetCache
nodeCache coreclient.NodeCache
podCache coreclient.PodCache
serviceCache coreclient.ServiceCache
}

var _ cloudprovider.Interface = &k3s{}
var _ cloudprovider.InformerUser = &k3s{}

func init() {
cloudprovider.RegisterCloudProvider(version.Program, func(config io.Reader) (cloudprovider.Interface, error) {
return &k3s{}, nil
var err error
k := k3s{
Config: Config{
LBEnabled: true,
LBImage: DefaultLBImage,
LBNamespace: DefaultLBNS,
},
}

if config != nil {
var bytes []byte
bytes, err = ioutil.ReadAll(config)
if err == nil {
err = json.Unmarshal(bytes, &k.Config)
}
}

return &k, err
})
}

func (k *k3s) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
}
ctx, _ := wait.ContextForChannel(stop)
config := clientBuilder.ConfigOrDie(controllerName)
k.client = kubernetes.NewForConfigOrDie(config)
k.recorder = util.BuildControllerEventRecorder(k.client, controllerName, meta.NamespaceAll)

if k.LBEnabled {
// Wrangler controller and caches are only needed if the load balancer controller is enabled.
coreFactory := core.NewFactoryFromConfigOrDie(config)
k.nodeCache = coreFactory.Core().V1().Node().Cache()

func (k *k3s) SetInformers(informerFactory informers.SharedInformerFactory) {
k.nodeInformer = informerFactory.Core().V1().Nodes()
k.nodeInformerHasSynced = k.nodeInformer.Informer().HasSynced
lbCoreFactory := core.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})
lbAppsFactory := apps.NewFactoryFromConfigWithOptionsOrDie(config, &generic.FactoryOptions{Namespace: k.LBNamespace})

processor, err := apply.NewForConfig(config)
if err != nil {
logrus.Fatalf("Failed to create apply processor for %s: %v", controllerName, err)
}
k.processor = processor.WithDynamicLookup().WithCacheTypes(lbAppsFactory.Apps().V1().DaemonSet())
k.daemonsetCache = lbAppsFactory.Apps().V1().DaemonSet().Cache()
k.podCache = lbCoreFactory.Core().V1().Pod().Cache()
k.serviceCache = coreFactory.Core().V1().Service().Cache()

if err := k.Register(ctx, coreFactory.Core().V1().Node(), lbCoreFactory.Core().V1().Pod()); err != nil {
logrus.Fatalf("Failed to register %s handlers: %v", controllerName, err)
}

if err := start.All(ctx, 1, coreFactory, lbCoreFactory, lbAppsFactory); err != nil {
logrus.Fatalf("Failed to start %s controllers: %v", controllerName, err)
}
} else {
// If load-balancer functionality has not been enabled, delete managed daemonsets.
// This uses the raw kubernetes client, as the controllers are not started when the load balancer controller is disabled.
if err := k.deleteAllDaemonsets(ctx); err != nil {
logrus.Fatalf("Failed to clean up %s daemonsets: %v", controllerName, err)
}
}
}

func (k *k3s) Instances() (cloudprovider.Instances, bool) {
return k, true
return nil, false
}

func (k *k3s) InstancesV2() (cloudprovider.InstancesV2, bool) {
return nil, false
return k, true
}

func (k *k3s) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return nil, false
return k, k.LBEnabled
}

func (k *k3s) Zones() (cloudprovider.Zones, bool) {
Expand All @@ -61,5 +137,5 @@ func (k *k3s) ProviderName() string {
}

func (k *k3s) HasClusterID() bool {
return true
return false
}
74 changes: 22 additions & 52 deletions pkg/cloudprovider/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package cloudprovider

import (
"context"
"fmt"
"strings"

"github.com/k3s-io/k3s/pkg/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
v1 "k8s.io/api/core/v1"
cloudprovider "k8s.io/cloud-provider"
)

Expand All @@ -19,59 +18,28 @@ var (
HostnameKey = version.Program + ".io/hostname"
)

func (k *k3s) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}

func (k *k3s) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
var _ cloudprovider.InstancesV2 = &k3s{}

func (k *k3s) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
// InstanceExists returns true if the instance for the given node exists according to the cloud provider.
// K3s nodes always exist.
func (k *k3s) InstanceExists(ctx context.Context, node *corev1.Node) (bool, error) {
return true, nil
}

func (k *k3s) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
if k.nodeInformerHasSynced == nil || !k.nodeInformerHasSynced() {
return "", errors.New("Node informer has not synced yet")
}

node, err := k.nodeInformer.Lister().Get(string(nodeName))
if err != nil {
return "", fmt.Errorf("failed to get node %s: %w", nodeName, err)
}
if (node.Annotations[InternalIPKey] == "") && (node.Labels[InternalIPKey] == "") {
return string(nodeName), errors.New("address annotations not yet set")
}
return string(nodeName), nil
}

func (k *k3s) InstanceShutdownByProviderID(ctx context.Context, providerID string) (bool, error) {
return true, cloudprovider.NotImplemented
// InstanceShutdown returns true if the instance is shutdown according to the cloud provider.
// K3s nodes are never shutdown.
func (k *k3s) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) {
return false, nil
}

func (k *k3s) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
_, err := k.InstanceID(ctx, name)
if err != nil {
return "", err
// InstanceMetadata returns the instance's metadata.
func (k *k3s) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) {
if (node.Annotations[InternalIPKey] == "") && (node.Labels[InternalIPKey] == "") {
return nil, errors.New("address annotations not yet set")
}
return version.Program, nil
}

func (k *k3s) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", cloudprovider.NotImplemented
}

func (k *k3s) NodeAddresses(ctx context.Context, name types.NodeName) ([]corev1.NodeAddress, error) {
addresses := []corev1.NodeAddress{}
if k.nodeInformerHasSynced == nil || !k.nodeInformerHasSynced() {
return nil, errors.New("Node informer has not synced yet")
}

node, err := k.nodeInformer.Lister().Get(string(name))
if err != nil {
return nil, fmt.Errorf("Failed to find node %s: %v", name, err)
}
// check internal address
if address := node.Annotations[InternalIPKey]; address != "" {
for _, v := range strings.Split(address, ",") {
Expand All @@ -80,7 +48,7 @@ func (k *k3s) NodeAddresses(ctx context.Context, name types.NodeName) ([]corev1.
} else if address = node.Labels[InternalIPKey]; address != "" {
addresses = append(addresses, corev1.NodeAddress{Type: corev1.NodeInternalIP, Address: address})
} else {
logrus.Infof("Couldn't find node internal ip annotation or label on node %s", name)
logrus.Infof("Couldn't find node internal ip annotation or label on node %s", node.Name)
}

// check external address
Expand All @@ -98,12 +66,14 @@ func (k *k3s) NodeAddresses(ctx context.Context, name types.NodeName) ([]corev1.
} else if address = node.Labels[HostnameKey]; address != "" {
addresses = append(addresses, corev1.NodeAddress{Type: corev1.NodeHostName, Address: address})
} else {
logrus.Infof("Couldn't find node hostname annotation or label on node %s", name)
logrus.Infof("Couldn't find node hostname annotation or label on node %s", node.Name)
}

return addresses, nil
}

func (k *k3s) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]corev1.NodeAddress, error) {
return nil, cloudprovider.NotImplemented
return &cloudprovider.InstanceMetadata{
ProviderID: version.Program,
InstanceType: version.Program,
NodeAddresses: addresses,
Zone: "",
Region: "",
}, nil
}
53 changes: 53 additions & 0 deletions pkg/cloudprovider/loadbalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package cloudprovider

import (
"context"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
cloudprovider "k8s.io/cloud-provider"
)

var _ cloudprovider.LoadBalancer = &k3s{}

// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
func (k *k3s) GetLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service) (*corev1.LoadBalancerStatus, bool, error) {
if _, err := k.getDaemonSet(service); err != nil {
if apierrors.IsNotFound(err) {
return nil, false, nil
}
return nil, false, err
}

status, err := k.getStatus(service)
return status, true, err
}

// GetLoadBalancerName returns the name of the load balancer.
func (k *k3s) GetLoadBalancerName(ctx context.Context, clusterName string, service *corev1.Service) string {
return generateName(service)
}

// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer.
// The node list is unused; see the comment on UpdateLoadBalancer for information on why.
// This is called when the Service is created or changes.
func (k *k3s) EnsureLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) (*corev1.LoadBalancerStatus, error) {
if err := k.deployDaemonSet(ctx, service); err != nil {
return nil, err
}
return nil, cloudprovider.ImplementedElsewhere
}

// UpdateLoadBalancer updates hosts under the specified load balancer.
// This is not used, as it filters node updates based on critera not compatible with how our DaemonSet selects
// nodes for inclusion. It also does not provide any opportunity to update the load balancer status.
// https://github.com/kubernetes/kubernetes/blob/v1.25.0/staging/src/k8s.io/cloud-provider/controllers/service/controller.go#L985-L993
func (k *k3s) UpdateLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) error {
return cloudprovider.ImplementedElsewhere
}

// EnsureLoadBalancerDeleted deletes the specified load balancer if it exists,
// returning nil if the load balancer specified either didn't exist or was successfully deleted.
func (k *k3s) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *corev1.Service) error {
return k.deleteDaemonSet(ctx, service)
}
Loading

0 comments on commit 3c8f3e0

Please sign in to comment.