Skip to content

Commit

Permalink
Fix Topology Manager policy and scope not being updated properly
Browse files Browse the repository at this point in the history
NFD is only detecting policy and scope of Topology Manager when NRT object doesn't exist.
This means that topologyManagerScope and topologyManagerPolicy attributes won't be updated
even if kubelet config was changed to use other TopologyManager policy and scope.

Signed-off-by: pprokop <[email protected]>
  • Loading branch information
PiotrProkop committed Jul 20, 2023
1 parent fd0ba3f commit 6d98b61
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 72 deletions.
42 changes: 1 addition & 41 deletions cmd/nfd-topology-updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ package main
import (
"flag"
"fmt"
"net/url"
"os"
"path"
"time"

"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"

topology "sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/hostpath"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)

Expand Down Expand Up @@ -63,14 +60,8 @@ func main() {
// Plug klog into grpc logging infrastructure
utils.ConfigureGrpcKlog()

klConfig, err := getKubeletConfig(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
klog.ErrorS(err, "failed to get kubelet configuration")
os.Exit(1)
}

// Get new TopologyUpdater instance
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs, klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope)
instance, err := topology.NewTopologyUpdater(*args, *resourcemonitorArgs)
if err != nil {
klog.ErrorS(err, "failed to initialize topology updater instance")
os.Exit(1)
Expand Down Expand Up @@ -134,34 +125,3 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {

return args, resourcemonitorArgs
}

func getKubeletConfig(uri, apiAuthTokenFile string) (*kubeletconfigv1beta1.KubeletConfiguration, error) {
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
}

// init kubelet API client
var klConfig *kubeletconfigv1beta1.KubeletConfiguration
switch u.Scheme {
case "file":
klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config: %w", err)
}
return klConfig, err
case "https":
restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
}

klConfig, err = kubeconf.GetKubeletConfiguration(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
}
return klConfig, nil
}

return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}
133 changes: 102 additions & 31 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nfdtopologyupdater

import (
"fmt"
"net/url"
"os"
"path/filepath"

Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"

"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"sigs.k8s.io/node-feature-discovery/pkg/apihelper"
Expand All @@ -34,6 +36,7 @@ import (
"sigs.k8s.io/node-feature-discovery/pkg/resourcemonitor"
"sigs.k8s.io/node-feature-discovery/pkg/topologypolicy"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/utils/kubeconf"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/yaml"
)
Expand Down Expand Up @@ -66,35 +69,20 @@ type NfdTopologyUpdater interface {
Stop()
}

type staticNodeInfo struct {
nodeName string
tmPolicy string
tmScope string
}

func newStaticNodeInfo(policy, scope string) staticNodeInfo {
nodeName := utils.NodeName()
klog.InfoS("detected kubelet Topology Manager configuration", "policy", policy, "scope", scope, "nodeName", nodeName)
return staticNodeInfo{
nodeName: nodeName,
tmPolicy: policy,
tmScope: scope,
}
}

type nfdTopologyUpdater struct {
nodeInfo staticNodeInfo
nodeName string
args Args
apihelper apihelper.APIHelpers
resourcemonitorArgs resourcemonitor.Args
stop chan struct{} // channel for signaling stop
eventSource <-chan kubeletnotifier.Info
configFilePath string
config *NFDConfig
kubeletConfigFunc func() (*kubeletconfigv1beta1.KubeletConfiguration, error)
}

// NewTopologyUpdater creates a new NfdTopologyUpdater instance.
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, policy, scope string) (NfdTopologyUpdater, error) {
func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (NfdTopologyUpdater, error) {
eventSource := make(chan kubeletnotifier.Info)
if args.KubeletStateDir != "" {
ntf, err := kubeletnotifier.New(resourcemonitorArgs.SleepInterval, eventSource, args.KubeletStateDir)
Expand All @@ -103,24 +91,40 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args, pol
}
go ntf.Run()
}

kubeletConfigFunc, err := getKubeletConfigFunc(resourcemonitorArgs.KubeletConfigURI, resourcemonitorArgs.APIAuthTokenFile)
if err != nil {
return nil, err
}

nfd := &nfdTopologyUpdater{
args: args,
resourcemonitorArgs: resourcemonitorArgs,
nodeInfo: newStaticNodeInfo(policy, scope),
stop: make(chan struct{}, 1),
nodeName: utils.NodeName(),
eventSource: eventSource,
config: &NFDConfig{},
kubeletConfigFunc: kubeletConfigFunc,
}
if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile)
}
return nfd, nil
}

func (w *nfdTopologyUpdater) detectTopologyPolicyAndScope() (string, string, error) {
klConfig, err := w.kubeletConfigFunc()
if err != nil {
return "", "", err
}

return klConfig.TopologyManagerPolicy, klConfig.TopologyManagerScope, nil
}

// Run nfdTopologyUpdater. Returns if a fatal error is encountered, or, after
// one request if OneShot is set to 'true' in the updater args.
func (w *nfdTopologyUpdater) Run() error {
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeInfo.nodeName)
klog.InfoS("Node Feature Discovery Topology Updater", "version", version.Get(), "nodeName", w.nodeName)

podResClient, err := podres.GetPodResClient(w.resourcemonitorArgs.PodResourceSocketPath)
if err != nil {
Expand Down Expand Up @@ -151,7 +155,7 @@ func (w *nfdTopologyUpdater) Run() error {
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList

excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeInfo.nodeName)
excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
resAggr, err := resourcemonitor.NewResourcesAggregator(podResClient, excludeList)
if err != nil {
return fmt.Errorf("failed to obtain node resource information: %w", err)
Expand All @@ -169,8 +173,13 @@ func (w *nfdTopologyUpdater) Run() error {
}
zones = resAggr.Aggregate(scanResponse.PodResources)
klog.V(1).InfoS("aggregated resources identified", "resourceZones", utils.DelayedDumper(zones))
readKubeletConfig := false
if info.Event == kubeletnotifier.IntervalBased {
readKubeletConfig = true
}

if !w.args.NoPublish {
if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil {
if err = w.updateNodeResourceTopology(zones, scanResponse, readKubeletConfig); err != nil {
return err
}
}
Expand All @@ -195,27 +204,29 @@ func (w *nfdTopologyUpdater) Stop() {
}
}

func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error {
func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse, readKubeletConfig bool) error {
cli, err := w.apihelper.GetTopologyClient()
if err != nil {
return err
}

nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeInfo.nodeName, metav1.GetOptions{})
nrt, err := cli.TopologyV1alpha2().NodeResourceTopologies().Get(context.TODO(), w.nodeName, metav1.GetOptions{})
if errors.IsNotFound(err) {
nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: w.nodeInfo.nodeName,
Name: w.nodeName,
},
Zones: zoneInfo,
TopologyPolicies: []string{string(topologypolicy.DetectTopologyPolicy(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope))},
Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope),
Zones: zoneInfo,
Attributes: v1alpha2.AttributeList{},
}

if err := w.updateNRTTopologyManagerInfo(&nrtNew); err != nil {
return err
}

updateAttributes(&nrtNew.Attributes, scanResponse.Attributes)

_, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{})
if err != nil {
if _, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create NodeResourceTopology: %w", err)
}
return nil
Expand All @@ -225,16 +236,41 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi

nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo
updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes)

attributes := scanResponse.Attributes

if readKubeletConfig {
if err := w.updateNRTTopologyManagerInfo(nrtMutated); err != nil {
return err
}
}

updateAttributes(&nrtMutated.Attributes, attributes)

nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update NodeResourceTopology: %w", err)
}

klog.V(4).InfoS("NodeResourceTopology object updated", "nodeResourceTopology", utils.DelayedDumper(nrtUpdated))
return nil
}

func (w *nfdTopologyUpdater) updateNRTTopologyManagerInfo(nrt *v1alpha2.NodeResourceTopology) error {
policy, scope, err := w.detectTopologyPolicyAndScope()
if err != nil {
return fmt.Errorf("failed to detect TopologyManager's policy and scope: %w", err)
}

tmAttributes := createTopologyAttributes(policy, scope)
deprecatedTopologyPolicies := []string{string(topologypolicy.DetectTopologyPolicy(policy, scope))}

updateAttributes(&nrt.Attributes, tmAttributes)
nrt.TopologyPolicies = deprecatedTopologyPolicies

return nil
}

func (w *nfdTopologyUpdater) configure() error {
if w.configFilePath == "" {
klog.InfoS("no configuration file specified")
Expand Down Expand Up @@ -290,3 +326,38 @@ func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) {
updateAttribute(lhs, attr)
}
}

func getKubeletConfigFunc(uri, apiAuthTokenFile string) (func() (*kubeletconfigv1beta1.KubeletConfiguration, error), error) {
u, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("failed to parse -kubelet-config-uri: %w", err)
}

// init kubelet API client
var klConfig *kubeletconfigv1beta1.KubeletConfiguration
switch u.Scheme {
case "file":
return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
klConfig, err = kubeconf.GetKubeletConfigFromLocalFile(u.Path)
if err != nil {
return nil, fmt.Errorf("failed to read kubelet config: %w", err)
}
return klConfig, err
}, nil
case "https":
restConfig, err := kubeconf.InsecureConfig(u.String(), apiAuthTokenFile)
if err != nil {
return nil, fmt.Errorf("failed to initialize rest config for kubelet config uri: %w", err)
}

return func() (*kubeletconfigv1beta1.KubeletConfiguration, error) {
klConfig, err = kubeconf.GetKubeletConfiguration(restConfig)
if err != nil {
return nil, fmt.Errorf("failed to get kubelet config from configz endpoint: %w", err)
}
return klConfig, nil
}, nil
}

return nil, fmt.Errorf("unsupported URI scheme: %v", u.Scheme)
}

0 comments on commit 6d98b61

Please sign in to comment.