From 000c919071e824754e137cc56db23260abcbffb3 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Fri, 1 Sep 2023 15:11:04 +0200
Subject: [PATCH] nfd-updater: events: enable timer-only flow

The nfd-topology-updater has the state-directories notification
mechanism enabled by default. In theory we can have timer-based updates
only, but if the option to disable the state-directories event source
is given, the whole update mechanism is mistakenly disabled, including
the timer-based updates.

The two update mechanisms should be decoupled, so this PR changes the
code to make sure we can enable just the timer-based updates.

Signed-off-by: Francesco Romani
---
 .../kubeletnotifier/kubeletnotifier.go        | 27 ++++++-----
 .../nfd-topology-updater.go                   | 11 +++--
 test/e2e/topology_updater_test.go             | 40 +++++++++++++++++++
 test/e2e/utils/noderesourcetopology.go        |  2 +-
 4 files changed, 64 insertions(+), 16 deletions(-)

diff --git a/pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go b/pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go
index 818985e03b..6dc0845116 100644
--- a/pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go
+++ b/pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go
@@ -54,26 +54,35 @@ type Info struct {
 }
 
 func New(sleepInterval time.Duration, dest chan<- Info, kubeletStateDir string) (*Notifier, error) {
-	devicePluginsDir := path.Join(kubeletStateDir, devicePluginsDirName)
-	ch, err := createFSWatcherEvent([]string{kubeletStateDir, devicePluginsDir})
-	if err != nil {
-		return nil, err
-	}
-	return &Notifier{
+	notif := Notifier{
 		sleepInterval: sleepInterval,
 		dest:          dest,
-		fsEvent:       ch,
-	}, nil
+	}
+
+	if kubeletStateDir != "" {
+		devicePluginsDir := path.Join(kubeletStateDir, devicePluginsDirName)
+		ch, err := createFSWatcherEvent([]string{kubeletStateDir, devicePluginsDir})
+		if err != nil {
+			return nil, err
+		}
+		notif.fsEvent = ch
+	}
+
+	return &notif, nil
 }
 
 func (n *Notifier) Run() {
-	timeEvents := make(<-chan time.Time)
+	var timeEvents <-chan time.Time
+
 	if n.sleepInterval > 0 {
 		ticker := time.NewTicker(n.sleepInterval)
 		defer ticker.Stop()
 		timeEvents = ticker.C
 	}
 
+	// it's safe to keep the channels we don't need nil:
+	// https://dave.cheney.net/2014/03/19/channel-axioms
+	// "A receive from a nil channel blocks forever"
 	for {
 		select {
 		case <-timeEvents:
diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go
index 2d96d9f668..d3f053448c 100644
--- a/pkg/nfd-topology-updater/nfd-topology-updater.go
+++ b/pkg/nfd-topology-updater/nfd-topology-updater.go
@@ -85,13 +85,12 @@ type nfdTopologyUpdater struct {
 // NewTopologyUpdater creates a new NfdTopologyUpdater instance.
 func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
 	eventSource := make(chan kubeletnotifier.Info)
-	if args.KubeletStateDir != "" {
-		ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
-		if err != nil {
-			return nil, err
-		}
-		go ntf.Run()
+
+	ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
+	if err != nil {
+		return nil, err
 	}
+	go ntf.Run()
 
 	kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
 	if err != nil {
diff --git a/test/e2e/topology_updater_test.go b/test/e2e/topology_updater_test.go
index a85646bea1..07c017e792 100644
--- a/test/e2e/topology_updater_test.go
+++ b/test/e2e/topology_updater_test.go
@@ -31,6 +31,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -385,6 +386,45 @@ excludeList:
 			}, 1*time.Minute, 10*time.Second).Should(BeFalse())
 		})
 	})
+
+	When("kubelet state monitoring disabled", func() {
+		BeforeEach(func(ctx context.Context) {
+			cfg, err := testutils.GetConfig()
+			Expect(err).ToNot(HaveOccurred())
+
+			kcfg := cfg.GetKubeletConfig()
+			By(fmt.Sprintf("Using config (%#v)", kcfg))
+			// we need a predictable and "low enough" sleep interval to make sure we wait long enough, without wasting too much time waiting
+			podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-kubelet-state-dir=", "-sleep-interval=3s")}
+			topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
+		})
+
+		It("should still create or update CRs with periodic updates", func(ctx context.Context) {
+			// this is the simplest test. A more refined test would check updates. We do it like this to minimize flakes.
+			By("deleting existing CRs")
+
+			err := topologyClient.TopologyV1alpha2().NodeResourceTopologies().Delete(ctx, topologyUpdaterNode.Name, metav1.DeleteOptions{})
+			Expect(err).ToNot(HaveOccurred())
+
+			// the polling interval needs to be set explicitly, and be bigger than the sleep interval
+			By("checking the topology was recreated or updated")
+			Eventually(func() bool {
+				_, err = topologyClient.TopologyV1alpha2().NodeResourceTopologies().Get(ctx, topologyUpdaterNode.Name, metav1.GetOptions{})
+				if err != nil {
+					if apierrors.IsNotFound(err) {
+						framework.Logf("missing node topology resource for %q", topologyUpdaterNode.Name)
+						return false // intentionally retry
+					}
+					framework.Logf("failed to get the node topology resource: %v", err)
+					return false
+				}
+				return true
+			}).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(BeTrue())
+
+			framework.Logf("found NRT data for node %q!", topologyUpdaterNode.Name)
+		})
+	})
+
 	When("topology-updater configure to compute pod fingerprint", func() {
 		BeforeEach(func(ctx context.Context) {
 			cfg, err := testutils.GetConfig()
diff --git a/test/e2e/utils/noderesourcetopology.go b/test/e2e/utils/noderesourcetopology.go
index b6d7ebae03..b58d29289d 100644
--- a/test/e2e/utils/noderesourcetopology.go
+++ b/test/e2e/utils/noderesourcetopology.go
@@ -128,7 +128,7 @@ func GetNodeTopology(ctx context.Context, topologyClient *topologyclientset.Clie
 			return false
 		}
 		return true
-	}, time.Minute, 5*time.Second).Should(gomega.BeTrue())
+	}).WithPolling(5 * time.Second).WithTimeout(1 * time.Minute).Should(gomega.BeTrue())
 
 	return nodeTopology
 }
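The Run() change above relies on the Go property quoted in the new comment: a receive from a nil channel blocks forever, so a select branch that reads from a nil (disabled) event source never fires, while the ticker branch keeps delivering timer-based updates. Below is a minimal, self-contained sketch of that idiom, separate from the patch itself; the names (fsEvent) and the 500ms interval are illustrative only, not taken from the repository.

// sketch.go - standalone illustration of the nil-channel select idiom;
// not part of the patch above.
package main

import (
	"fmt"
	"time"
)

func main() {
	// fsEvent stays nil: this event source is effectively disabled.
	// A receive from a nil channel blocks forever, so the select below
	// never takes that branch.
	var fsEvent chan string

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for i := 0; i < 3; i++ {
		select {
		case <-ticker.C:
			fmt.Println("timer-based update") // fires every 500ms
		case e := <-fsEvent:
			fmt.Println("fs event:", e) // unreachable while fsEvent is nil
		}
	}
}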