diff --git a/cmd/nfd-topology-updater/main.go b/cmd/nfd-topology-updater/main.go index b857325207..f187f4df47 100644 --- a/cmd/nfd-topology-updater/main.go +++ b/cmd/nfd-topology-updater/main.go @@ -19,19 +19,16 @@ package main import ( "flag" "fmt" - "net/url" "os" "path" "time" "k8s.io/klog/v2" - kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater" "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor" "sigs.k8s.io/node-feature-discovery/pkg/utils" "sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath" - "sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf" "sigs.k8s.io/node-feature-discovery/pkg/version" ) @@ -63,14 +60,8 @@ func main() { // Plug klog into grpc logging infrastructure utils.ConfigureGrpcKlog() - klConfig, err := getKubeletConfig(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile) - if err != nil { - klog.ErrorS(err, "failed to get kubelet configuration") - os.Exit(1) - } - // Get new TopologyUpdater instance - instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope) + instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs) if err != nil { klog.ErrorS(err, "failed to initialize topology updater instance") os.Exit(1) @@ -134,34 +125,3 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) { return args, resourcemonitorArgs } - -func getKubeletConfig(uri, apiAuthTokenFile string) (*kubeletconfigv1beta1.KubeletConfiguration, error) { - u, err := url.ParseRequestURI(uri) - if err != nil { - return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err) - } - - // init kubelet API client - var klConfig *kubeletconfigv1beta1.KubeletConfiguration - switch u.Scheme { - case "file": - klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path) - if err != nil { - return nil, fmt.Errorf("failed to read kubelet config: %w", err) - } - return klConfig, err - case "https": - restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile) - if err != nil { - return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err) - } - - klConfig, err = kubeconf.GetKubeletConfiguration(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err) - } - return klConfig, nil - } - - return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme) -} diff --git a/pkg/nfd-topology-updater/nfd-topology-updater.go b/pkg/nfd-topology-updater/nfd-topology-updater.go index b810f7d4e5..0c620355d7 100644 --- a/pkg/nfd-topology-updater/nfd-topology-updater.go +++ b/pkg/nfd-topology-updater/nfd-topology-updater.go @@ -18,6 +18,7 @@ package nfdtopologyupdater import ( "fmt" + "net/url" "os" "path/filepath" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1" "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "sigs.k8s.io/node-feature-discovery/pkg/apihelper" @@ -34,6 +36,7 @@ import ( "sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor" "sigs.k8s.io/node-feature-discovery/pkg/topologypolicy" "sigs.k8s.io/node-feature-discovery/pkg/utils" + "sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf" "sigs.k8s.io/node-feature-discovery/pkg/version" "sigs.k8s.io/yaml" ) @@ -66,24 +69,8 @@ type NfdTopologyUpdater interface { Stop() } -type staticNodeInfo struct { - nodeName string - tmPolicy string - tmScope string -} - -func newStaticNodeInfo(policy, scope string) staticNodeInfo { - nodeName := utils.NodeName() - klog.InfoS("detected kubelet Topology Manager configuration", "policy", policy, "scope", scope, "nodeName", nodeName) - return staticNodeInfo{ - nodeName: nodeName, - tmPolicy: policy, - tmScope: scope, - } -} - type nfdTopologyUpdater struct { - nodeInfo staticNodeInfo + nodeName string args Args apihelper apihelper.APIHelpers resourcemonitorArgs resourcemonitor.Args @@ -91,10 +78,11 @@ type nfdTopologyUpdater struct { eventSource <-chan kubeletnotifier.Info configFilePath string config *NFDConfig + kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error) } // NewTopologyUpdater creates a new NfdTopologyUpdater instance. -func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) { +func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) { eventSource := make(chan kubeletnotifier.Info) if args.KubeletStateDir != "" { ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir) @@ -103,13 +91,20 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol } go ntf.Run() } + + kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile) + if err != nil { + return nil, err + } + nfd := &nfdTopologyUpdater{ args: args, resourcemonitorArgs: resourcemonitorArgs, - nodeInfo: newStaticNodeInfo(policy, scope), stop: make(chan struct{}, 1), + nodeName: utils.NodeName(), eventSource: eventSource, config: &NFDConfig{}, + kubeletConfigFunc: kubeletConfigFunc, } if args.ConfigFile != "" { nfd.configFilePath = filepath.Clean(args.ConfigFile) @@ -117,10 +112,19 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol return nfd, nil } +func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, error) { + klConfig, err := w.kubeletConfigFunc() + if err != nil { + return "", "", err + } + + return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil +} + // Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after // one request if OneShot is set to 'true' in the updater args. func (w *nfdTopologyUpdater) Run() error { - klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeInfo.nodeName) + klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeName) podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath) if err != nil { @@ -151,7 +155,7 @@ func (w *nfdTopologyUpdater) Run() error { // zonesChannel := make(chan v1alpha1.ZoneList) var zones v1alpha2.ZoneList - excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeInfo.nodeName) + excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName) resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList) if err != nil { return fmt.Errorf("failed to obtain node resource information: %w", err) @@ -169,8 +173,13 @@ func (w *nfdTopologyUpdater) Run() error { } zones = resAggr.Aggregate(scanResponse.PodResources) klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones)) + readKubeletConfig := false + if info.Event == kubeletnotifier.IntervalBased { + readKubeletConfig = true + } + if !w.args.NoPublish { - if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil { + if err = w.updateNodeResourceTopology(zones, scanResponse, readKubeletConfig); err != nil { return err } } @@ -195,27 +204,29 @@ func (w *nfdTopologyUpdater) Stop() { } } -func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error { +func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse, readKubeletConfig bool) error { cli, err := w.apihelper.GetTopologyClient() if err != nil { return err } - nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeInfo.nodeName, metav1.GetOptions{}) + nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeName, metav1.GetOptions{}) if errors.IsNotFound(err) { nrtNew := v1alpha2.NodeResourceTopology{ ObjectMeta: metav1.ObjectMeta{ - Name: w.nodeInfo.nodeName, + Name: w.nodeName, }, - Zones: zoneInfo, - TopologyPolicies: []string{string(topologypolicy.DetectTopologyPolicy(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope))}, - Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope), + Zones: zoneInfo, + Attributes: v1alpha2.AttributeList{}, + } + + if err := w.updateNRTTopologyManagerInfo(&nrtNew); err != nil { + return err } updateAttributes(&nrtNew.Attributes, scanResponse.Attributes) - _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}) - if err != nil { + if _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create NodeResourceTopology: %w", err) } return nil @@ -225,16 +236,41 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi nrtMutated := nrt.DeepCopy() nrtMutated.Zones = zoneInfo - updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes) + + attributes := scanResponse.Attributes + + if readKubeletConfig { + if err := w.updateNRTTopologyManagerInfo(nrtMutated); err != nil { + return err + } + } + + updateAttributes(&nrtMutated.Attributes, attributes) nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("failed to update NodeResourceTopology: %w", err) } + klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated)) return nil } +func (w *nfdTopologyUpdater) updateNRTTopologyManagerInfo(nrt *v1alpha2.NodeResourceTopology) error { + policy, scope, err := w.detectTopologyPolicyAndScope() + if err != nil { + return fmt.Errorf("failed to detect TopologyManager's policy and scope: %w", err) + } + + tmAttributes := createTopologyAttributes(policy, scope) + deprecatedTopologyPolicies := []string{string(topologypolicy.DetectTopologyPolicy(policy, scope))} + + updateAttributes(&nrt.Attributes, tmAttributes) + nrt.TopologyPolicies = deprecatedTopologyPolicies + + return nil +} + func (w *nfdTopologyUpdater) configure() error { if w.configFilePath == "" { klog.InfoS("no configuration file specified") @@ -290,3 +326,38 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) { updateAttribute(lhs, attr) } } + +func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) { + u, err := url.ParseRequestURI(uri) + if err != nil { + return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err) + } + + // init kubelet API client + var klConfig *kubeletconfigv1beta1.KubeletConfiguration + switch u.Scheme { + case "file": + return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { + klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path) + if err != nil { + return nil, fmt.Errorf("failed to read kubelet config: %w", err) + } + return klConfig, err + }, nil + case "https": + restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile) + if err != nil { + return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err) + } + + return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) { + klConfig, err = kubeconf.GetKubeletConfiguration(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err) + } + return klConfig, nil + }, nil + } + + return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme) +}