From 6d98b6150bb72698dca460bad5e00b8bfbf817a8 Mon Sep 17 00:00:00 2001 From: pprokop Date: Tue, 4 Jul 2023 10:46:52 +0200 Subject: [PATCH] Fix Topology Manager policy and scope not being updated properly NFD only detects the policy and scope of Topology Manager when the NRT object doesn't exist. This means that the topologyManagerScope and topologyManagerPolicy attributes won't be updated even if the kubelet config is changed to use another TopologyManager policy and scope. Signed-off-by: pprokop --- cmd/nfd-topology-updater/main.go | 42 +----- .../nfd-topology-updater.go | 133 ++++++++++++++---- 2 files changed, 103 insertions(+), 72 deletions(-) diff --git a/cmd/nfd-topology-updater/main.go b/cmd/nfd-topology-updater/main.go index b857325207..f187f4df47 100644 --- a/cmd/nfd-topology-updater/main.go +++ b/cmd/nfd-topology-updater/main.go @@ -19,19 +19,16 @@ package main import ( "flag" "fmt" - "net/url" "os" "path" "time" "k8s.io/klog/v2" - kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater" "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath" - "sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf" "sigs.k8s.io/node-feature-discovery/pkg/version" ) @@ -63,14 +60,8 @@ func main() { // Plug klog into grpc logging infrastructure utils.ConfigureGrpcKlog() - klConfig, err := getKubeletConfig(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile) - if err != nil { - klog.ErrorS(err, "failed to get kubelet configuration") - os.Exit(1) - } - // Get new TopologyUpdater instance - instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope) + instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs) if err != nil { klog.ErrorS(err, "failed to initialize topology updater instance") os.Exit(1) @@ 
-134,34 +125,3 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) { return args, resourcemonitorArgs } - -func getKubeletConfig(uri, apiAuthTokenFile string) (*kubeletconfigv1beta1.KubeletConfiguration, error) { - u, err := url.ParseRequestURI(uri) - if err != nil { - return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err) - } - - // init kubelet API client - var klConfig *kubeletconfigv1beta1.KubeletConfiguration - switch u.Scheme { - case "file": - klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path) - if err != nil { - return nil, fmt.Errorf("failed to read kubelet config: %w", err) - } - return klConfig, err - case "https": - restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile) - if err != nil { - return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err) - } - - klConfig, err = kubeconf.GetKubeletConfiguration(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err) - } - return klConfig, nil - } - - return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme) -} diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go index b810f7d4e5..0c620355d7 100644 --- a/pkg/nfd-topology-updater/nfd-topology-updater.go +++ b/pkg/nfd-topology-updater/nfd-topology-updater.go @@ -18,6 +18,7 @@ package nfdtopologyupdater import ( "fmt" + "net/url" "os" "path/filepath" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "sigs.k8s.io/node-feature-discovery/pkg/apihelper" @@ -34,6 +36,7 @@ import ( "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor" "sigs.k8s.io/node-feature-discovery/pkg/topologypolicy" 
"sigs.k8s.io/node-feature-discovery/pkg/utils" + "sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf" "sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/yaml" ) @@ -66,24 +69,8 @@ type NfdTopologyUpdater interface { Stop() } -type staticNodeInfo struct { - nodeName string - tmPolicy string - tmScope string -} - -func newStaticNodeInfo(policy, scope string) staticNodeInfo { - nodeName := utils.NodeName() - klog.InfoS("detected kubelet Topology Manager configuration", "policy", policy, "scope", scope, "nodeName", nodeName) - return staticNodeInfo{ - nodeName: nodeName, - tmPolicy: policy, - tmScope: scope, - } -} - type nfdTopologyUpdater struct { - nodeInfo staticNodeInfo + nodeName string args Args apihelper apihelper.APIHelpers resourcemonitorArgs resourcemonitor.Args @@ -91,10 +78,11 @@ type nfdTopologyUpdater struct { eventSource <-chan kubeletnotifier.Info configFilePath string config *NFDConfig + kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error) } // NewTopologyUpdater creates a new NfdTopologyUpdater instance. 
-func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) { +func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) { eventSource := make(chan kubeletnotifier.Info) if args.KubeletStateDir != "" { ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir) @@ -103,13 +91,20 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol } go ntf.Run() } + + kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile) + if err != nil { + return nil, err + } + nfd := &nfdTopologyUpdater{ args: args, resourcemonitorArgs: resourcemonitorArgs, - nodeInfo: newStaticNodeInfo(policy, scope), stop: make(chan struct{}, 1), + nodeName: utils.NodeName(), eventSource: eventSource, config: &NFDConfig{}, + kubeletConfigFunc: kubeletConfigFunc, } if args.ConfigFile != "" { nfd.configFilePath = filepath.Clean(args.ConfigFile) @@ -117,10 +112,19 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol return nfd, nil } +func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, error) { + klConfig, err := w.kubeletConfigFunc() + if err != nil { + return "", "", err + } + + return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil +} + // Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after // one request if OneShot is set to 'true' in the updater args. 
func (w *nfdTopologyUpdater) Run() error { - klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeInfo.nodeName) + klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeName) podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath) if err != nil { @@ -151,7 +155,7 @@ func (w *nfdTopologyUpdater) Run() error { // zonesChannel := make(chan v1alpha1.ZoneList) var zones v1alpha2.ZoneList - excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeInfo.nodeName) + excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName) resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList) if err != nil { return fmt.Errorf("failed to obtain node resource information: %w", err) @@ -169,8 +173,13 @@ func (w *nfdTopologyUpdater) Run() error { } zones = resAggr.Aggregate(scanResponse.PodResources) klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones)) + readKubeletConfig := false + if info.Event == kubeletnotifier.IntervalBased { + readKubeletConfig = true + } + if !w.args.NoPublish { - if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil { + if err = w.updateNodeResourceTopology(zones, scanResponse, readKubeletConfig); err != nil { return err } } @@ -195,27 +204,29 @@ func (w *nfdTopologyUpdater) Stop() { } } -func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error { +func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse, readKubeletConfig bool) error { cli, err := w.apihelper.GetTopologyClient() if err != nil { return err } - nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeInfo.nodeName, metav1.GetOptions{}) + nrt, err := 
cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeName, metav1.GetOptions{}) if errors.IsNotFound(err) { nrtNew := v1alpha2.NodeResourceTopology{ ObjectMeta: metav1.ObjectMeta{ - Name: w.nodeInfo.nodeName, + Name: w.nodeName, }, - Zones: zoneInfo, - TopologyPolicies: []string{string(topologypolicy.DetectTopologyPolicy(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope))}, - Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope), + Zones: zoneInfo, + Attributes: v1alpha2.AttributeList{}, + } + + if err := w.updateNRTTopologyManagerInfo(&nrtNew); err != nil { + return err } updateAttributes(&nrtNew.Attributes, scanResponse.Attributes) - _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}) - if err != nil { + if _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create NodeResourceTopology: %w", err) } return nil @@ -225,16 +236,41 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi nrtMutated := nrt.DeepCopy() nrtMutated.Zones = zoneInfo - updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes) + + attributes := scanResponse.Attributes + + if readKubeletConfig { + if err := w.updateNRTTopologyManagerInfo(nrtMutated); err != nil { + return err + } + } + + updateAttributes(&nrtMutated.Attributes, attributes) nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("failed to update NodeResourceTopology: %w", err) } + klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated)) return nil } +func (w *nfdTopologyUpdater) updateNRTTopologyManagerInfo(nrt *v1alpha2.NodeResourceTopology) error { + policy, scope, err := w.detectTopologyPolicyAndScope() + if err != nil { + return 
fmt.Errorf("failed to detect TopologyManager's policy and scope: %w", err) + } + + tmAttributes := createTopologyAttributes(policy, scope) + deprecatedTopologyPolicies := []string{string(topologypolicy.DetectTopologyPolicy(policy, scope))} + + updateAttributes(&nrt.Attributes, tmAttributes) + nrt.TopologyPolicies = deprecatedTopologyPolicies + + return nil +} + func (w *nfdTopologyUpdater) configure() error { if w.configFilePath == "" { klog.InfoS("no configuration file specified") @@ -290,3 +326,38 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) { updateAttribute(lhs, attr) } } + +func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) { + u, err := url.ParseRequestURI(uri) + if err != nil { + return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err) + } + + // init kubelet API client + var klConfig *kubeletconfigv1beta1.KubeletConfiguration + switch u.Scheme { + case "file": + return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { + klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path) + if err != nil { + return nil, fmt.Errorf("failed to read kubelet config: %w", err) + } + return klConfig, err + }, nil + case "https": + restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile) + if err != nil { + return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err) + } + + return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { + klConfig, err = kubeconf.GetKubeletConfiguration(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err) + } + return klConfig, nil + }, nil + } + + return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme) +}