nrt: logr: better integration with framework
After further review and conversation, we have a better
understanding of how integration with contextual logging
should look.
First and foremost, injecting loggers would conflict with
the very goals of contextual logging.
So, let's drop the code we added in #710.
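For illustration, a minimal sketch of the pattern this change moves towards
(the package and helper names are made up, and the literal keys stand in for
the logging.Key* constants used in the real code): the framework owns the
logger and attaches it to the context, so a hook fetches it with
klog.FromContext rather than keeping a logger injected at construction time.

```go
package nrtlogging // hypothetical package, for illustration only

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

// exampleFilterHook retrieves the contextual logger handed over by the
// scheduler framework instead of using an injected logr.Logger.
func exampleFilterHook(ctx context.Context, pod *corev1.Pod, nodeName string) {
	lh := klog.FromContext(ctx).WithValues("pod", klog.KObj(pod), "node", nodeName)
	lh.V(4).Info("flow begin")
	defer lh.V(4).Info("flow end")
	// ... the actual filtering work goes here ...
}
```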

The contextual logger doesn't deduplicate key/value pairs;
this is left to (some) backends. To avoid log clutter,
trim down the extra key/value pairs and add only those we
really need to ensure a good troubleshooting experience.

Still, let's make sure to add the critical key/value
pairs to the relevant entries, at the cost of possible
duplication.
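A hedged sketch of why the duplication can happen (the helper name is
hypothetical): logr never deduplicates, so a key attached via WithValues and
repeated in the Info call is emitted twice unless the backend folds it, which
is exactly the trade-off accepted here for the critical keys.

```go
package nrtlogging // hypothetical package, for illustration only

import "github.com/go-logr/logr"

// logForeignPods repeats the "node" key on purpose: it is already attached
// via WithValues, but repeating it keeps the entry self-contained even for
// backends that drop or reorder the accumulated pairs.
func logForeignPods(lh logr.Logger, nodeName string, count int) {
	lh = lh.WithValues("node", nodeName)
	lh.V(2).Info("marked with foreign pods", "node", nodeName, "count", count)
}
```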

When reporting the currently assumed resources, the existing
representation is neither concise nor very human friendly.
Additionally, multi-line log entries are harder to process
and should be avoided.
So let's move to a more concise, single-line representation,
which turns out to be no less human friendly.
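Roughly, the dump goes from one line per pod to a single
"pod::[resources];" stream. The snippet below is only an illustrative
stand-in: the helper, the pod name and the quantities are made up, and the
real code formats a corev1.ResourceList via stringify.ResourceList.

```go
package main

import (
	"fmt"
	"strings"
)

// demoString mimics the reworked resourceStore.String(): one
// "pod::[resources];" segment per tracked pod, all on a single line.
func demoString(data map[string]string) string {
	var sb strings.Builder
	for podKey, podRes := range data {
		sb.WriteString(podKey + "::[" + podRes + "];")
	}
	return sb.String()
}

func main() {
	fmt.Println(demoString(map[string]string{"ns1/pod-a": "cpu=2, memory=4Gi"}))
	// Output: ns1/pod-a::[cpu=2, memory=4Gi];
}
```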

Review the verbosity of the log entries.
Move down to v=2 the logs which are really key to understanding
the behavior. We should keep a hard limit on such log entries to minimize
log spam while preserving at least some observability without
requiring v=4 or greater.

The level v=4 is usually the highest non-spammy verbosity.
When debug logs are needed we often set v=4, and higher verbosity
levels are typically used only in desperate times.
Thus, promote to v=4 the debug logs we really need to see.
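As a rough guide, a sketch of the resulting convention (not an exhaustive
mapping of the plugin's log sites; the helper is hypothetical):

```go
package nrtlogging // hypothetical package, for illustration only

import "github.com/go-logr/logr"

// logAtConventionalLevels illustrates the split:
//   V(2)  - key decisions needed to follow the behavior (verdicts, reserve/unreserve)
//   V(4)  - debug logs we really want when debugging (flow begin/end, sync attempts)
//   V(5)+ - detailed, potentially spammy output, for desperate times only
func logAtConventionalLevels(lh logr.Logger, suitable bool, fingerprint string) {
	lh.V(2).Info("final verdict", "suitable", suitable)
	lh.V(4).Info("trying to sync NodeTopology", "fingerprint", fingerprint)
	lh.V(6).Info("podset fingerprint debug", "status", "...")
}
```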

Everywhere else in the Kubernetes ecosystem, and most
notably in the scheduler, the pod namespace/name pair is called
"pod", while we called it "logID".
We did so to use the same key name for all the flows, the
cache resync (which is driven by time, not by an object) being
the odd one out.

It seems better to be externally consistent (with the ecosystem)
rather than internally consistent (all the flows in the same plugin),
so we rename "logID" to "pod" in the log entries.
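A small sketch of the resulting entries (the helper name is hypothetical and
the literal keys stand in for the logging.Key* constants): klog.KObj renders
the namespace/name pair, e.g. pod="default/my-pod".

```go
package nrtlogging // hypothetical package, for illustration only

import (
	"github.com/go-logr/logr"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

// logWithPodKey labels the pod with the ecosystem-standard "pod" key instead
// of the previous "logID"; klog.KObj yields the namespace/name pair.
func logWithPodKey(lh logr.Logger, pod *corev1.Pod, nodeName string) {
	lh.V(4).Info("flow begin", "pod", klog.KObj(pod), "podUID", pod.GetUID(), "node", nodeName)
}
```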

Signed-off-by: Francesco Romani <[email protected]>
ffromani committed May 6, 2024
1 parent 521dd8e commit 7098181
Showing 13 changed files with 86 additions and 91 deletions.
6 changes: 3 additions & 3 deletions pkg/noderesourcetopology/cache/discardreserved.go
@@ -78,7 +78,7 @@ func (pt *DiscardReserved) NodeMaybeOverReserved(nodeName string, pod *corev1.Po
func (pt *DiscardReserved) NodeHasForeignPods(nodeName string, pod *corev1.Pod) {}

func (pt *DiscardReserved) ReserveNodeResources(nodeName string, pod *corev1.Pod) {
pt.lh.V(5).Info("NRT Reserve", "logID", logging.PodLogID(pod), "podUID", pod.GetUID(), "node", nodeName)
pt.lh.V(5).Info("NRT Reserve", logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
pt.rMutex.Lock()
defer pt.rMutex.Unlock()

@@ -89,14 +89,14 @@ func (pt *DiscardReserved) ReserveNodeResources(nodeName string, pod *corev1.Pod
}

func (pt *DiscardReserved) UnreserveNodeResources(nodeName string, pod *corev1.Pod) {
pt.lh.V(5).Info("NRT Unreserve", "logID", klog.KObj(pod), "podUID", pod.GetUID(), "node", nodeName)
pt.lh.V(5).Info("NRT Unreserve", logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)

pt.removeReservationForNode(nodeName, pod)
}

// PostBind is invoked to cleanup reservationMap
func (pt *DiscardReserved) PostBind(nodeName string, pod *corev1.Pod) {
pt.lh.V(5).Info("NRT PostBind", "logID", klog.KObj(pod), "podUID", pod.GetUID(), "node", nodeName)
pt.lh.V(5).Info("NRT PostBind", logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)

pt.removeReservationForNode(nodeName, pod)
}
3 changes: 2 additions & 1 deletion pkg/noderesourcetopology/cache/foreign_pods.go
@@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
k8scache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
@@ -50,7 +51,7 @@ func SetupForeignPodsDetector(lh logr.Logger, schedProfileName string, podInform
}

cc.NodeHasForeignPods(pod.Spec.NodeName, pod)
lh.V(6).Info("detected foreign pods", "logID", logging.PodLogID(pod), "podUID", pod.GetUID(), "node", pod.Spec.NodeName)
lh.V(6).Info("detected foreign pods", logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, pod.Spec.NodeName)
}

podInformer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{
65 changes: 31 additions & 34 deletions pkg/noderesourcetopology/cache/overreserve.go
@@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
podlisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

@@ -64,12 +65,11 @@ func NewOverReserve(ctx context.Context, lh logr.Logger, cfg *apiconfig.NodeReso
resyncMethod := getCacheResyncMethod(lh, cfg)

nrtObjs := &topologyv1alpha2.NodeResourceTopologyList{}
// TODO: we should pass-in a context in the future
if err := client.List(ctx, nrtObjs); err != nil {
return nil, err
}

lh.V(3).Info("initializing", "noderesourcetopologies", len(nrtObjs.Items), "method", resyncMethod)
lh.V(2).Info("initializing", "noderesourcetopologies", len(nrtObjs.Items), "method", resyncMethod)
obj := &OverReserve{
lh: lh,
client: client,
@@ -100,11 +100,11 @@ func (ov *OverReserve) GetCachedNRTCopy(ctx context.Context, nodeName string, po
return nrt, true
}

logID := logging.PodLogID(pod)
lh := ov.lh.WithValues("logID", logID, "podUID", pod.GetUID(), "node", nodeName)
logID := klog.KObj(pod)
lh := ov.lh.WithValues(logging.KeyPod, logID, logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)

lh.V(6).Info("NRT", "fromcache", stringify.NodeResourceTopologyResources(nrt))
nodeAssumedResources.UpdateNRT(logID, nrt)
nodeAssumedResources.UpdateNRT(nrt, logging.KeyPod, logID)

lh.V(5).Info("NRT", "withassumed", stringify.NodeResourceTopologyResources(nrt))
return nrt, true
@@ -114,23 +114,23 @@ func (ov *OverReserve) NodeMaybeOverReserved(nodeName string, pod *corev1.Pod) {
ov.lock.Lock()
defer ov.lock.Unlock()
val := ov.nodesMaybeOverreserved.Incr(nodeName)
ov.lh.V(4).Info("mark discarded", "node", nodeName, "count", val)
ov.lh.V(4).Info("mark discarded", logging.KeyNode, nodeName, "count", val)
}

func (ov *OverReserve) NodeHasForeignPods(nodeName string, pod *corev1.Pod) {
lh := ov.lh.WithValues("logID", logging.PodLogID(pod), "podUID", pod.GetUID(), "node", nodeName)
lh := ov.lh.WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
ov.lock.Lock()
defer ov.lock.Unlock()
if !ov.nrts.Contains(nodeName) {
lh.V(5).Info("ignoring foreign pods", "nrtinfo", "missing")
return
}
val := ov.nodesWithForeignPods.Incr(nodeName)
lh.V(4).Info("marked with foreign pods", "count", val)
lh.V(2).Info("marked with foreign pods", logging.KeyNode, nodeName, "count", val)
}

func (ov *OverReserve) ReserveNodeResources(nodeName string, pod *corev1.Pod) {
lh := ov.lh.WithValues("logID", logging.PodLogID(pod), "podUID", pod.GetUID(), "node", nodeName)
lh := ov.lh.WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
ov.lock.Lock()
defer ov.lock.Unlock()
nodeAssumedResources, ok := ov.assumedResources[nodeName]
@@ -140,26 +140,26 @@ func (ov *OverReserve) ReserveNodeResources(nodeName string, pod *corev1.Pod) {
}

nodeAssumedResources.AddPod(pod)
lh.V(5).Info("post reserve", "assumedResources", nodeAssumedResources.String())
lh.V(2).Info("post reserve", logging.KeyNode, nodeName, "assumedResources", nodeAssumedResources.String())

ov.nodesMaybeOverreserved.Delete(nodeName)
lh.V(6).Info("reset discard counter")
lh.V(6).Info("reset discard counter", logging.KeyNode, nodeName)
}

func (ov *OverReserve) UnreserveNodeResources(nodeName string, pod *corev1.Pod) {
lh := ov.lh.WithValues("logID", logging.PodLogID(pod), "podUID", pod.GetUID(), "node", nodeName)
lh := ov.lh.WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)
ov.lock.Lock()
defer ov.lock.Unlock()
nodeAssumedResources, ok := ov.assumedResources[nodeName]
if !ok {
// this should not happen, so we're vocal about it
// we don't return error because not much to do to recover anyway
lh.V(3).Info("no resources tracked")
lh.V(2).Info("no resources tracked", logging.KeyNode, nodeName)
return
}

nodeAssumedResources.DeletePod(pod)
lh.V(5).Info("post release", "assumedResources", nodeAssumedResources.String())
lh.V(2).Info("post unreserve", logging.KeyNode, nodeName, "assumedResources", nodeAssumedResources.String())
}

// NodesMaybeOverReserved returns a slice of all the node names which have been discarded previously,
@@ -201,81 +201,78 @@ func (ov *OverReserve) NodesMaybeOverReserved(lh logr.Logger) []string {
// too aggressive resync attempts, so to more, likely unnecessary, computation work on the scheduler side.
func (ov *OverReserve) Resync() {
// we are not working with a specific pod, so we need a unique key to track this flow
lh := ov.lh.WithValues("logID", logging.TimeLogID(), "flow", logging.FlowCacheSync)
lh.V(4).Info(logging.FlowBegin)
defer lh.V(4).Info(logging.FlowEnd)
lh_ := ov.lh.WithName(logging.FlowCacheSync).WithValues(logging.KeyLogID, logging.TimeLogID())
lh_.V(4).Info(logging.FlowBegin)
defer lh_.V(4).Info(logging.FlowEnd)

nodeNames := ov.NodesMaybeOverReserved(lh)
nodeNames := ov.NodesMaybeOverReserved(lh_)
// avoid as much as we can unnecessary work and logs.
if len(nodeNames) == 0 {
lh.V(6).Info("no dirty nodes detected")
lh_.V(5).Info("no dirty nodes detected")
return
}

// node -> pod identifier (namespace, name)
nodeToObjsMap, err := makeNodeToPodDataMap(lh, ov.podLister, ov.isPodRelevant)
nodeToObjsMap, err := makeNodeToPodDataMap(lh_, ov.podLister, ov.isPodRelevant)
if err != nil {
lh.Error(err, "cannot find the mapping between running pods and nodes")
lh_.Error(err, "cannot find the mapping between running pods and nodes")
return
}

lh.V(6).Info("resync NodeTopology cache starting")
defer lh.V(6).Info("resync NodeTopology cache complete")

var nrtUpdates []*topologyv1alpha2.NodeResourceTopology
for _, nodeName := range nodeNames {
lh = lh.WithValues("node", nodeName)
lh := lh_.WithValues(logging.KeyNode, nodeName)

nrtCandidate := &topologyv1alpha2.NodeResourceTopology{}
if err := ov.client.Get(context.Background(), types.NamespacedName{Name: nodeName}, nrtCandidate); err != nil {
lh.V(3).Info("failed to get NodeTopology", "error", err)
lh.V(2).Info("failed to get NodeTopology", "error", err)
continue
}
if nrtCandidate == nil {
lh.V(3).Info("missing NodeTopology")
lh.V(2).Info("missing NodeTopology")
continue
}

objs, ok := nodeToObjsMap[nodeName]
if !ok {
// this really should never happen
lh.V(3).Info("cannot find any pod for node")
lh.Info("cannot find any pod for node")
continue
}

pfpExpected, onlyExclRes := podFingerprintForNodeTopology(nrtCandidate, ov.resyncMethod)
if pfpExpected == "" {
lh.V(3).Info("missing NodeTopology podset fingerprint data")
lh.V(2).Info("missing NodeTopology podset fingerprint data")
continue
}

lh.V(6).Info("trying to sync NodeTopology", "fingerprint", pfpExpected, "onlyExclusiveResources", onlyExclRes)
lh.V(4).Info("trying to sync NodeTopology", "fingerprint", pfpExpected, "onlyExclusiveResources", onlyExclRes)

err = checkPodFingerprintForNode(lh, objs, nodeName, pfpExpected, onlyExclRes)
if errors.Is(err, podfingerprint.ErrSignatureMismatch) {
// can happen, not critical
lh.V(5).Info("NodeTopology podset fingerprint mismatch")
lh.V(4).Info("NodeTopology podset fingerprint mismatch")
continue
}
if err != nil {
// should never happen, let's be vocal
lh.V(3).Error(err, "checking NodeTopology podset fingerprint")
lh.Error(err, "checking NodeTopology podset fingerprint")
continue
}

lh.V(4).Info("overriding cached info")
nrtUpdates = append(nrtUpdates, nrtCandidate)
}

ov.FlushNodes(lh, nrtUpdates...)
ov.FlushNodes(lh_, nrtUpdates...)
}

// FlushNodes drops all the cached information about a given node, resetting its state clean.
func (ov *OverReserve) FlushNodes(lh logr.Logger, nrts ...*topologyv1alpha2.NodeResourceTopology) {
ov.lock.Lock()
defer ov.lock.Unlock()
for _, nrt := range nrts {
lh.V(4).Info("flushing", "node", nrt.Name)
lh.V(2).Info("flushing", logging.KeyNode, nrt.Name)
ov.nrts.Update(nrt)
delete(ov.assumedResources, nrt.Name)
ov.nodesMaybeOverreserved.Delete(nrt.Name)
10 changes: 6 additions & 4 deletions pkg/noderesourcetopology/cache/store.go
@@ -28,6 +28,7 @@ import (
"github.com/k8stopologyawareschedwg/podfingerprint"

apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/logging"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
"sigs.k8s.io/scheduler-plugins/pkg/util"
)
@@ -91,7 +92,7 @@ func newResourceStore(lh logr.Logger) *resourceStore {
func (rs *resourceStore) String() string {
var sb strings.Builder
for podKey, podRes := range rs.data {
sb.WriteString(" " + podKey + ": " + stringify.ResourceList(podRes) + "\n")
sb.WriteString(podKey + "::[" + stringify.ResourceList(podRes) + "];")
}
return sb.String()
}
@@ -125,7 +126,7 @@ func (rs *resourceStore) DeletePod(pod *corev1.Pod) bool {

// UpdateNRT updates the provided Node Resource Topology object with the resources tracked in this store,
// performing pessimistic overallocation across all the NUMA zones.
func (rs *resourceStore) UpdateNRT(logID string, nrt *topologyv1alpha2.NodeResourceTopology) {
func (rs *resourceStore) UpdateNRT(nrt *topologyv1alpha2.NodeResourceTopology, logKeysAndValues ...any) {
for key, res := range rs.data {
// We cannot predict on which Zone the workload will be placed.
// And we should totally not guess. So the only safe (and conservative)
@@ -146,7 +147,8 @@ func (rs *resourceStore) UpdateNRT(nrt *topologyv1alpha2.NodeResou
if zr.Available.Cmp(qty) < 0 {
// this should happen rarely, and it is likely caused by
// a bug elsewhere.
rs.lh.V(3).Info("cannot decrement resource", "logID", logID, "zone", zr.Name, "node", nrt.Name, "available", zr.Available, "requestor", key, "quantity", qty.String())
logKeysAndValues = append(logKeysAndValues, "zone", zr.Name, logging.KeyNode, nrt.Name, "available", zr.Available, "requestor", key, "quantity", qty.String())
rs.lh.V(3).Info("cannot decrement resource", logKeysAndValues...)
zr.Available = resource.Quantity{}
continue
}
@@ -239,7 +241,7 @@ func checkPodFingerprintForNode(lh logr.Logger, objs []podData, nodeName, pfpExp
}
pfpComputed := pfp.Sign()

lh.V(5).Info("podset fingerprint check", "expected", pfpExpected, "computed", pfpComputed, "onlyExclusiveResources", onlyExclRes)
lh.V(4).Info("podset fingerprint check", "expected", pfpExpected, "computed", pfpComputed, "onlyExclusiveResources", onlyExclRes)
lh.V(6).Info("podset fingerprint debug", "status", st.Repr())

err := pfp.Check(pfpExpected)
2 changes: 1 addition & 1 deletion pkg/noderesourcetopology/cache/store_test.go
@@ -516,7 +516,7 @@ func TestResourceStoreUpdate(t *testing.T) {
}

logID := "testResourceStoreUpdate"
rs.UpdateNRT(logID, nrt)
rs.UpdateNRT(nrt, "logID", logID)

cpuInfo0 := findResourceInfo(nrt.Zones[0].Resources, cpu)
if cpuInfo0.Capacity.Cmp(resource.MustParse("20")) != 0 {
14 changes: 9 additions & 5 deletions pkg/noderesourcetopology/filter.go
@@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -80,6 +81,7 @@ func singleNUMAContainerLevelHandler(lh logr.Logger, pod *v1.Pod, zones topology
// this is necessary, so we won't allocate the same resources for the upcoming containers
subtractFromNUMA(lh, nodes, numaID, container)
}
lh.V(2).Info("can align all containers")
return nil
}

@@ -105,7 +107,7 @@ func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, re
// some resources may not expose NUMA affinity (device plugins, extended resources), but all resources
// must be reported at node level; thus, if they are not present at node level, we can safely assume
// we don't have the resource at all.
lh.V(5).Info("early verdict: cannot meet request", "resource", resource, "suitable", "false")
lh.V(2).Info("early verdict: cannot meet request", "resource", resource, "suitable", "false")
return numaID, false
}

@@ -137,7 +139,7 @@ func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, re

bitmask.And(resourceBitmask)
if bitmask.IsEmpty() {
lh.V(5).Info("early verdict", "resource", resource, "suitable", "false")
lh.V(2).Info("early verdict", "resource", resource, "suitable", "false")
return numaID, false
}
}
@@ -149,7 +151,7 @@ func resourcesAvailableInAnyNUMANodes(lh logr.Logger, numaNodes NUMANodeList, re

// at least one NUMA node is available
ret := !bitmask.IsEmpty()
lh.V(5).Info("final verdict", "suitable", ret)
lh.V(2).Info("final verdict", "suitable", ret)
return numaID, ret
}

@@ -187,6 +189,7 @@ func singleNUMAPodLevelHandler(lh logr.Logger, pod *v1.Pod, zones topologyv1alph
lh.V(2).Info("cannot align pod", "name", pod.Name)
return framework.NewStatus(framework.Unschedulable, "cannot align pod")
}
lh.V(2).Info("can align pod")
return nil
}

@@ -201,7 +204,8 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle

nodeName := nodeInfo.Node().Name

lh := logging.Log().WithValues(logging.KeyLogID, logging.PodLogID(pod), logging.KeyPodUID, pod.GetUID(), logging.KeyNode, nodeName, logging.KeyFlow, logging.FlowFilter)
lh := klog.FromContext(ctx).WithValues(logging.KeyPod, klog.KObj(pod), logging.KeyPodUID, logging.PodUID(pod), logging.KeyNode, nodeName)

lh.V(4).Info(logging.FlowBegin)
defer lh.V(4).Info(logging.FlowEnd)

@@ -214,7 +218,7 @@ func (tm *TopologyMatch) Filter(ctx context.Context, cycleState *framework.Cycle
return nil
}

lh.V(5).Info("found nrt data", "object", stringify.NodeResourceTopologyResources(nodeTopology))
lh.V(4).Info("found nrt data", "object", stringify.NodeResourceTopologyResources(nodeTopology))

handler := filterHandlerFromTopologyManagerConfig(topologyManagerConfigFromNodeResourceTopology(lh, nodeTopology))
if handler == nil {
