From ad7e57ed2dfc67a15a30ab868991b2ac5363c957 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Mon, 30 Jan 2023 15:09:47 +0100
Subject: [PATCH] nrt: expose TM config as top-level attributes

Expose the TM configuration as attributes. This is in addition to the
TopologyPolicies field, which is being deprecated.

See:
- https://github.com/k8stopologyawareschedwg/noderesourcetopology-api/issues/24
- https://github.com/k8stopologyawareschedwg/noderesourcetopology-api/pull/25

Signed-off-by: Francesco Romani
---
 pkg/nrtupdater/nrtupdater.go                | 32 +++++++++++-
 pkg/nrtupdater/nrtupdater_test.go           | 39 +++++++++++++--
 pkg/resourcemonitor/resourcemonitor.go      | 48 +++++++++++++++---
 pkg/resourcemonitor/resourcemonitor_test.go | 49 ++++++-------------
 .../resourceobserver.go                     |  7 ++-
 .../resourcetopologyexporter.go             | 10 ++--
 tools/nrtstress/main.go                     |  8 ++-
 7 files changed, 141 insertions(+), 52 deletions(-)

diff --git a/pkg/nrtupdater/nrtupdater.go b/pkg/nrtupdater/nrtupdater.go
index c6558580c..fa3d9fa50 100644
--- a/pkg/nrtupdater/nrtupdater.go
+++ b/pkg/nrtupdater/nrtupdater.go
@@ -32,15 +32,26 @@ type Args struct {
 	Hostname string
 }
 
+type TMConfig struct {
+	Policy string
+	Scope  string
+}
+
+func (conf TMConfig) IsValid() bool {
+	return conf.Policy != "" && conf.Scope != ""
+}
+
 type NRTUpdater struct {
 	args     Args
 	tmPolicy string
+	tmConfig TMConfig
 	stopChan chan struct{}
 }
 
 type MonitorInfo struct {
 	Timer       bool
 	Zones       v1alpha2.ZoneList
+	Attributes  v1alpha2.AttributeList
 	Annotations map[string]string
 }
@@ -51,10 +62,11 @@ func (mi MonitorInfo) UpdateReason() string {
 	return RTEUpdateReactive
 }
 
-func NewNRTUpdater(args Args, policy string) *NRTUpdater {
+func NewNRTUpdater(args Args, policy string, tmconf TMConfig) *NRTUpdater {
 	return &NRTUpdater{
 		args:     args,
 		tmPolicy: policy,
+		tmConfig: tmconf,
 		stopChan: make(chan struct{}),
 	}
 }
@@ -117,7 +129,23 @@ func (te *NRTUpdater) updateNRTInfo(nrt *v1alpha2.NodeResourceTopology, info Mon
 	nrt.Annotations = mergeAnnotations(nrt.Annotations, info.Annotations)
 	nrt.Annotations[k8sannotations.RTEUpdate] = info.UpdateReason()
 	nrt.TopologyPolicies = []string{te.tmPolicy}
-	nrt.Zones = info.Zones
+	nrt.Zones = info.Zones.DeepCopy()
+	nrt.Attributes = info.Attributes.DeepCopy()
+	nrt.Attributes = append(nrt.Attributes, te.makeAttributes()...)
+	// TODO: check for duplicate attributes?
+}
+
+func (te *NRTUpdater) makeAttributes() v1alpha2.AttributeList {
+	return v1alpha2.AttributeList{
+		{
+			Name:  "topologyManagerScope",
+			Value: te.tmConfig.Scope,
+		},
+		{
+			Name:  "topologyManagerPolicy",
+			Value: te.tmConfig.Policy,
+		},
+	}
 }
 
 func (te *NRTUpdater) Stop() {
diff --git a/pkg/nrtupdater/nrtupdater_test.go b/pkg/nrtupdater/nrtupdater_test.go
index c847d581a..e5124fc26 100644
--- a/pkg/nrtupdater/nrtupdater_test.go
+++ b/pkg/nrtupdater/nrtupdater_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package nrtupdater
 
 import (
+	"reflect"
 	"testing"
 
 	corev1 "k8s.io/api/core/v1"
@@ -40,8 +41,17 @@ func TestUpdateTMPolicy(t *testing.T) {
 	policyInitial := "policy-initial"
 	policyUpdated := "policy-updated"
 
+	tmConfInitial := TMConfig{
+		Scope:  "scope-initial",
+		Policy: "policy-initial",
+	}
+	tmConfUpdated := TMConfig{
+		Scope:  "scope-updated",
+		Policy: "policy-updated",
+	}
+
 	var err error
-	nrtUpd = NewNRTUpdater(args, policyInitial)
+	nrtUpd = NewNRTUpdater(args, policyInitial, tmConfInitial)
 	err = nrtUpd.UpdateWithClient(
 		cli,
 		MonitorInfo{
@@ -76,9 +86,9 @@ func TestUpdateTMPolicy(t *testing.T) {
 	if err != nil {
 		t.Fatalf("failed to get the NRT object from tracker: %v", err)
 	}
-	checkTMPolicy(t, obj, policyInitial)
+	checkTMPolicy(t, obj, policyInitial, tmConfInitial)
 
-	nrtUpd = NewNRTUpdater(args, policyUpdated)
+	nrtUpd = NewNRTUpdater(args, policyUpdated, tmConfUpdated)
 	err = nrtUpd.UpdateWithClient(
 		cli,
 		MonitorInfo{
@@ -112,10 +122,10 @@
 	if err != nil {
 		t.Fatalf("failed to get the NRT object from tracker: %v", err)
 	}
-	checkTMPolicy(t, obj, policyUpdated)
+	checkTMPolicy(t, obj, policyUpdated, tmConfUpdated)
 }
 
-func checkTMPolicy(t *testing.T, obj runtime.Object, expectedPolicy string) {
+func checkTMPolicy(t *testing.T, obj runtime.Object, expectedPolicy string, expectedConf TMConfig) {
 	t.Helper()
 
 	nrtObj, ok := obj.(*v1alpha2.NodeResourceTopology)
@@ -128,4 +138,23 @@
 	if nrtObj.TopologyPolicies[0] != expectedPolicy {
 		t.Fatalf("topology policy mismatch: expected %q got %q", expectedPolicy, nrtObj.TopologyPolicies[0])
 	}
+	gotConf := tmConfigFromAttributes(nrtObj.Attributes)
+	if !reflect.DeepEqual(gotConf, expectedConf) {
+		t.Fatalf("config got=%+#v expected=%+#v", gotConf, expectedConf)
+	}
+}
+
+func tmConfigFromAttributes(attrs v1alpha2.AttributeList) TMConfig {
+	conf := TMConfig{}
+	for _, attr := range attrs {
+		if attr.Name == "topologyManagerScope" {
+			conf.Scope = attr.Value
+			continue
+		}
+		if attr.Name == "topologyManagerPolicy" {
+			conf.Policy = attr.Value
+			continue
+		}
+	}
+	return conf
 }
diff --git a/pkg/resourcemonitor/resourcemonitor.go b/pkg/resourcemonitor/resourcemonitor.go
index 8bb8b10e6..df486d126 100644
--- a/pkg/resourcemonitor/resourcemonitor.go
+++ b/pkg/resourcemonitor/resourcemonitor.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
@@ -37,6 +38,7 @@ import (
 	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
 
 	"github.com/jaypipes/ghw"
+	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
 	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
 	"github.com/k8stopologyawareschedwg/podfingerprint"
 	"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/k8shelpers"
@@ -61,8 +63,32 @@ type Args struct {
 	PodSetFingerprintStatusFile string
 }
 
+type ScanResponse struct {
+	Zones       v1alpha2.ZoneList
+	Attributes  v1alpha2.AttributeList
+	Annotations map[string]string
+}
+
+func (sr ScanResponse) SortedZones() v1alpha2.ZoneList {
+	res := sr.Zones.DeepCopy()
+	sort.Slice(res, func(i, j int) bool {
+		return res[i].Name < res[j].Name
+	})
+	for _, resource := range res {
+		sort.Slice(resource.Costs, func(x, y int) bool {
+			return resource.Costs[x].Name < resource.Costs[y].Name
+		})
+	}
+	for _, resource := range res {
+		sort.Slice(resource.Resources, func(x, y int) bool {
+			return resource.Resources[x].Name < resource.Resources[y].Name
+		})
+	}
+	return res
+}
+
 type ResourceMonitor interface {
-	Scan(excludeList ResourceExcludeList) (topologyv1alpha2.ZoneList, map[string]string, error)
+	Scan(excludeList ResourceExcludeList) (ScanResponse, error)
 }
 
 // ToMapSet keeps the original keys, but replaces values with set.String types
@@ -174,23 +200,30 @@ func WithNodeName(name string) func(*resourceMonitor) {
 	}
 }
 
-func (rm *resourceMonitor) Scan(excludeList ResourceExcludeList) (topologyv1alpha2.ZoneList, map[string]string, error) {
+func (rm *resourceMonitor) Scan(excludeList ResourceExcludeList) (ScanResponse, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
 	defer cancel()
 	resp, err := rm.podResCli.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
 	if err != nil {
 		prometheus.UpdatePodResourceApiCallsFailureMetric("list")
-		return nil, nil, err
+		return ScanResponse{}, err
 	}
 
 	var st podfingerprint.Status
-	annotations := make(map[string]string)
+	scanRes := ScanResponse{
+		Attributes:  topologyv1alpha2.AttributeList{},
+		Annotations: map[string]string{},
+	}
 	respPodRes := resp.GetPodResources()
 	if rm.args.PodSetFingerprint {
 		pfpSign := ComputePodFingerprint(respPodRes, &st)
-		annotations[podfingerprint.Annotation] = pfpSign
+		scanRes.Attributes = append(scanRes.Attributes, topologyv1alpha2.AttributeInfo{
+			Name:  "nodeTopologyPodsFingerprint",
+			Value: pfpSign,
+		})
+		scanRes.Annotations[podfingerprint.Annotation] = pfpSign
 		klog.V(6).Infof("pfp: " + st.Repr())
 	}
 
@@ -198,7 +231,7 @@ func (rm *resourceMonitor) Scan(excludeList ResourceExcludeList) (topologyv1alph
 	allocated := ContainerDevicesToPerNUMAResourceCounters(allDevs)
 
 	excludeSet := excludeList.ToMapSet()
-	zones := make(topologyv1alpha2.ZoneList, 0)
+	zones := make(topologyv1alpha2.ZoneList, 0, len(rm.topo.Nodes))
 	// if there are no allocatable resources under a NUMA we might ended up with holes in the NRT objects.
 	// this is why we're using the topology info and not the nodeAllocatable
 	for nodeID := range rm.topo.Nodes {
@@ -274,6 +307,7 @@
 
 		zones = append(zones, zone)
 	}
+	scanRes.Zones = zones
 
 	if rm.args.PodSetFingerprint && rm.args.PodSetFingerprintStatusFile != "" {
 		dir, file := filepath.Split(rm.args.PodSetFingerprintStatusFile)
@@ -281,7 +315,7 @@
 		klog.V(6).InfoS("error dumping the pfp status to %q (%v): %v", rm.args.PodSetFingerprintStatusFile, file, err)
 		// intentionally ignore error, we must keep going.
 	}
-	return zones, annotations, nil
+	return scanRes, nil
 }
 
 func (rm *resourceMonitor) updateNodeCapacity() error {
diff --git a/pkg/resourcemonitor/resourcemonitor_test.go b/pkg/resourcemonitor/resourcemonitor_test.go
index eacb984b7..149e3f8ff 100644
--- a/pkg/resourcemonitor/resourcemonitor_test.go
+++ b/pkg/resourcemonitor/resourcemonitor_test.go
@@ -529,22 +529,10 @@
 				PodResources: []*v1.PodResources{},
 			}
 			mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
-			res, _, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
+			scanRes, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
 			So(err, ShouldBeNil)
-			sort.Slice(res, func(i, j int) bool {
-				return res[i].Name < res[j].Name
-			})
-			for _, resource := range res {
-				sort.Slice(resource.Costs, func(x, y int) bool {
-					return resource.Costs[x].Name < resource.Costs[y].Name
-				})
-			}
-			for _, resource := range res {
-				sort.Slice(resource.Resources, func(x, y int) bool {
-					return resource.Resources[x].Name < resource.Resources[y].Name
-				})
-			}
+			res := scanRes.SortedZones()
 			log.Printf("result=%v", res)
 			log.Printf("expected=%v", expected)
 			log.Printf("diff=%s", cmp.Diff(res, expected))
@@ -638,22 +626,10 @@
 				PodResources: []*v1.PodResources{},
 			}
 			mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
-			res, _, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
+			scanRes, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
 			So(err, ShouldBeNil)
-			sort.Slice(res, func(i, j int) bool {
-				return res[i].Name < res[j].Name
-			})
-			for _, resource := range res {
-				sort.Slice(resource.Costs, func(x, y int) bool {
-					return resource.Costs[x].Name < resource.Costs[y].Name
-				})
-			}
-			for _, resource := range res {
-				sort.Slice(resource.Resources, func(x, y int) bool {
-					return resource.Resources[x].Name < resource.Resources[y].Name
-				})
-			}
+			res := scanRes.SortedZones()
 			log.Printf("result=%v", res)
 			log.Printf("expected=%v", expected)
 			log.Printf("diff=%s", cmp.Diff(res, expected))
@@ -846,8 +822,11 @@
 			}
 			mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
-			res, _, err := resMon.Scan(excludeList)
+			scanRes, err := resMon.Scan(excludeList)
 			So(err, ShouldBeNil)
+
+			res := scanRes.Zones.DeepCopy()
+
 			// Check if resources were excluded correctly
 			for _, zone := range res {
 				for _, resource := range zone.Resources {
@@ -1014,8 +993,10 @@
 			}
 			mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
-			res, _, err := resMon.Scan(excludeList)
+			scanRes, err := resMon.Scan(excludeList)
 			So(err, ShouldBeNil)
+
+			res := scanRes.Zones.DeepCopy()
 			// Check if resources were excluded correctly
 			for _, zone := range res {
 				for _, resource := range zone.Resources {
@@ -1235,8 +1216,10 @@
 			}
 			mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
-			res, _, err := resMon.Scan(excludeList)
+			scanRes, err := resMon.Scan(excludeList)
 			So(err, ShouldBeNil)
+
+			res := scanRes.Zones.DeepCopy()
 			// Check if resources were excluded correctly
 			for _, zone := range res {
 				for _, resource := range zone.Resources {
@@ -1363,10 +1346,10 @@
 			mockPodResClient.On("GetAllocatableResources", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.AllocatableResourcesRequest")).Return(allocRes, nil)
 			mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
-			_, ann, err := resMon.Scan(ResourceExcludeList{})
+			scanRes, err := resMon.Scan(ResourceExcludeList{})
 
 			expectedFP := "pfp0v001fe53c4dbd2c5f4a0" // pre-computed and validated manually
-			fp, ok := ann[podfingerprint.Annotation]
+			fp, ok := scanRes.Annotations[podfingerprint.Annotation]
 			So(ok, ShouldBeTrue)
 			log.Printf("FP %q expected %q", fp, expectedFP)
 			So(cmp.Equal(fp, expectedFP), ShouldBeTrue)
diff --git a/pkg/resourcetopologyexporter/resourceobserver.go b/pkg/resourcetopologyexporter/resourceobserver.go
index 100c9188d..9b0613eff 100644
--- a/pkg/resourcetopologyexporter/resourceobserver.go
+++ b/pkg/resourcetopologyexporter/resourceobserver.go
@@ -52,6 +52,7 @@ func (rm *ResourceObserver) Run(eventsChan <-chan notification.Event, condChan c
 		select {
 		case ev := <-eventsChan:
 			var err error
+
 			monInfo := nrtupdater.MonitorInfo{Timer: ev.IsTimer()}
 
 			tsWakeupDiff := ev.Timestamp.Sub(lastWakeup)
@@ -59,9 +60,13 @@ func (rm *ResourceObserver) Run(eventsChan <-chan notification.Event, condChan c
 			prometheus.UpdateWakeupDelayMetric(monInfo.UpdateReason(), float64(tsWakeupDiff.Milliseconds()))
 
 			tsBegin := time.Now()
-			monInfo.Zones, monInfo.Annotations, err = rm.resMon.Scan(rm.excludeList)
+			scanRes, err := rm.resMon.Scan(rm.excludeList)
 			tsEnd := time.Now()
 
+			monInfo.Annotations = scanRes.Annotations
+			monInfo.Attributes = scanRes.Attributes
+			monInfo.Zones = scanRes.Zones
+
 			if rm.exposeTiming {
 				monInfo.Annotations[k8sannotations.SleepDuration] = clampTime(tsWakeupDiff.Round(time.Second)).String()
 				monInfo.Annotations[k8sannotations.UpdateInterval] = clampTime(ev.TimerInterval).String()
diff --git a/pkg/resourcetopologyexporter/resourcetopologyexporter.go b/pkg/resourcetopologyexporter/resourcetopologyexporter.go
index 63fd492a4..6dd8119c3 100644
--- a/pkg/resourcetopologyexporter/resourcetopologyexporter.go
+++ b/pkg/resourcetopologyexporter/resourcetopologyexporter.go
@@ -36,10 +36,14 @@
 }
 
 func Execute(cli podresourcesapi.PodResourcesListerClient, nrtupdaterArgs nrtupdater.Args, resourcemonitorArgs resourcemonitor.Args, rteArgs Args) error {
-	tmPolicy, err := getTopologyManagerPolicy(resourcemonitorArgs, rteArgs)
+	tmPolicy, err := getTopologyManagerPolicy(rteArgs)
 	if err != nil {
 		return err
 	}
+	tmConf := nrtupdater.TMConfig{
+		Policy: rteArgs.TopologyManagerPolicy,
+		Scope:  rteArgs.TopologyManagerScope,
+	}
 
 	var condChan chan v1.PodCondition
 	if rteArgs.PodReadinessEnable {
@@ -62,7 +66,7 @@
 	}
 	go resObs.Run(eventSource.Events(), condChan)
 
-	upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, string(tmPolicy))
+	upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, string(tmPolicy), tmConf)
 	go upd.Run(resObs.Infos, condChan)
 	go eventSource.Run()
 
@@ -108,7 +112,7 @@ func createEventSource(rteArgs *Args) (notification.EventSource, error) {
 	return es, nil
 }
 
-func getTopologyManagerPolicy(resourcemonitorArgs resourcemonitor.Args, rteArgs Args) (v1alpha2.TopologyManagerPolicy, error) {
+func getTopologyManagerPolicy(rteArgs Args) (v1alpha2.TopologyManagerPolicy, error) {
 	if rteArgs.TopologyManagerPolicy != "" && rteArgs.TopologyManagerScope != "" {
 		klog.Infof("using given Topology Manager policy %q scope %q", rteArgs.TopologyManagerPolicy, rteArgs.TopologyManagerScope)
 		return topologypolicy.DetectTopologyPolicy(rteArgs.TopologyManagerPolicy, rteArgs.TopologyManagerScope), nil
diff --git a/tools/nrtstress/main.go b/tools/nrtstress/main.go
index a0b9be7e0..c399bdfa5 100644
--- a/tools/nrtstress/main.go
+++ b/tools/nrtstress/main.go
@@ -14,11 +14,13 @@ import (
 func main() {
 	var hostname string
 	var tmPolicy string
+	var tmScope string
 	var interval time.Duration
 	var seed int
 	var dryRun bool
 	flag.StringVar(&hostname, "hosthame", "fake-host-0", "fake host name (not validated)")
 	flag.StringVar(&tmPolicy, "tm-policy", "single-numa-node", "topology manager policy (not validated)")
+	flag.StringVar(&tmScope, "tm-scope", "pod", "topology manager scope (not validated)")
 	flag.DurationVar(&interval, "interval", 10*time.Second, "periodic update interval")
 	flag.IntVar(&seed, "random-seed", 0, "random seed (use time)")
 	flag.BoolVar(&dryRun, "dry-run", false, "don't send data")
@@ -47,6 +49,10 @@
 
 	klog.Infof("using NRT Updater args: %+#v", nrtupdaterArgs)
 
-	upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, tmPolicy)
+	tmConf := nrtupdater.TMConfig{
+		Policy: tmPolicy,
+		Scope:  tmScope,
+	}
+	upd := nrtupdater.NewNRTUpdater(nrtupdaterArgs, tmPolicy, tmConf)
 	upd.Run(gen.Infos, nil)
 }
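---

Editor's note, not part of the patch: updateNRTInfo above copies info.Attributes
and then appends te.makeAttributes(), leaving a "TODO: check for duplicate
attributes?". Below is a minimal sketch of one way to close that TODO with
last-writer-wins semantics. The mergeAttributes helper and the demo values are
hypothetical and do not exist in this codebase; only the v1alpha2 types used
here are real.

// Hypothetical sketch, not part of the patch: deduplicate NRT
// attribute lists, resolving name clashes with last-writer-wins.
package main

import (
	"fmt"

	v1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
)

// mergeAttributes flattens the given lists into one AttributeList.
// When the same Name appears more than once, the value seen last
// wins, so callers should pass the authoritative list last.
func mergeAttributes(lists ...v1alpha2.AttributeList) v1alpha2.AttributeList {
	merged := v1alpha2.AttributeList{}
	pos := make(map[string]int) // attribute Name -> index in merged
	for _, attrs := range lists {
		for _, attr := range attrs {
			if idx, ok := pos[attr.Name]; ok {
				merged[idx].Value = attr.Value // overwrite the duplicate in place
				continue
			}
			pos[attr.Name] = len(merged)
			merged = append(merged, attr)
		}
	}
	return merged
}

func main() {
	// The values below are made up for the demo.
	scanned := v1alpha2.AttributeList{
		{Name: "topologyManagerPolicy", Value: "stale-policy"},
		{Name: "nodeTopologyPodsFingerprint", Value: "pfp0v001fe53c4dbd2c5f4a0"},
	}
	fromKubelet := v1alpha2.AttributeList{
		{Name: "topologyManagerPolicy", Value: "single-numa-node"},
		{Name: "topologyManagerScope", Value: "container"},
	}
	// topologyManagerPolicy appears once in the output, with the kubelet value.
	fmt.Printf("%+v\n", mergeAttributes(scanned, fromKubelet))
}

With such a helper, the updater could set
nrt.Attributes = mergeAttributes(info.Attributes.DeepCopy(), te.makeAttributes())
so the kubelet-derived topologyManager* values stay authoritative over any
same-named attribute carried in from a scan.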