Skip to content

Commit

Permalink
Use downward API to pass current spec.nodeName to pod
Browse files Browse the repository at this point in the history
The podInformerFactory uses filter key spec.nodeName to filter the pods
that it should monitor. Up until now, this filter was set to the value
of HOSTNAME. However, this is not reliable, as spec.nodeName can be
overridden in kubernetes with --hostname-override and thus HOSTNAME and
spec.nodeName do not necessarily always match. Instead, rely on a new
custom environment variable NODENAME which is populated by the downward
API.

Signed-off-by: Andreas Karis <[email protected]>
  • Loading branch information
andreaskaris committed Mar 16, 2023
1 parent 87e95f1 commit 888e459
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 25 deletions.
16 changes: 4 additions & 12 deletions cmd/controlloop/controlloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (

gocron "github.com/go-co-op/gocron"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
v1coreinformerfactory "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -132,15 +129,10 @@ func newPodController(stopChannel chan struct{}) (*controlloop.PodController, er
const noResyncPeriod = 0
ipPoolInformerFactory := wbinformers.NewSharedInformerFactory(wbClientSet, noResyncPeriod)
netAttachDefInformerFactory := nadinformers.NewSharedInformerFactory(nadK8sClientSet, noResyncPeriod)
podInformerFactory := v1coreinformerfactory.NewSharedInformerFactoryWithOptions(
k8sClientSet, noResyncPeriod, v1coreinformerfactory.WithTweakListOptions(
func(options *v1.ListOptions) {
const (
filterKey = "spec.nodeName"
hostnameEnvVariable = "HOSTNAME"
)
options.FieldSelector = fields.OneTermEqualSelector(filterKey, os.Getenv(hostnameEnvVariable)).String()
}))
podInformerFactory, err := controlloop.PodInformerFactory(k8sClientSet)
if err != nil {
return nil, err
}

controller := controlloop.NewPodController(
k8sClientSet,
Expand Down
10 changes: 10 additions & 0 deletions doc/crds/daemonset-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ rules:
verbs:
- list
- watch
- apiGroups: [""]
resources:
- nodes
verbs:
- get
- apiGroups: ["k8s.cni.cncf.io"]
resources:
- network-attachment-definitions
Expand Down Expand Up @@ -103,6 +108,11 @@ spec:
/ip-control-loop -log-level debug
image: ghcr.io/k8snetworkplumbingwg/whereabouts:latest-amd64
env:
- name: NODENAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
- name: WHEREABOUTS_NAMESPACE
valueFrom:
fieldRef:
Expand Down
9 changes: 6 additions & 3 deletions pkg/controlloop/dummy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package controlloop

import (
"context"
kubeClient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"
"net"

kubeClient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1coreinformerfactory "k8s.io/client-go/informers"
k8sclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -41,7 +41,10 @@ func newDummyPodController(
const noResyncPeriod = 0
netAttachDefInformerFactory := nadinformers.NewSharedInformerFactory(nadClient, noResyncPeriod)
wbInformerFactory := wbinformers.NewSharedInformerFactory(wbClient, noResyncPeriod)
podInformerFactory := v1coreinformerfactory.NewSharedInformerFactory(k8sClient, noResyncPeriod)
podInformerFactory, err := PodInformerFactory(k8sClient)
if err != nil {
return nil, err
}

podController := newPodController(
k8sClient,
Expand Down
13 changes: 12 additions & 1 deletion pkg/controlloop/entity_generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,24 @@ func dummyNonWhereaboutsIPAMNetSpec(networkName string) string {
}`, networkName)
}

func podSpec(name string, namespace string, networks ...string) *v1.Pod {
func nodeSpec(name string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
}

func podSpec(name string, namespace string, nodeName string, networks ...string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: podNetworkSelectionElements(networks...),
},
Spec: v1.PodSpec{
NodeName: nodeName,
},
}
}

Expand Down
28 changes: 27 additions & 1 deletion pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes"
"net"
"os"
"strconv"
"strings"
"time"

"k8s.io/client-go/kubernetes"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
v1coreinformerfactory "k8s.io/client-go/informers"
v1corelisters "k8s.io/client-go/listers/core/v1"
Expand All @@ -22,6 +25,7 @@ import (
nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
nadinformers "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions"
nadlister "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/listers/k8s.cni.cncf.io/v1"
"github.com/pkg/errors"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/allocate"
whereaboutsv1alpha1 "github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1"
Expand All @@ -47,6 +51,12 @@ const (
addressGarbageCollectionFailed = "IPAddressGarbageCollectionFailed"
)

const (
podControllerFilterKey = "spec.nodeName"
podControllerNodeNameEnvVariable = "NODENAME"
noResyncPeriod = 0
)

type garbageCollector func(ctx context.Context, mode int, ipamConf types.IPAMConfig, client *wbclient.KubernetesIPAM) ([]net.IPNet, error)

type PodController struct {
Expand All @@ -73,6 +83,22 @@ func NewPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.I
return newPodController(k8sCoreClient, wbClient, k8sCoreInformerFactory, wbSharedInformerFactory, netAttachDefInformerFactory, broadcaster, recorder, wbclient.IPManagement)
}

// PodInformerFactory is a wrapper around NewSharedInformerFactoryWithOptions. Before returning the informer, it will
// extract the node name from environment variable "NODENAME". It will then try to look up the node with the given name.
// On success, it will create an informer that filters all pods with spec.nodeName == <value of env NODENAME>.
func PodInformerFactory(k8sClientSet kubernetes.Interface) (v1coreinformerfactory.SharedInformerFactory, error) {
nodeName := os.Getenv(podControllerNodeNameEnvVariable)
logging.Debugf("Filtering pods with filter key '%s' and filter value '%s'", podControllerFilterKey, nodeName)
if _, err := k8sClientSet.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("Could not find node with node name '%s'.", nodeName))
}
return v1coreinformerfactory.NewSharedInformerFactoryWithOptions(
k8sClientSet, noResyncPeriod, v1coreinformerfactory.WithTweakListOptions(
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector(podControllerFilterKey, nodeName).String()
})), nil
}

func newPodController(k8sCoreClient kubernetes.Interface, wbClient wbclientset.Interface, k8sCoreInformerFactory v1coreinformerfactory.SharedInformerFactory, wbSharedInformerFactory wbinformers.SharedInformerFactory, netAttachDefInformerFactory nadinformers.SharedInformerFactory, broadcaster record.EventBroadcaster, recorder record.EventRecorder, cleanupFunc garbageCollector) *PodController {
k8sPodFilteredInformer := k8sCoreInformerFactory.Core().V1().Pods()
ipPoolInformer := wbSharedInformerFactory.Whereabouts().V1alpha1().IPPools()
Expand Down
72 changes: 64 additions & 8 deletions pkg/controlloop/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
. "github.com/onsi/gomega"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sclient "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -56,20 +57,61 @@ var _ = Describe("IPControlLoop", func() {
Expect(os.RemoveAll(cniConfigDir)).To(Succeed())
})

Context("a running pod", func() {
Context("a running pod on a node", func() {
const (
networkName = "meganet"
podName = "tiny-winy-pod"
nodeName = "hypernode"
)

var (
k8sClient k8sclient.Interface
pod *v1.Pod
k8sClient k8sclient.Interface
pod *v1.Pod
node *v1.Node
dummyPodController *dummyPodController
podControllerError error
)

BeforeEach(func() {
pod = podSpec(podName, namespace, networkName)
k8sClient = fakek8sclient.NewSimpleClientset(pod)
pod = podSpec(podName, namespace, nodeName, networkName)
node = nodeSpec(nodeName)
k8sClient = fakek8sclient.NewSimpleClientset(pod, node)
os.Setenv("NODENAME", nodeName)
})

When("NODENAME is set to an invalid value", func() {
var (
wbClient wbclient.Interface
eventRecorder *record.FakeRecorder
netAttachDefClient nadclient.Interface
stopChannel chan struct{}
)

BeforeEach(func() {
os.Setenv("NODENAME", "invalid-node-name")

stopChannel = make(chan struct{})

wbClient = fakewbclient.NewSimpleClientset()
var err error
netAttachDefClient, err = newFakeNetAttachDefClient(namespace, netAttachDef(networkName, namespace, dummyNonWhereaboutsIPAMNetSpec(networkName)))
Expect(err).NotTo(HaveOccurred())

const maxEvents = 1
eventRecorder = record.NewFakeRecorder(maxEvents)
})

It("should fail", func() {
_, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(errors.IsNotFound(podControllerError)).Should(BeTrue())
})

AfterEach(func() {
if podControllerError != nil {
return
}
stopChannel <- struct{}{}
})
})

Context("IPPool featuring an allocation for the pod", func() {
Expand Down Expand Up @@ -98,7 +140,10 @@ var _ = Describe("IPControlLoop", func() {
const maxEvents = 10
stopChannel = make(chan struct{})
eventRecorder = record.NewFakeRecorder(maxEvents)
Expect(newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)).NotTo(BeNil())

dummyPodController, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(podControllerError).NotTo(HaveOccurred())
Expect(dummyPodController).NotTo(BeNil())

// assure the pool features an allocated address
ipPool, err := wbClient.WhereaboutsV1alpha1().IPPools(dummyNetworkPool.GetNamespace()).Get(context.TODO(), dummyNetworkPool.GetName(), metav1.GetOptions{})
Expand All @@ -107,6 +152,9 @@ var _ = Describe("IPControlLoop", func() {
})

AfterEach(func() {
if podControllerError != nil {
return
}
stopChannel <- struct{}{}
})

Expand Down Expand Up @@ -145,7 +193,9 @@ var _ = Describe("IPControlLoop", func() {

const maxEvents = 10
eventRecorder = record.NewFakeRecorder(maxEvents)
Expect(newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)).NotTo(BeNil())
dummyPodController, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(podControllerError).NotTo(HaveOccurred())
Expect(dummyPodController).NotTo(BeNil())

// assure the pool features an allocated address
ipPool, err := wbClient.WhereaboutsV1alpha1().IPPools(dummyNetworkPool.GetNamespace()).Get(context.TODO(), dummyNetworkPool.GetName(), metav1.GetOptions{})
Expand Down Expand Up @@ -196,10 +246,16 @@ var _ = Describe("IPControlLoop", func() {

const maxEvents = 1
eventRecorder = record.NewFakeRecorder(maxEvents)
Expect(newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)).NotTo(BeNil())

dummyPodController, podControllerError = newDummyPodController(k8sClient, wbClient, netAttachDefClient, stopChannel, cniConfigDir, eventRecorder)
Expect(podControllerError).NotTo(HaveOccurred())
Expect(dummyPodController).NotTo(BeNil())
})

AfterEach(func() {
if podControllerError != nil {
return
}
stopChannel <- struct{}{}
})

Expand Down

0 comments on commit 888e459

Please sign in to comment.