Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topology-updater:compute pod set fingerprint #1049

Merged
merged 3 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/nfd-topology-updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func initFlags(flagset *flag.FlagSet) (*topology.Args, *resourcemonitor.Args) {
"Pod Resource Socket path to use.")
flagset.StringVar(&args.ConfigFile, "config", "/etc/kubernetes/node-feature-discovery/nfd-topology-updater.conf",
"Config file to use.")
flagset.BoolVar(&resourcemonitorArgs.PodSetFingerprint, "pods-fingerprint", false, "Compute and report the pod set fingerprint")

klog.InitFlags(flagset)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ spec:
- "-key-file=/etc/kubernetes/node-feature-discovery/certs/tls.key"
- "-cert-file=/etc/kubernetes/node-feature-discovery/certs/tls.crt"
{{- end }}
{{- if .Values.topologyUpdater.podSetFingerprint }}
- "-pods-fingerprint"
{{- end }}
volumeMounts:
- name: kubelet-config
mountPath: /host-var/lib/kubelet/config.yaml
Expand Down
1 change: 1 addition & 0 deletions deployment/helm/node-feature-discovery/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ topologyUpdater:
tolerations: []
annotations: {}
affinity: {}
podSetFingerprint: true

topologyGC:
enable: true
Expand Down
1 change: 1 addition & 0 deletions docs/deployment/helm.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ We have introduced the following Chart parameters.
| `topologyUpdater.annotations` | dict | {} | Topology updater pod [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) |
| `topologyUpdater.affinity` | dict | {} | Topology updater pod [affinity](https://kubernetes.io/docs/tasks/configure-pod-container/assign-pods-nodes-using-node-affinity/) |
| `topologyUpdater.config` | dict | | [configuration](../reference/topology-updater-configuration-reference) |
| `topologyUpdater.podSetFingerprint` | bool | false | Enables compute and report of pod fingerprint in NRT objects. |

### Topology garbage collector parameters

Expand Down
13 changes: 13 additions & 0 deletions docs/reference/topology-updater-commandline-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,16 @@ Example:
```bash
nfd-topology-updater -podresources-socket=/var/lib/kubelet/pod-resources/kubelet.sock
```

### -pods-fingerprint

Enbles the compute and report the pod set fingerprint in the NRT.
A pod fingerprint is a compact representation of the "node state" regarding resources.

Default: `false`

Example:

```bash
nfd-topology-updater -pods-fingerprint
```
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/jaypipes/ghw v0.8.1-0.20210827132705-c7224150a17e
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.0
github.com/k8stopologyawareschedwg/podfingerprint v0.1.2
github.com/klauspost/cpuid/v2 v2.2.4
github.com/onsi/ginkgo/v2 v2.4.0
github.com/onsi/gomega v1.23.0
Expand Down Expand Up @@ -49,6 +50,7 @@ require (
github.com/Microsoft/go-winio v0.4.17 // indirect
github.com/Microsoft/hcsshim v0.8.22 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
Expand Down Expand Up @@ -422,6 +424,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.0 h1:2uCRJbv+A+fmaUaO0wLZ8oYd6cLE1dRzBQcFNxggH3s=
github.com/k8stopologyawareschedwg/noderesourcetopology-api v0.1.0/go.mod h1:AkACMQGiTgCt0lQw3m7TTU8PLH9lYKNK5e9DqFf5VuM=
github.com/k8stopologyawareschedwg/podfingerprint v0.1.2 h1:Db5KLJjPg2mKaCoeEliMlea+JMyDMWdbNPXnWbPNDyM=
github.com/k8stopologyawareschedwg/podfingerprint v0.1.2/go.mod h1:C23pM15t06dXg/OihGlqBvnYzLr+MXDXJ7zMfbNAyXI=
github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI=
github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand Down
34 changes: 28 additions & 6 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (w *nfdTopologyUpdater) Run() error {

var resScan resourcemonitor.ResourcesScanner

resScan, err = resourcemonitor.NewPodResourcesScanner(w.resourcemonitorArgs.Namespace, podResClient, w.apihelper)
resScan, err = resourcemonitor.NewPodResourcesScanner(w.resourcemonitorArgs.Namespace, podResClient, w.apihelper, w.resourcemonitorArgs.PodSetFingerprint)
if err != nil {
return fmt.Errorf("failed to initialize ResourceMonitor instance: %w", err)
}
Expand All @@ -154,16 +154,16 @@ func (w *nfdTopologyUpdater) Run() error {
select {
case <-crTrigger.C:
klog.Infof("Scanning")
podResources, err := resScan.Scan()
utils.KlogDump(1, "podResources are", " ", podResources)
scanResponse, err := resScan.Scan()
utils.KlogDump(1, "podResources are", " ", scanResponse.PodResources)
if err != nil {
klog.Warningf("Scan failed: %v", err)
continue
}
zones = resAggr.Aggregate(podResources)
zones = resAggr.Aggregate(scanResponse.PodResources)
utils.KlogDump(1, "After aggregating resources identified zones are", " ", zones)
if !w.args.NoPublish {
if err = w.updateNodeResourceTopology(zones); err != nil {
if err = w.updateNodeResourceTopology(zones, scanResponse); err != nil {
return err
}
}
Expand All @@ -188,7 +188,7 @@ func (w *nfdTopologyUpdater) Stop() {
}
}

func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList) error {
func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneList, scanResponse resourcemonitor.ScanResponse) error {
cli, err := w.apihelper.GetTopologyClient()
if err != nil {
return err
Expand All @@ -205,6 +205,8 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
Attributes: createTopologyAttributes(w.nodeInfo.tmPolicy, w.nodeInfo.tmScope),
}

updateAttributes(&nrtNew.Attributes, scanResponse.Attributes)

_, err := cli.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrtNew, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create NodeResourceTopology: %w", err)
Expand All @@ -216,6 +218,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi

nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo
updateAttributes(&nrtMutated.Attributes, scanResponse.Attributes)

nrtUpdated, err := cli.TopologyV1alpha2().NodeResourceTopologies().Update(context.TODO(), nrtMutated, metav1.UpdateOptions{})
if err != nil {
Expand Down Expand Up @@ -261,3 +264,22 @@ func createTopologyAttributes(policy string, scope string) v1alpha2.AttributeLis
},
}
}

func updateAttribute(attrList *v1alpha2.AttributeList, attrInfo v1alpha2.AttributeInfo) {
if attrList == nil {
return
}

for idx := range *attrList {
if (*attrList)[idx].Name == attrInfo.Name {
(*attrList)[idx].Value = attrInfo.Value
return
}
}
*attrList = append(*attrList, attrInfo)
}
func updateAttributes(lhs *v1alpha2.AttributeList, rhs v1alpha2.AttributeList) {
jlojosnegros marked this conversation as resolved.
Show resolved Hide resolved
for _, attr := range rhs {
updateAttribute(lhs, attr)
}
}
112 changes: 112 additions & 0 deletions pkg/nfd-topology-updater/nfd-topology-updater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nfdtopologyupdater

import (
"fmt"
"testing"

"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
. "github.com/smartystreets/goconvey/convey"
)

func TestTopologyUpdater(t *testing.T) {

Convey("Given a list of Attributes", t, func() {

attr_two := v1alpha2.AttributeInfo{
Name: "attr_two_name",
Value: "attr_two_value",
}

attrList := v1alpha2.AttributeList{
v1alpha2.AttributeInfo{
Name: "attr_one_name",
Value: "attr_one_value",
},
attr_two,
v1alpha2.AttributeInfo{
Name: "attr_three_name",
Value: "attr_three_value",
},
}
attrListLen := len(attrList)
attrNames := getListOfNames(attrList)

Convey("When an existing attribute is updated", func() {

updatedAttribute := v1alpha2.AttributeInfo{
Name: attr_two.Name,
Value: attr_two.Value + "_new",
}
updateAttribute(&attrList, updatedAttribute)

Convey("Then list should have the same number of elements", func() {
So(attrList, ShouldHaveLength, attrListLen)
})
Convey("Then the order of the elemens should be the same", func() {
So(attrNames, ShouldResemble, getListOfNames(attrList))
})
Convey("Then Attribute value in the list should be updated", func() {
attr, err := findAttributeByName(attrList, attr_two.Name)
So(err, ShouldBeNil)
So(attr.Value, ShouldEqual, updatedAttribute.Value)
})
})

Convey("When a non existing attribute is updated", func() {
completelyNewAttribute := v1alpha2.AttributeInfo{
Name: "NonExistingAttribute_Name",
Value: "NonExistingAttribute_Value",
}
_, err := findAttributeByName(attrList, completelyNewAttribute.Name)
So(err, ShouldNotBeNil)

updateAttribute(&attrList, completelyNewAttribute)

Convey("Then list should have the one more element", func() {
So(attrList, ShouldHaveLength, attrListLen+1)
})

Convey("Then new Attribute should be added at the end of the list", func() {
So(attrList[len(attrList)-1], ShouldResemble, completelyNewAttribute)
})

Convey("Then the order of the elemens should be the same", func() {
So(attrNames, ShouldResemble, getListOfNames(attrList[:len(attrList)-1]))
})
})
})
}

func getListOfNames(attrList v1alpha2.AttributeList) []string {
ret := make([]string, len(attrList))

for idx, attr := range attrList {
ret[idx] = attr.Name
}
return ret
}

func findAttributeByName(attrList v1alpha2.AttributeList, name string) (v1alpha2.AttributeInfo, error) {
for _, attr := range attrList {
if attr.Name == name {
return attr, nil
}
}
return v1alpha2.AttributeInfo{}, fmt.Errorf("Attribute Not Found name:=%s", name)
}
49 changes: 43 additions & 6 deletions pkg/resourcemonitor/podresourcesscanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,25 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"

"sigs.k8s.io/node-feature-discovery/pkg/apihelper"

"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
"github.com/k8stopologyawareschedwg/podfingerprint"
)

type PodResourcesScanner struct {
namespace string
podResourceClient podresourcesapi.PodResourcesListerClient
apihelper apihelper.APIHelpers
podFingerprint bool
}

// NewPodResourcesScanner creates a new ResourcesScanner instance
func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.PodResourcesListerClient, kubeApihelper apihelper.APIHelpers) (ResourcesScanner, error) {
func NewPodResourcesScanner(namespace string, podResourceClient podresourcesapi.PodResourcesListerClient, kubeApihelper apihelper.APIHelpers, podFingerprint bool) (ResourcesScanner, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also here we should consider to move args in a struct, but this can be deferred in a later PR

resourcemonitorInstance := &PodResourcesScanner{
namespace: namespace,
podResourceClient: podResourceClient,
apihelper: kubeApihelper,
podFingerprint: podFingerprint,
}
if resourcemonitorInstance.namespace != "*" {
klog.Infof("watching namespace %q", resourcemonitorInstance.namespace)
Expand Down Expand Up @@ -113,24 +118,43 @@ func hasIntegralCPUs(pod *corev1.Pod, container *corev1.Container) bool {
}

// Scan gathers all the PodResources from the system, using the podresources API client.
func (resMon *PodResourcesScanner) Scan() ([]PodResources, error) {
func (resMon *PodResourcesScanner) Scan() (ScanResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultPodResourcesTimeout)
defer cancel()

// Pod Resource API client
resp, err := resMon.podResourceClient.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
return nil, fmt.Errorf("can't receive response: %v.Get(_) = _, %w", resMon.podResourceClient, err)
return ScanResponse{}, fmt.Errorf("can't receive response: %v.Get(_) = _, %w", resMon.podResourceClient, err)
}

respPodResources := resp.GetPodResources()
retVal := ScanResponse{
Attributes: v1alpha2.AttributeList{},
}

if resMon.podFingerprint && len(respPodResources) > 0 {
var status podfingerprint.Status
podFingerprintSign, err := computePodFingerprint(respPodResources, &status)
if err != nil {
klog.Errorf("podFingerprint: Unable to compute fingerprint %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we in this case remove the existing pod fingerprint attribute?

Copy link
Contributor

@ffromani ffromani Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we in this case remove the existing pod fingerprint attribute?

I for myself I'm fine both ways. From the scheduler plugin perspective, clients consuming this attribute must tolerate either attribute disappearing (a previous update added, a later update removes) values and obsolete values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are computing the ScanResponse. Podfingerprint attribute will not be added to the ScanResponse in case of an error.

Removing the existing attribute in the NTR should be done in the nfd-topology-updater's updateNodeResourceTopology function, when updating the attribute list.
That can be done but will go agains this (#1049 (comment)) and will require some changes in the updateAttribute function.
I understood that those changes will wait for the new utility functions in @ffromani 's PR

} else {
klog.Info("podFingerprint: " + status.Repr())

retVal.Attributes = append(retVal.Attributes, v1alpha2.AttributeInfo{
Name: podfingerprint.Attribute,
Value: podFingerprintSign,
})
}
}
var podResData []PodResources

for _, podResource := range resp.GetPodResources() {
for _, podResource := range respPodResources {
klog.Infof("podresource iter: %s", podResource.GetName())
hasDevice := hasDevice(podResource)
isWatchable, isIntegralGuaranteed, err := resMon.isWatchable(podResource.GetNamespace(), podResource.GetName(), hasDevice)
if err != nil {
return nil, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %v", podResource.GetNamespace(), podResource.GetName(), err)
return ScanResponse{}, fmt.Errorf("checking if pod in a namespace is watchable, namespace:%v, pod name %v: %v", podResource.GetNamespace(), podResource.GetName(), err)
}
if !isWatchable {
continue
Expand Down Expand Up @@ -198,7 +222,9 @@ func (resMon *PodResourcesScanner) Scan() ([]PodResources, error) {

}

return podResData, nil
retVal.PodResources = podResData

return retVal, nil
}

func hasDevice(podResource *podresourcesapi.PodResources) bool {
Expand All @@ -225,3 +251,14 @@ func getNumaNodeIds(topologyInfo *podresourcesapi.TopologyInfo) []int {

return topology
}

func computePodFingerprint(podResources []*podresourcesapi.PodResources, status *podfingerprint.Status) (string, error) {
fingerprint := podfingerprint.NewTracingFingerprint(len(podResources), status)
for _, podResource := range podResources {
err := fingerprint.Add(podResource.Namespace, podResource.Name)
if err != nil {
return "", err
}
}
return fingerprint.Sign(), nil
}
Loading