Skip to content

Commit

Permalink
nfd-gc: Remove stale NRT objects
Browse files Browse the repository at this point in the history
Remove stale NRT objects whose creator pod does not exist anymore.

Signed-off-by: Oleg Zhurakivskyy <[email protected]>
  • Loading branch information
ozhuraki committed Oct 15, 2024
1 parent 91f631b commit b5fee22
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
{{- with .Values.topologyUpdater.extraEnvs }}
{{- toYaml . | nindent 8 }}
{{- end}}
Expand Down
31 changes: 30 additions & 1 deletion pkg/nfd-gc/nfd-gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nfdgarbagecollector
import (
"context"
"fmt"
"strings"
"time"

topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2"
Expand All @@ -34,6 +35,7 @@ import (
"k8s.io/klog/v2"

nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/api/nfd/v1alpha1"
"sigs.k8s.io/node-feature-discovery/pkg/nfd-topology-updater"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
Expand All @@ -42,6 +44,7 @@ var (
gvrNF = nfdv1alpha1.SchemeGroupVersion.WithResource("nodefeatures")
gvrNRT = topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
gvrNode = corev1.SchemeGroupVersion.WithResource("nodes")
gvrPod = corev1.SchemeGroupVersion.WithResource("pods")
)

// Args are the command line arguments
Expand All @@ -61,6 +64,8 @@ type nfdGarbageCollector struct {
stopChan chan struct{}
client metadataclient.Interface
factory metadatainformer.SharedInformerFactory
// gcNRTs holds owner-pod and namespace information for detecting stale NRT objects
gcNRTs map[string]string
}

func New(args *Args) (NfdGarbageCollector, error) {
Expand All @@ -76,6 +81,7 @@ func New(args *Args) (NfdGarbageCollector, error) {
stopChan: make(chan struct{}),
client: cli,
factory: metadatainformer.NewSharedInformerFactory(cli, 0),
gcNRTs: make(map[string]string),
}, nil
}

Expand Down Expand Up @@ -190,7 +196,30 @@ func (n *nfdGarbageCollector) garbageCollect() {

// Handle NodeResourceTopology objects
listAndHandle(gvrNRT, func(meta metav1.PartialObjectMetadata) {
if !nodeNames.Has(meta.Name) {
deleteNRT := false

if objRef, ok := meta.Annotations[nfdtopologyupdater.NRTOwnerPodAnnotation]; ok {
if s := strings.Split(objRef, "/"); len(s) == 2 {
pod := s[1]
_, err := n.client.Resource(gvrPod).Get(context.TODO(), pod, metav1.GetOptions{})
if errors.IsNotFound(err) {
if val, ok := n.gcNRTs[meta.Name]; !ok {
n.gcNRTs[meta.Name] = objRef
} else {
if val != objRef {
n.gcNRTs[meta.Name] = objRef
} else {
delete(n.gcNRTs, meta.Name)
deleteNRT = true
}
}
} else if err != nil {
klog.ErrorS(err, "failed to get Pod object")
}
}
}

if !nodeNames.Has(meta.Name) || deleteNRT {
n.deleteNRT(meta.Name)
}
})
Expand Down
57 changes: 50 additions & 7 deletions pkg/nfd-gc/nfd-gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (

func TestNRTGC(t *testing.T) {
Convey("When theres is old NRT ", t, func() {
gc := newMockGC(nil, []string{"node1"})
gc := newMockGC(nil, []string{"node1"}, []string{"pod1"})

errChan := make(chan error)
go func() { errChan <- gc.Run() }()
Expand All @@ -47,7 +47,7 @@ func TestNRTGC(t *testing.T) {
So(<-errChan, ShouldBeNil)
})
Convey("When theres is one old NRT and one up to date", t, func() {
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"})
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})

errChan := make(chan error)
go func() { errChan <- gc.Run() }()
Expand All @@ -58,7 +58,7 @@ func TestNRTGC(t *testing.T) {
So(<-errChan, ShouldBeNil)
})
Convey("Should react to delete event", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})

errChan := make(chan error)
go func() { errChan <- gc.Run() }()
Expand All @@ -70,7 +70,7 @@ func TestNRTGC(t *testing.T) {
So(waitForNRT(gc.client, "node2"), ShouldBeTrue)
})
Convey("periodic GC should remove obsolete NRT", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})
// Override period to run fast
gc.args.GCPeriod = 100 * time.Millisecond

Expand All @@ -85,16 +85,38 @@ func TestNRTGC(t *testing.T) {

So(waitForNRT(gc.client, "node1", "node2"), ShouldBeTrue)
})
Convey("periodic GC should remove stale NRT", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}, []string{"pod1", "pod2"})
// Override period to run fast
gc.args.GCPeriod = 100 * time.Millisecond

nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", "not-existing")
nrt.ObjectMeta.Annotations = map[string]string{"owner-pod": "pod4"}

errChan := make(chan error)
go func() { errChan <- gc.Run() }()

gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
_, err := gc.client.Resource(gvr).(fake.MetadataClient).CreateFake(nrt, metav1.CreateOptions{})
So(err, ShouldBeNil)

So(waitForNrtPodsGC(gc.client, "pod1", "pod2"), ShouldBeTrue)
})
}

func newMockGC(nodes, nrts []string) *mockGC {
func newMockGC(nodes, nrts, pods []string) *mockGC {
// Create fake objects
objs := []runtime.Object{}
for _, name := range pods {
objs = append(objs, createPartialObjectMetadata("v1", "Pod", "", name))
}
for _, name := range nodes {
objs = append(objs, createPartialObjectMetadata("v1", "Node", "", name))
}
for _, name := range nrts {
objs = append(objs, createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name))
for i, name := range nrts {
nrt := createPartialObjectMetadata("topology.node.k8s.io/v1alpha2", "NodeResourceTopology", "", name)
nrt.ObjectMeta.Annotations = map[string]string{"owner-pod": pods[i]}
objs = append(objs, nrt)
}

scheme := fake.NewTestScheme()
Expand All @@ -108,6 +130,7 @@ func newMockGC(nodes, nrts []string) *mockGC {
args: &Args{
GCPeriod: 10 * time.Minute,
},
gcNRTs: make(map[string]string),
},
client: cli,
}
Expand Down Expand Up @@ -151,3 +174,23 @@ func waitForNRT(cli metadataclient.Interface, names ...string) bool {
}
return false
}

func waitForNrtPodsGC(cli metadataclient.Interface, pods ...string) bool {
podsSet := sets.NewString(pods...)
gvr := topologyv1alpha2.SchemeGroupVersion.WithResource("noderesourcetopologies")
for i := 0; i < 2; i++ {
rsp, err := cli.Resource(gvr).List(context.TODO(), metav1.ListOptions{})
So(err, ShouldBeNil)

nrtPods := sets.NewString()
for _, nrt := range rsp.Items {
nrtPods.Insert(nrt.Annotations["owner-pod"])
}

if nrtPods.Equal(podsSet) {
return true
}
time.Sleep(1 * time.Second)
}
return false
}
3 changes: 3 additions & 0 deletions pkg/nfd-topology-updater/nfd-topology-updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
TopologyManagerPolicyAttributeName = "topologyManagerPolicy"
// TopologyManagerScopeAttributeName represents an attribute which defines Topology Manager Policy Scope
TopologyManagerScopeAttributeName = "topologyManagerScope"
NRTOwnerPodAnnotation = "nfd.node.kubernetes.io/owner-pod"
)

// Args are the command line arguments
Expand Down Expand Up @@ -294,6 +295,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
nrtNew := v1alpha2.NodeResourceTopology{
ObjectMeta: metav1.ObjectMeta{
Name: w.nodeName,
Annotations: map[string]string{NRTOwnerPodAnnotation: fmt.Sprintf("%s/%s", w.kubernetesNamespace, os.Getenv("POD_NAME"))},
OwnerReferences: w.ownerRefs,
},
Zones: zoneInfo,
Expand All @@ -317,6 +319,7 @@ func (w *nfdTopologyUpdater) updateNodeResourceTopology(zoneInfo v1alpha2.ZoneLi
nrtMutated := nrt.DeepCopy()
nrtMutated.Zones = zoneInfo
nrtMutated.OwnerReferences = w.ownerRefs
nrtMutated.Annotations[NRTOwnerPodAnnotation] = fmt.Sprintf("%s/%s", w.kubernetesNamespace, os.Getenv("POD_NAME"))

attributes := scanResponse.Attributes

Expand Down
8 changes: 8 additions & 0 deletions test/e2e/utils/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ func NFDTopologyUpdaterSpec(kc utils.KubeletConfig, opts ...SpecOption) *corev1.
},
},
},
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
},
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Expand Down

0 comments on commit b5fee22

Please sign in to comment.