Skip to content

Commit

Permalink
server: use a shared informer pod cache rather than direct apiserver …
Browse files Browse the repository at this point in the history
…access

When running in server mode we can use a shared informer to listen for
Pod events from the apiserver, and grab pod info from that cache rather
than doing direct apiserver requests each time.

This reduces apiserver load and retry latency, since multus can poll
the local cache more frequently than it should do direct apiserver
requests.

Signed-off-by: Dan Williams <[email protected]>
  • Loading branch information
dcbw committed Sep 8, 2023
1 parent f6776b5 commit b07f965
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 95 deletions.
4 changes: 2 additions & 2 deletions cmd/multus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {

skel.PluginMain(
func(args *skel.CmdArgs) error {
result, err := multus.CmdAdd(args, nil, nil)
result, err := multus.CmdAdd(args, nil, nil, nil)
if err != nil {
return err
}
Expand All @@ -54,6 +54,6 @@ func main() {
func(args *skel.CmdArgs) error {
return multus.CmdCheck(args, nil, nil)
},
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil) },
func(args *skel.CmdArgs) error { return multus.CmdDel(args, nil, nil, nil) },
cniversion.All, "meta-plugin that delegates to other CNI plugins")
}
62 changes: 50 additions & 12 deletions pkg/multus/multus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
k8snet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

k8s "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
Expand All @@ -46,8 +48,9 @@ import (
)

const (
shortPollDuration = 250 * time.Millisecond
shortPollTimeout = 2500 * time.Millisecond
shortPollDuration = 250 * time.Millisecond
informerPollDuration = 50 * time.Millisecond
shortPollTimeout = 2500 * time.Millisecond
)

var (
Expand Down Expand Up @@ -492,10 +495,13 @@ func cmdPluginErr(k8sArgs *types.K8sArgs, confName string, format string, args .
return logging.Errorf(msg+format, args...)
}

func isCriticalRequestRetriable(err error) bool {
func isCriticalRequestRetriable(err error, otherFn func(error) bool) bool {
logging.Debugf("isCriticalRequestRetriable: %v", err)
errorTypesAllowingRetry := []func(error) bool{
errors.IsServiceUnavailable, errors.IsInternalError, k8snet.IsConnectionReset, k8snet.IsConnectionRefused}
if otherFn != nil {
errorTypesAllowingRetry = append(errorTypesAllowingRetry, otherFn)
}
for _, f := range errorTypesAllowingRetry {
if f(err) {
return true
Expand All @@ -506,7 +512,7 @@ func isCriticalRequestRetriable(err error) bool {

// GetPod retrieves Kubernetes Pod object from given namespace/name in k8sArgs (i.e. cni args)
// GetPod also get pod UID, but it is not used to retrieve, but it is used for double check
func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) {
func GetPod(kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer, k8sArgs *types.K8sArgs, warnOnly bool) (*v1.Pod, error) {
if kubeClient == nil {
return nil, nil
}
Expand All @@ -515,12 +521,44 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (
podName := string(k8sArgs.K8S_POD_NAME)
podUID := string(k8sArgs.K8S_POD_UID)

pod, err := kubeClient.GetPod(podNamespace, podName)
// Keep track of how long getting the pod takes
logging.Debugf("GetPod for [%s/%s] starting", podNamespace, podName)
start := time.Now()
defer func() {
logging.Debugf("GetPod for [%s/%s] took %v", podNamespace, podName, time.Since(start))
}()

var pod *v1.Pod
var err error
var retryErrFunc func(error) bool

// Default to direct apiserver request
podGetter := func(ns, name string) (*v1.Pod, error) {
return kubeClient.GetPod(ns, name)
}
pollDuration := shortPollDuration

if podInformer != nil {
logging.Debugf("GetPod for [%s/%s] will use informer cache", podNamespace, podName)
// Use the shared informer cache to reduce apiserver load
podGetter = func(ns, name string) (*v1.Pod, error) {
return listers.NewPodLister(podInformer.GetIndexer()).Pods(ns).Get(name)
}
// NotFound is a retriable error since the cache may be a bit behind the apiserver
retryErrFunc = errors.IsNotFound
// We can poll the informer cache more frequently since it's local
pollDuration = informerPollDuration
}

pod, err = podGetter(podNamespace, podName)
if err != nil {
// in case of a retriable error, retry 10 times with 0.25 sec interval
if isCriticalRequestRetriable(err) {
waitErr := wait.PollImmediate(shortPollDuration, shortPollTimeout, func() (bool, error) {
pod, err = kubeClient.GetPod(podNamespace, podName)
if isCriticalRequestRetriable(err, retryErrFunc) {
waitErr := wait.PollImmediate(pollDuration, shortPollTimeout, func() (bool, error) {
pod, err = podGetter(podNamespace, podName)
if retryErrFunc != nil && retryErrFunc(err) {
return false, nil
}
return pod != nil, err
})
// retry failed, then return error with retry out
Expand Down Expand Up @@ -552,7 +590,7 @@ func GetPod(kubeClient *k8s.ClientInfo, k8sArgs *types.K8sArgs, warnOnly bool) (
}

// CmdAdd ...
func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (cnitypes.Result, error) {
func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) (cnitypes.Result, error) {
n, err := types.LoadNetConf(args.StdinData)
logging.Debugf("CmdAdd: %v, %v, %v", args, exec, kubeClient)
if err != nil {
Expand All @@ -575,7 +613,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
}
}

pod, err := GetPod(kubeClient, k8sArgs, false)
pod, err := GetPod(kubeClient, podInformer, k8sArgs, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -772,7 +810,7 @@ func CmdCheck(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo)
}

// CmdDel ...
func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) error {
func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo, podInformer cache.SharedIndexInformer) error {
in, err := types.LoadNetConf(args.StdinData)
logging.Debugf("CmdDel: %v, %v, %v", args, exec, kubeClient)
if err != nil {
Expand Down Expand Up @@ -814,7 +852,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er
return cmdErr(nil, "error getting k8s client: %v", err)
}

pod, err := GetPod(kubeClient, k8sArgs, true)
pod, err := GetPod(kubeClient, podInformer, k8sArgs, true)
if err != nil {
// GetPod may be failed but just do print error in its log and continue to delete
logging.Errorf("Multus: GetPod failed: %v, but continue to delete", err)
Expand Down
36 changes: 18 additions & 18 deletions pkg/multus/multus_cni020_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

result, err := CmdAdd(args, fExec, nil)
result, err := CmdAdd(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())

err = CmdDel(args, fExec, nil)
err = CmdDel(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -203,14 +203,14 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

result, err := CmdAdd(args, fExec, nil)
result, err := CmdAdd(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())

err = CmdDel(args, fExec, nil)
err = CmdDel(args, fExec, nil, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -265,7 +265,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

_, err := CmdAdd(args, fExec, nil)
_, err := CmdAdd(args, fExec, nil, nil)
Expect(err).To(MatchError("[//:weave1]: error adding container to network \"weave1\": DelegateAdd: cannot set \"weave-net\" interface name to \"eth0\": validateIfName: no net namespace fsdadfad found: failed to Statfs \"fsdadfad\": no such file or directory"))
})

Expand Down Expand Up @@ -319,10 +319,10 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
}`
fExec.addPlugin020(nil, "net1", expectedConf2, expectedResult2, nil)

_, err := CmdAdd(args, fExec, nil)
_, err := CmdAdd(args, fExec, nil, nil)
Expect(err).To(HaveOccurred())

err = CmdDel(args, fExec, nil)
err = CmdDel(args, fExec, nil, nil)
Expect(err).To(HaveOccurred())
})

Expand Down Expand Up @@ -363,7 +363,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
err := fmt.Errorf("expected plugin failure")
fExec.addPlugin020(nil, "net1", expectedConf2, nil, err)

_, err = CmdAdd(args, fExec, nil)
_, err = CmdAdd(args, fExec, nil, nil)
Expect(fExec.addIndex).To(Equal(2))
Expect(fExec.delIndex).To(Equal(2))
Expect(err).To(MatchError("[//:other1]: error adding container to network \"other1\": expected plugin failure"))
Expand Down Expand Up @@ -409,7 +409,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
err := fmt.Errorf("expected plugin failure")
fExec.addPlugin020(nil, "net1", expectedConf2, nil, err)

_, err = CmdAdd(args, fExec, nil)
_, err = CmdAdd(args, fExec, nil, nil)
Expect(fExec.addIndex).To(Equal(1))
Expect(fExec.delIndex).To(Equal(2))
Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -491,7 +491,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net3", net3))
Expect(err).NotTo(HaveOccurred())

result, err := CmdAdd(args, fExec, clientInfo)
result, err := CmdAdd(args, fExec, clientInfo, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand Down Expand Up @@ -557,7 +557,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1))
Expect(err).NotTo(HaveOccurred())

result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand All @@ -568,7 +568,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
err = fKubeClient.DeletePod(fakePod.ObjectMeta.Namespace, fakePod.ObjectMeta.Name)
Expect(err).NotTo(HaveOccurred())

err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -609,13 +609,13 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1))
Expect(err).NotTo(HaveOccurred())

result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())

err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -674,7 +674,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err = fKubeClient.AddNetAttachDef(
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1))
Expect(err).NotTo(HaveOccurred())
result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand All @@ -687,7 +687,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
Expect(err).NotTo(HaveOccurred())

By("Delete and check net count is not incremented")
err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down Expand Up @@ -746,7 +746,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
_, err = fKubeClient.AddNetAttachDef(
testhelpers.NewFakeNetAttachDef(fakePod.ObjectMeta.Namespace, "net1", net1))
Expect(err).NotTo(HaveOccurred())
result, err := CmdAdd(args, fExec, fKubeClient)
result, err := CmdAdd(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
r := result.(*types020.Result)
Expand All @@ -762,7 +762,7 @@ var _ = Describe("multus operations cniVersion 0.2.0 config", func() {
Expect(err).NotTo(HaveOccurred())

By("Delete and check pod/net count is incremented")
err = CmdDel(args, fExec, fKubeClient)
err = CmdDel(args, fExec, fKubeClient, nil)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
Expand Down
Loading

0 comments on commit b07f965

Please sign in to comment.