nrt: expose TM config as top-level attributes
Expose the TM configuration as attributes.
This is in addition to the TopologyPolicies field, which is
being deprecated. See:
- k8stopologyawareschedwg/noderesourcetopology-api#24
- k8stopologyawareschedwg/noderesourcetopology-api#25

Signed-off-by: Francesco Romani <[email protected]>
ffromani committed Feb 7, 2023
1 parent 2f7bf2f commit c356833
Showing 7 changed files with 141 additions and 52 deletions.
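For orientation, here is a minimal sketch (not part of this commit) of how a consumer could read the new attributes while still falling back to the deprecated TopologyPolicies field. The attribute names match the ones introduced below; the helper name and the example values are illustrative.

package main

import (
	"fmt"

	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
)

// tmConfigFrom prefers the new top-level attributes and falls back to the
// deprecated TopologyPolicies field when they are absent (older exporters).
func tmConfigFrom(nrt *v1alpha2.NodeResourceTopology) (policy, scope string) {
	for _, attr := range nrt.Attributes {
		switch attr.Name {
		case "topologyManagerPolicy":
			policy = attr.Value
		case "topologyManagerScope":
			scope = attr.Value
		}
	}
	if policy == "" && len(nrt.TopologyPolicies) > 0 {
		policy = nrt.TopologyPolicies[0]
	}
	return policy, scope
}

func main() {
	nrt := &v1alpha2.NodeResourceTopology{
		Attributes: v1alpha2.AttributeList{
			{Name: "topologyManagerPolicy", Value: "single-numa-node"}, // illustrative values
			{Name: "topologyManagerScope", Value: "container"},
		},
	}
	policy, scope := tmConfigFrom(nrt)
	fmt.Printf("policy=%q scope=%q\n", policy, scope)
}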
32 changes: 30 additions & 2 deletions pkg/nrtupdater/nrtupdater.go
@@ -32,15 +32,26 @@ type Args struct {
Hostname string
}

type TMConfig struct {
Policy string
Scope string
}

func (conf TMConfig) IsValid() bool {
return conf.Policy != "" && conf.Scope != ""
}

type NRTUpdater struct {
args Args
tmPolicy string
tmConfig TMConfig
stopChan chan struct{}
}

type MonitorInfo struct {
Timer bool
Zones v1alpha2.ZoneList
Attributes v1alpha2.AttributeList
Annotations map[string]string
}

@@ -51,10 +62,11 @@ func (mi MonitorInfo) UpdateReason() string {
return RTEUpdateReactive
}

func NewNRTUpdater(args Args, policy string) *NRTUpdater {
func NewNRTUpdater(args Args, policy string, tmconf TMConfig) *NRTUpdater {
return &NRTUpdater{
args: args,
tmPolicy: policy,
tmConfig: tmconf,
stopChan: make(chan struct{}),
}
}
@@ -117,7 +129,23 @@ func (te *NRTUpdater) updateNRTInfo(nrt *v1alpha2.NodeResourceTopology, info Mon
nrt.Annotations = mergeAnnotations(nrt.Annotations, info.Annotations)
nrt.Annotations[k8sannotations.RTEUpdate] = info.UpdateReason()
nrt.TopologyPolicies = []string{te.tmPolicy}
nrt.Zones = info.Zones
nrt.Zones = info.Zones.DeepCopy()
nrt.Attributes = info.Attributes.DeepCopy()
nrt.Attributes = append(nrt.Attributes, te.makeAttributes()...)
// TODO: check for duplicate attributes?
}

func (te *NRTUpdater) makeAttributes() v1alpha2.AttributeList {
return v1alpha2.AttributeList{
{
Name: "topologyManagerScope",
Value: te.tmConfig.Scope,
},
{
Name: "topologyManagerPolicy",
Value: te.tmConfig.Policy,
},
}
}

func (te *NRTUpdater) Stop() {
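Regarding the duplicate-attributes TODO above, one possible direction is a small merge helper in this package. The following is a hedged sketch (not part of the commit, helper names are illustrative) in which entries seen later overwrite earlier ones carrying the same name; updateNRTInfo could then set nrt.Attributes = mergeAttributes(info.Attributes, te.makeAttributes()) instead of appending.

package nrtupdater

import (
	"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
)

// mergeAttributes combines attribute lists; when two entries share a Name,
// the one seen last wins, so the exporter-owned TM attributes always
// overwrite stale values already present on the object.
func mergeAttributes(lists ...v1alpha2.AttributeList) v1alpha2.AttributeList {
	merged := v1alpha2.AttributeList{}
	for _, list := range lists {
		for _, attr := range list {
			if idx := attributeIndex(merged, attr.Name); idx >= 0 {
				merged[idx].Value = attr.Value
				continue
			}
			merged = append(merged, attr)
		}
	}
	return merged
}

// attributeIndex returns the position of the attribute with the given name,
// or -1 if it is not present.
func attributeIndex(attrs v1alpha2.AttributeList, name string) int {
	for idx, attr := range attrs {
		if attr.Name == name {
			return idx
		}
	}
	return -1
}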
39 changes: 34 additions & 5 deletions pkg/nrtupdater/nrtupdater_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package nrtupdater

import (
"reflect"
"testing"

corev1 "k8s.io/api/core/v1"
@@ -40,8 +41,17 @@ func TestUpdateTMPolicy(t *testing.T) {
policyInitial := "policy-initial"
policyUpdated := "policy-updated"

tmConfInitial := TMConfig{
Scope: "scope-initial",
Policy: "policy-initial",
}
tmConfUpdated := TMConfig{
Scope: "scope-updated",
Policy: "polcy-updated",
}

var err error
nrtUpd = NewNRTUpdater(args, policyInitial)
nrtUpd = NewNRTUpdater(args, policyInitial, tmConfInitial)
err = nrtUpd.UpdateWithClient(
cli,
MonitorInfo{
@@ -76,9 +86,9 @@
if err != nil {
t.Fatalf("failed to get the NRT object from tracker: %v", err)
}
checkTMPolicy(t, obj, policyInitial)
checkTMPolicy(t, obj, policyInitial, tmConfInitial)

nrtUpd = NewNRTUpdater(args, policyUpdated)
nrtUpd = NewNRTUpdater(args, policyUpdated, tmConfUpdated)
err = nrtUpd.UpdateWithClient(
cli,
MonitorInfo{
@@ -112,10 +122,10 @@
if err != nil {
t.Fatalf("failed to get the NRT object from tracker: %v", err)
}
checkTMPolicy(t, obj, policyUpdated)
checkTMPolicy(t, obj, policyUpdated, tmConfUpdated)
}

func checkTMPolicy(t *testing.T, obj runtime.Object, expectedPolicy string) {
func checkTMPolicy(t *testing.T, obj runtime.Object, expectedPolicy string, expectedConf TMConfig) {
t.Helper()

nrtObj, ok := obj.(*v1alpha2.NodeResourceTopology)
@@ -128,4 +138,23 @@ func checkTMPolicy(t *testing.T, obj runtime.Object, expectedPolicy string) {
if nrtObj.TopologyPolicies[0] != expectedPolicy {
t.Fatalf("topology policy mismatch: expected %q got %q", expectedPolicy, nrtObj.TopologyPolicies[0])
}
gotConf := tmConfigFromAttributes(nrtObj.Attributes)
if !reflect.DeepEqual(gotConf, expectedConf) {
t.Fatalf("config got=%+#v expected=%+#v", gotConf, expectedConf)
}
}

func tmConfigFromAttributes(attrs v1alpha2.AttributeList) TMConfig {
conf := TMConfig{}
for _, attr := range attrs {
if attr.Name == "topologyManagerScope" {
conf.Scope = attr.Value
continue
}
if attr.Name == "topologyManagerPolicy" {
conf.Policy = attr.Value
continue
}
}
return conf
}
48 changes: 41 additions & 7 deletions pkg/resourcemonitor/resourcemonitor.go
@@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"time"
@@ -37,6 +38,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

"github.com/jaypipes/ghw"
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"github.com/k8stopologyawareschedwg/podfingerprint"
"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/k8shelpers"
@@ -61,8 +63,32 @@ type Args struct {
PodSetFingerprintStatusFile string
}

type ScanResponse struct {
Zones v1alpha2.ZoneList
Attributes v1alpha2.AttributeList
Annotations map[string]string
}

func (sr ScanResponse) SortedZones() v1alpha2.ZoneList {
res := sr.Zones.DeepCopy()
sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
return res
}

type ResourceMonitor interface {
Scan(excludeList ResourceExcludeList) (topologyv1alpha2.ZoneList, map[string]string, error)
Scan(excludeList ResourceExcludeList) (ScanResponse, error)
}

// ToMapSet keeps the original keys, but replaces values with set.String types
@@ -174,31 +200,38 @@ func WithNodeName(name string) func(*resourceMonitor) {
}
}

func (rm *resourceMonitor) Scan(excludeList ResourceExcludeList) (topologyv1alpha2.ZoneList, map[string]string, error) {
func (rm *resourceMonitor) Scan(excludeList ResourceExcludeList) (ScanResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
defer cancel()
resp, err := rm.podResCli.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
prometheus.UpdatePodResourceApiCallsFailureMetric("list")
return nil, nil, err
return ScanResponse{}, err
}

var st podfingerprint.Status
annotations := make(map[string]string)
scanRes := ScanResponse{
Attributes: topologyv1alpha2.AttributeList{},
Annotations: map[string]string{},
}

respPodRes := resp.GetPodResources()

if rm.args.PodSetFingerprint {
pfpSign := ComputePodFingerprint(respPodRes, &st)
annotations[podfingerprint.Annotation] = pfpSign
scanRes.Attributes = append(scanRes.Attributes, topologyv1alpha2.AttributeInfo{
Name: podfingerprint.Attribute,
Value: pfpSign,
})
scanRes.Annotations[podfingerprint.Annotation] = pfpSign
klog.V(6).Infof("pfp: " + st.Repr())
}

allDevs := GetAllContainerDevices(respPodRes, rm.args.Namespace, rm.coreIDToNodeIDMap)
allocated := ContainerDevicesToPerNUMAResourceCounters(allDevs)

excludeSet := excludeList.ToMapSet()
zones := make(topologyv1alpha2.ZoneList, 0)
zones := make(topologyv1alpha2.ZoneList, 0, len(rm.topo.Nodes))
// if there are no allocatable resources under a NUMA node we might end up with holes in the NRT objects.
// this is why we're using the topology info and not the nodeAllocatable
for nodeID := range rm.topo.Nodes {
@@ -274,14 +307,15 @@ func (rm *resourceMonitor) Scan(excludeList ResourceExcludeList) (topologyv1alph

zones = append(zones, zone)
}
scanRes.Zones = zones

if rm.args.PodSetFingerprint && rm.args.PodSetFingerprintStatusFile != "" {
dir, file := filepath.Split(rm.args.PodSetFingerprintStatusFile)
err := toFile(st, dir, file)
klog.V(6).InfoS("error dumping the pfp status to %q (%v): %v", rm.args.PodSetFingerprintStatusFile, file, err)
// intentionally ignore error, we must keep going.
}
return zones, annotations, nil
return scanRes, nil
}

func (rm *resourceMonitor) updateNodeCapacity() error {
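Since the ResourceMonitor interface changes shape here, downstream fakes and mocks need the same signature update. Below is a minimal sketch (not part of the commit; type and helper names are illustrative) of a fake that satisfies the new interface and returns canned data.

package resourcemonitor_test

import (
	topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"

	"github.com/k8stopologyawareschedwg/resource-topology-exporter/pkg/resourcemonitor"
)

// fakeResourceMonitor returns canned data, enough to exercise code that
// consumes ScanResponse (zones, attributes, annotations) without talking
// to the podresources API.
type fakeResourceMonitor struct {
	resp resourcemonitor.ScanResponse
	err  error
}

func (f *fakeResourceMonitor) Scan(excludeList resourcemonitor.ResourceExcludeList) (resourcemonitor.ScanResponse, error) {
	return f.resp, f.err
}

// compile-time check that the fake keeps tracking the updated interface
var _ resourcemonitor.ResourceMonitor = &fakeResourceMonitor{}

// emptyScanResponse builds a ScanResponse with all fields initialized but empty.
func emptyScanResponse() resourcemonitor.ScanResponse {
	return resourcemonitor.ScanResponse{
		Zones:       topologyv1alpha2.ZoneList{},
		Attributes:  topologyv1alpha2.AttributeList{},
		Annotations: map[string]string{},
	}
}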
49 changes: 16 additions & 33 deletions pkg/resourcemonitor/resourcemonitor_test.go
@@ -529,22 +529,10 @@ func TestResourcesScan(t *testing.T) {
PodResources: []*v1.PodResources{},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, _, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
scanRes, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
So(err, ShouldBeNil)

sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
res := scanRes.SortedZones()
log.Printf("result=%v", res)
log.Printf("expected=%v", expected)
log.Printf("diff=%s", cmp.Diff(res, expected))
@@ -638,22 +626,10 @@
PodResources: []*v1.PodResources{},
}
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, _, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
scanRes, err := resMon.Scan(ResourceExcludeList{}) // no pods allocation
So(err, ShouldBeNil)

sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
for _, resource := range res {
sort.Slice(resource.Costs, func(x, y int) bool {
return resource.Costs[x].Name < resource.Costs[y].Name
})
}
for _, resource := range res {
sort.Slice(resource.Resources, func(x, y int) bool {
return resource.Resources[x].Name < resource.Resources[y].Name
})
}
res := scanRes.SortedZones()
log.Printf("result=%v", res)
log.Printf("expected=%v", expected)
log.Printf("diff=%s", cmp.Diff(res, expected))
@@ -846,8 +822,11 @@
}

mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, _, err := resMon.Scan(excludeList)
scanRes, err := resMon.Scan(excludeList)
So(err, ShouldBeNil)

res := scanRes.Zones.DeepCopy()

// Check if resources were excluded correctly
for _, zone := range res {
for _, resource := range zone.Resources {
@@ -1014,8 +993,10 @@
}

mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, _, err := resMon.Scan(excludeList)
scanRes, err := resMon.Scan(excludeList)
So(err, ShouldBeNil)

res := scanRes.Zones.DeepCopy()
// Check if resources were excluded correctly
for _, zone := range res {
for _, resource := range zone.Resources {
@@ -1235,8 +1216,10 @@
}

mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
res, _, err := resMon.Scan(excludeList)
scanRes, err := resMon.Scan(excludeList)
So(err, ShouldBeNil)

res := scanRes.Zones.DeepCopy()
// Check if resources were excluded correctly
for _, zone := range res {
for _, resource := range zone.Resources {
@@ -1363,10 +1346,10 @@

mockPodResClient.On("GetAllocatableResources", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.AllocatableResourcesRequest")).Return(allocRes, nil)
mockPodResClient.On("List", mock.AnythingOfType("*context.timerCtx"), mock.AnythingOfType("*v1.ListPodResourcesRequest")).Return(resp, nil)
_, ann, err := resMon.Scan(ResourceExcludeList{})
scanRes, err := resMon.Scan(ResourceExcludeList{})

expectedFP := "pfp0v001fe53c4dbd2c5f4a0" // pre-computed and validated manually
fp, ok := ann[podfingerprint.Annotation]
fp, ok := scanRes.Annotations[podfingerprint.Annotation]
So(ok, ShouldBeTrue)
log.Printf("FP %q expected %q", fp, expectedFP)
So(cmp.Equal(fp, expectedFP), ShouldBeTrue)
7 changes: 6 additions & 1 deletion pkg/resourcetopologyexporter/resourceobserver.go
@@ -52,16 +52,21 @@ func (rm *ResourceObserver) Run(eventsChan <-chan notification.Event, condChan c
select {
case ev := <-eventsChan:
var err error

monInfo := nrtupdater.MonitorInfo{Timer: ev.IsTimer()}

tsWakeupDiff := ev.Timestamp.Sub(lastWakeup)
lastWakeup = ev.Timestamp
prometheus.UpdateWakeupDelayMetric(monInfo.UpdateReason(), float64(tsWakeupDiff.Milliseconds()))

tsBegin := time.Now()
monInfo.Zones, monInfo.Annotations, err = rm.resMon.Scan(rm.excludeList)
scanRes, err := rm.resMon.Scan(rm.excludeList)
tsEnd := time.Now()

monInfo.Annotations = scanRes.Annotations
monInfo.Attributes = scanRes.Attributes
monInfo.Zones = scanRes.Zones

if rm.exposeTiming {
monInfo.Annotations[k8sannotations.SleepDuration] = clampTime(tsWakeupDiff.Round(time.Second)).String()
monInfo.Annotations[k8sannotations.UpdateInterval] = clampTime(ev.TimerInterval).String()
(2 more changed files not shown)
