diff --git a/pkg/scheduler/nodezone/node_zone.go b/pkg/scheduler/nodezone/node_zone.go index e9dbfff9..4ab4d452 100644 --- a/pkg/scheduler/nodezone/node_zone.go +++ b/pkg/scheduler/nodezone/node_zone.go @@ -23,7 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" corelisters "k8s.io/client-go/listers/core/v1" @@ -57,7 +56,7 @@ const ( // preFilterStateKey is the key in CycleState to NodeZone pre-computed data. // Using the name of the plugin will likely help us avoid collisions with other plugins. - preFilterStateKey = "PreFilter" + Name + preFilterStateKey framework.StateKey = "PreFilter" + Name // ErrReasonNoLabelTopologyZone is used for predicate error. ErrReasonNoLabelTopologyZone = "node(s) no topology zone label" @@ -179,36 +178,6 @@ func (pl *NodeZone) getTopologyZones() ([]string, error) { return sortedZones, nil } -// activateSiblings stashes the pods belonging to the same workload of the given pod -// in the given state, with a reserved key "kubernetes.io/pods-to-activate". -func (pl *NodeZone) activateSiblings(pod *corev1.Pod, state *framework.CycleState) { - pods, err := pl.podLister.Pods(pod.Namespace).List(labels.SelectorFromSet(pod.GetLabels())) - if err != nil { - klog.ErrorS(err, "failed to list pods belong to a workload: %v", err) - return - } - - for i := range pods { - if pods[i].UID == pod.UID { - pods = append(pods[:i], pods[i+1:]...) - break - } - } - - if len(pods) != 0 { - if c, err := state.Read(framework.PodsToActivateKey); err == nil { - if s, ok := c.(*framework.PodsToActivate); ok { - s.Lock() - for _, pod := range pods { - namespacedName := getNamespacedName(pod) - s.Map[namespacedName] = pod - } - s.Unlock() - } - } - } -} - // Filter invoked at the filter extension point. func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { if len(pod.Spec.TopologySpreadConstraints) == 0 { @@ -246,15 +215,15 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState zoneIndex[zone] = index } - klog.V(5).Infof("available zones: %v", zones) + klog.V(5).Infof("Available topology zones: %v", zones) parentName, ordinal := getParentNameAndOrdinal(pod) - m := ordinal % AvailableZones - if m == 0 { + remainder := ordinal % AvailableZones + if remainder == 0 { return nil } - anchorOrdinal := ordinal - m + anchorOrdinal := ordinal - remainder anchorName := getPodNameByOrdinal(parentName, anchorOrdinal) anchorPod, err := pl.getPod(anchorName, pod.Namespace) if err != nil { @@ -263,7 +232,7 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState } } - if anchorPod != nil && anchorPod.Spec.NodeName != "" { + if isPodScheduled(anchorPod) { anchorNode, err := pl.getNode(anchorPod.Spec.NodeName) if s := getErrorAsStatus(err); !s.IsSuccess() { return s @@ -273,10 +242,38 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone) } shift := zoneIndex[anchorZone] - klog.V(5).Infof("anchor pod %s zone shift %d", anchorName, shift) - idealZone := zones[(shift+m)%AvailableZones] + klog.V(5).Infof("Anchor pod %s zone %s shift %d", anchorName, anchorZone, shift) + idealZone := zones[(shift+remainder)%AvailableZones] if idealZone != nodeZone { - klog.V(5).Infof("pod [%s/%s] not fit node %s in zone %s, ideal zone %s", pod.Namespace, pod.Name, nodeInfo.Node().Name, nodeZone, idealZone) + klog.V(5).Infof("Pod [%s/%s] not fit node %s in zone %s, ideal zone %s", pod.Namespace, pod.Name, nodeInfo.Node().Name, nodeZone, idealZone) + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch) + } + } + + var siblingOrdinal int + if remainder == 1 { + siblingOrdinal = ordinal + 1 + } else if remainder == 2 { + siblingOrdinal = ordinal - 1 + } + siblingName := getPodNameByOrdinal(parentName, siblingOrdinal) + siblingPod, err := pl.getPod(siblingName, pod.Namespace) + if err != nil { + if !apierrors.IsNotFound(err) { + return framework.AsStatus(err) + } + } + if isPodScheduled(siblingPod) { + siblingNode, err := pl.getNode(siblingPod.Spec.NodeName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return s + } + siblingZone, ok := siblingNode.GetLabels()[corev1.LabelTopologyZone] + if !ok { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone) + } + if siblingZone == nodeZone { + klog.V(5).Infof("Sibling pod [%s/%s] exists in zone %s", pod.Namespace, siblingPod.Name, siblingZone) return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch) } } @@ -285,18 +282,18 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState } // Permit is the functions invoked by the framework at "Permit" extension point. -func (pl *NodeZone) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) { +func (pl *NodeZone) Permit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) { if !needSchedule(pod.Name) { return framework.NewStatus(framework.Success), 0 } parentName, ordinal := getParentNameAndOrdinal(pod) - m := ordinal % AvailableZones - if m == 0 { + remainder := ordinal % AvailableZones + if remainder == 0 { return framework.NewStatus(framework.Success), 0 } - anchorOrdinal := ordinal - m + anchorOrdinal := ordinal - remainder anchorName := getPodNameByOrdinal(parentName, anchorOrdinal) anchorPod, err := pl.getPod(anchorName, pod.Namespace) if err != nil { @@ -304,18 +301,26 @@ func (pl *NodeZone) Permit(ctx context.Context, state *framework.CycleState, pod return framework.AsStatus(err), 0 } } - if anchorPod == nil || anchorPod.Spec.NodeName == "" { - // We will also request to move the sibling pods back to activeQ. - pl.activateSiblings(pod, state) + if !isPodScheduled(anchorPod) { + klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName) return framework.NewStatus(framework.Wait), WaitTime } - pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { - wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod()) - if wParentName == parentName && ordinal == wOrdinal { - klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod())) - waitingPod.Allow(pl.Name()) - } - }) + + anchorNode, err := pl.getNode(anchorPod.Spec.NodeName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return s, 0 + } + anchorZone := anchorNode.GetLabels()[corev1.LabelTopologyZone] + assumedNode, err := pl.getNode(nodeName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return s, 0 + } + assumedZone := assumedNode.GetLabels()[corev1.LabelTopologyZone] + if anchorZone == assumedZone { + klog.V(5).Infof("Zone conflict, anchor pod [%s/%s] exists in zone %s", pod.Namespace, anchorName, anchorZone) + return framework.NewStatus(framework.Unschedulable, ErrReasonNotMatch), 0 + } + klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod)) return framework.NewStatus(framework.Success), 0 } @@ -328,14 +333,16 @@ func (pl *NodeZone) Reserve(ctx context.Context, state *framework.CycleState, po // Unreserve rejects all other adjacent Pods times out. func (pl *NodeZone) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { parentName, ordinal := getParentNameAndOrdinal(pod) - m := ordinal % AvailableZones - if m == 0 { + remainder := ordinal % AvailableZones + if remainder == 0 { return } + quotient := ordinal / AvailableZones pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod()) - if wParentName == parentName && ordinal == wOrdinal { + wQuotient := wOrdinal / AvailableZones + if waitingPod.GetPod().Namespace == pod.Namespace && wParentName == parentName && quotient == wQuotient { klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod())) waitingPod.Reject(pl.Name(), "rejection in Unreserve") } diff --git a/pkg/scheduler/nodezone/node_zone_utils.go b/pkg/scheduler/nodezone/node_zone_utils.go index 08df549d..faa99a6a 100644 --- a/pkg/scheduler/nodezone/node_zone_utils.go +++ b/pkg/scheduler/nodezone/node_zone_utils.go @@ -67,3 +67,7 @@ func getPodNameByOrdinal(parentName string, ordinal int) string { func getNamespacedName(obj metav1.Object) string { return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName()) } + +func isPodScheduled(pod *corev1.Pod) bool { + return pod != nil && pod.DeletionTimestamp == nil && pod.Spec.NodeName != "" +}