
Commit 19520c0

Merge pull request #1325 from ffromani/nfd-updater-fix-events

nfd-updater: events: enable timer-only flow

k8s-ci-robot authored Sep 4, 2023
2 parents 9848ef9 + 000c919
Showing 4 changed files with 64 additions and 16 deletions.
27 changes: 18 additions & 9 deletions pkg/nfd-topology-updater/kubeletnotifier/kubeletnotifier.go
@@ -54,26 +54,35 @@ type Info struct {
 }
 
 func New(sleepInterval time.Duration, dest chan<- Info, kubeletStateDir string) (*Notifier, error) {
-    devicePluginsDir := path.Join(kubeletStateDir, devicePluginsDirName)
-    ch, err := createFSWatcherEvent([]string{kubeletStateDir, devicePluginsDir})
-    if err != nil {
-        return nil, err
-    }
-    return &Notifier{
+    notif := Notifier{
         sleepInterval: sleepInterval,
         dest:          dest,
-        fsEvent:       ch,
-    }, nil
+    }
+
+    if kubeletStateDir != "" {
+        devicePluginsDir := path.Join(kubeletStateDir, devicePluginsDirName)
+        ch, err := createFSWatcherEvent([]string{kubeletStateDir, devicePluginsDir})
+        if err != nil {
+            return nil, err
+        }
+        notif.fsEvent = ch
+    }
+
+    return &notif, nil
 }
 
 func (n *Notifier) Run() {
-    timeEvents := make(<-chan time.Time)
+    var timeEvents <-chan time.Time
 
     if n.sleepInterval > 0 {
         ticker := time.NewTicker(n.sleepInterval)
         defer ticker.Stop()
         timeEvents = ticker.C
     }
+
+    // it's safe to keep the channels we don't need nil:
+    // https://dave.cheney.net/2014/03/19/channel-axioms
+    // "A receive from a nil channel blocks forever"
     for {
         select {
         case <-timeEvents:
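The rewritten Run() relies on the channel axiom quoted in the added comments: a receive from a nil channel blocks forever, so a select case whose channel was never initialized simply never fires. Below is a minimal standalone sketch (not part of the commit) demonstrating that behavior; the names timeEvents and fsEvents mirror the notifier, but the program is otherwise illustrative.

package main

import (
    "fmt"
    "time"
)

func main() {
    // nil stands in for a disabled ticker (sleepInterval <= 0): the channel
    // is never initialized, so its select case can never be chosen.
    var timeEvents <-chan time.Time

    fsEvents := make(chan string, 1)
    fsEvents <- "fsnotify event"

    for i := 0; i < 2; i++ {
        select {
        case <-timeEvents:
            fmt.Println("tick") // unreachable: receiving from nil blocks forever
        case ev := <-fsEvents:
            fmt.Println("got:", ev)
        case <-time.After(100 * time.Millisecond):
            fmt.Println("no event; the nil-channel case never fired")
        }
    }
}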
11 changes: 5 additions & 6 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
@@ -85,13 +85,12 @@ type nfdTopologyUpdater struct {
 // NewTopologyUpdater creates a new NfdTopologyUpdater instance.
 func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
     eventSource := make(chan kubeletnotifier.Info)
-    if args.KubeletStateDir != "" {
-        ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
-        if err != nil {
-            return nil, err
-        }
-        go ntf.Run()
+
+    ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
+    if err != nil {
+        return nil, err
     }
+    go ntf.Run()
 
     kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
     if err != nil {
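With the empty-string check moved into the notifier, NewTopologyUpdater now constructs and starts it unconditionally; passing an empty kubelet-state-dir yields the timer-only flow named in the commit message. A hedged usage sketch follows (not from the commit): the import path is an assumption derived from the file path in this diff, and Info is treated as opaque because its fields are not shown here.

package main

import (
    "fmt"
    "time"

    // assumed import path, derived from pkg/nfd-topology-updater/kubeletnotifier
    "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater/kubeletnotifier"
)

func main() {
    dest := make(chan kubeletnotifier.Info)

    // Empty kubeletStateDir: no fsnotify watcher is created, fsEvent stays
    // nil, and Run() degrades to the timer-only event loop.
    ntf, err := kubeletnotifier.New(3*time.Second, dest, "")
    if err != nil {
        panic(err)
    }
    go ntf.Run()

    for ev := range dest {
        fmt.Printf("periodic event: %+v\n", ev) // driven by the ticker alone
    }
}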
40 changes: 40 additions & 0 deletions test/e2e/topology_updater_test.go
@@ -31,6 +31,7 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
@@ -385,6 +386,45 @@ excludeList:
             }, 1*time.Minute, 10*time.Second).Should(BeFalse())
         })
     })
+
+    When("kubelet state monitoring disabled", func() {
+        BeforeEach(func(ctx context.Context) {
+            cfg, err := testutils.GetConfig()
+            Expect(err).ToNot(HaveOccurred())
+
+            kcfg := cfg.GetKubeletConfig()
+            By(fmt.Sprintf("Using config (%#v)", kcfg))
+            // we need a predictable, "low enough" sleep interval: long enough to be sure updates land, short enough not to waste test time waiting
+            podSpecOpts := []testpod.SpecOption{testpod.SpecWithContainerImage(dockerImage()), testpod.SpecWithContainerExtraArgs("-kubelet-state-dir=", "-sleep-interval=3s")}
+            topologyUpdaterDaemonSet = testds.NFDTopologyUpdater(kcfg, podSpecOpts...)
+        })
+
+        It("should still create or update CRs with periodic updates", func(ctx context.Context) {
+            // this is the simplest test; a more refined test would check for updates. We do it like this to minimize flakes.
+            By("deleting existing CRs")
+
+            err := topologyClient.TopologyV1alpha2().NodeResourceTopologies().Delete(ctx, topologyUpdaterNode.Name, metav1.DeleteOptions{})
+            Expect(err).ToNot(HaveOccurred())
+
+            // the polling interval must be set explicitly, and must be longer than the sleep interval
+            By("checking the topology was recreated or updated")
+            Eventually(func() bool {
+                _, err = topologyClient.TopologyV1alpha2().NodeResourceTopologies().Get(ctx, topologyUpdaterNode.Name, metav1.GetOptions{})
+                if err != nil {
+                    if apierrors.IsNotFound(err) {
+                        framework.Logf("missing node topology resource for %q", topologyUpdaterNode.Name)
+                        return false // not recreated yet; intentionally retry
+                    }
+                    framework.Logf("failed to get the node topology resource: %v", err)
+                    return false
+                }
+                return true
+            }).WithPolling(5 * time.Second).WithTimeout(30 * time.Second).Should(BeTrue())
+
+            framework.Logf("found NRT data for node %q!", topologyUpdaterNode.Name)
+        })
+    })
+
     When("topology-updater configure to compute pod fingerprint", func() {
         BeforeEach(func(ctx context.Context) {
             cfg, err := testutils.GetConfig()
2 changes: 1 addition & 1 deletion test/e2e/utils/noderesourcetopology.go
@@ -128,7 +128,7 @@ func GetNodeTopology(ctx context.Context, topologyClient *topologyclientset.Clie
             return false
         }
         return true
-    }, time.Minute, 5*time.Second).Should(gomega.BeTrue())
+    }).WithPolling(5 * time.Second).WithTimeout(1 * time.Minute).Should(gomega.BeTrue())
     return nodeTopology
 }
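The one-line change above trades Gomega's positional timeout/polling arguments for the builder-style calls, where each knob is named at the call site. A small sketch of the equivalence (not from the commit; it uses Gomega's standalone entry point, available in the versions that provide WithPolling/WithTimeout):

package main

import (
    "time"

    "github.com/onsi/gomega"
)

func main() {
    g := gomega.NewGomega(func(message string, _ ...int) { panic(message) })

    start := time.Now()
    condition := func() bool { return time.Since(start) > 2*time.Second }

    // Positional form: timeout first, then polling interval; easy to get
    // backwards when skimming.
    g.Eventually(condition, time.Minute, 5*time.Second).Should(gomega.BeTrue())

    // Builder form used by this commit.
    g.Eventually(condition).WithPolling(5 * time.Second).WithTimeout(1 * time.Minute).Should(gomega.BeTrue())
}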
