Skip to content

Commit

Permalink
cherry-pick schedule optimization (#369) (#401)
Browse files Browse the repository at this point in the history
schedule optimization (#369)

(cherry picked from commit 824a12a)
  • Loading branch information
MegaByte875 authored Nov 21, 2023
1 parent 52b9565 commit 7621ef0
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 10 deletions.
206 changes: 198 additions & 8 deletions pkg/scheduler/nodezone/node_zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,42 @@ package nodezone
import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

// NodeZone is a plugin that checks node zone.
type NodeZone struct {
cmLister corelisters.ConfigMapLister
handler framework.Handle
cmLister corelisters.ConfigMapLister
podLister corelisters.PodLister
nodeLister corelisters.NodeLister
}

var _ framework.PreFilterPlugin = &NodeZone{}
var _ framework.FilterPlugin = &NodeZone{}
var _ framework.PermitPlugin = &NodeZone{}
var _ framework.ReservePlugin = &NodeZone{}
var _ framework.EnqueueExtensions = &NodeZone{}

const (
ConfigMap framework.GVK = "ConfigMap"

// Name is the name of the plugin used in the plugin registry and configurations.
Name = "NodeZone"

// AvailableZones is the number failure domains over which we should spread.
AvailableZones = 3

// WaitTime is the max wait time in Permit Stage.
WaitTime = time.Second * 10

// preFilterStateKey is the key in CycleState to NodeZone pre-computed data.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
Expand Down Expand Up @@ -129,6 +141,74 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error)
return s, nil
}

func (pl *NodeZone) getPod(podName, namespace string) (*corev1.Pod, error) {
pod, err := pl.podLister.Pods(namespace).Get(podName)
if err != nil {
return nil, err
}
return pod, nil
}

func (pl *NodeZone) getNode(nodeName string) (*corev1.Node, error) {
node, err := pl.nodeLister.Get(nodeName)
if err != nil {
return nil, err
}
return node, nil
}

// getTopologyZones find all zones that represents a logical failure domain.
func (pl *NodeZone) getTopologyZones() ([]string, error) {
nodeInfo, err := pl.handler.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return nil, fmt.Errorf("listing NodeInfos: %v", err)
}

zones := sets.New[string]()
for _, node := range nodeInfo {
if len(zones) == AvailableZones {
break
}
nodeZone, ok := node.Node().GetLabels()[corev1.LabelTopologyZone]
if !ok {
continue
}
zones.Insert(nodeZone)
}
sortedZones := sets.List(zones)
return sortedZones, nil
}

// activateSiblings stashes the pods belonging to the same workload of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pl *NodeZone) activateSiblings(pod *corev1.Pod, state *framework.CycleState) {
pods, err := pl.podLister.Pods(pod.Namespace).List(labels.SelectorFromSet(pod.GetLabels()))
if err != nil {
klog.ErrorS(err, "failed to list pods belong to a workload: %v", err)
return
}

for i := range pods {
if pods[i].UID == pod.UID {
pods = append(pods[:i], pods[i+1:]...)
break
}
}

if len(pods) != 0 {
if c, err := state.Read(framework.PodsToActivateKey); err == nil {
if s, ok := c.(*framework.PodsToActivate); ok {
s.Lock()
for _, pod := range pods {
namespacedName := getNamespacedName(pod)
s.Map[namespacedName] = pod
}
s.Unlock()
}
}
}
}

// Filter invoked at the filter extension point.
func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if len(pod.Spec.TopologySpreadConstraints) == 0 {
Expand All @@ -143,25 +223,130 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState
return framework.AsStatus(err)
}

zone, ok := nodeInfo.Node().GetLabels()[corev1.LabelTopologyZone]
nodeZone, ok := nodeInfo.Node().GetLabels()[corev1.LabelTopologyZone]
if !ok {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone)
}

stateZone := state[pod.Name]
if stateZone != "" && stateZone != zone {
klog.Infof("pod [%s/%s] not fit node %s", pod.Namespace, pod.Name, nodeInfo.Node().Name)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch)
if stateZone != "" {
if stateZone != nodeZone {
klog.V(5).Infof("pod [%s/%s] not fit node %s due to zone mapping exists", pod.Namespace, pod.Name, nodeInfo.Node().Name)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch)
}
return nil
}

zones, err := pl.getTopologyZones()
if err != nil {
return framework.AsStatus(err)
}
zoneIndex := make(map[string]int)
for index, zone := range zones {
zoneIndex[zone] = index
}

klog.V(5).Infof("available zones: %v", zones)

parentName, ordinal := getParentNameAndOrdinal(pod)
m := ordinal % AvailableZones
if m == 0 {
return nil
}

anchorOrdinal := ordinal - m
anchorName := getPodNameByOrdinal(parentName, anchorOrdinal)
anchorPod, err := pl.getPod(anchorName, pod.Namespace)
if err != nil {
if !apierrors.IsNotFound(err) {
return framework.AsStatus(err)
}
}

if anchorPod != nil && anchorPod.Spec.NodeName != "" {
anchorNode, err := pl.getNode(anchorPod.Spec.NodeName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s
}
anchorZone, ok := anchorNode.GetLabels()[corev1.LabelTopologyZone]
if !ok {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone)
}
shift := zoneIndex[anchorZone]
klog.V(5).Infof("anchor pod %s zone shift %d", anchorName, shift)
idealZone := zones[(shift+m)%AvailableZones]
if idealZone != nodeZone {
klog.V(5).Infof("pod [%s/%s] not fit node %s in zone %s, ideal zone %s", pod.Namespace, pod.Name, nodeInfo.Node().Name, nodeZone, idealZone)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch)
}
}

return nil
}

// Permit is the functions invoked by the framework at "Permit" extension point.
func (pl *NodeZone) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) {
if !needSchedule(pod.Name) {
return framework.NewStatus(framework.Success), 0
}

parentName, ordinal := getParentNameAndOrdinal(pod)
m := ordinal % AvailableZones
if m == 0 {
return framework.NewStatus(framework.Success), 0
}

anchorOrdinal := ordinal - m
anchorName := getPodNameByOrdinal(parentName, anchorOrdinal)
anchorPod, err := pl.getPod(anchorName, pod.Namespace)
if err != nil {
if !apierrors.IsNotFound(err) {
return framework.AsStatus(err), 0
}
}
if anchorPod == nil || anchorPod.Spec.NodeName == "" {
// We will also request to move the sibling pods back to activeQ.
pl.activateSiblings(pod, state)
return framework.NewStatus(framework.Wait), WaitTime
}
pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod())
if wParentName == parentName && ordinal == wOrdinal {
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Allow(pl.Name())
}
})
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod))
return framework.NewStatus(framework.Success), 0
}

// Reserve is the functions invoked by the framework at "reserve" extension point.
func (pl *NodeZone) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
return nil
}

// Unreserve rejects all other adjacent Pods times out.
func (pl *NodeZone) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
parentName, ordinal := getParentNameAndOrdinal(pod)
m := ordinal % AvailableZones
if m == 0 {
return
}

pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod())
if wParentName == parentName && ordinal == wOrdinal {
klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pl.Name(), "rejection in Unreserve")
}
})
}

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *NodeZone) EventsToRegister() []framework.ClusterEvent {
return []framework.ClusterEvent{
{Resource: ConfigMap, ActionType: framework.Add},
{Resource: framework.Pod, ActionType: framework.All},
{Resource: framework.Node, ActionType: framework.All},
}
}
Expand All @@ -170,7 +355,12 @@ func (pl *NodeZone) EventsToRegister() []framework.ClusterEvent {
func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
cmLister := informerFactory.Core().V1().ConfigMaps().Lister()
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
return &NodeZone{
handle,
cmLister,
podLister,
nodeLister,
}, nil
}
12 changes: 10 additions & 2 deletions pkg/scheduler/nodezone/node_zone_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"

"github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
Expand Down Expand Up @@ -59,3 +59,11 @@ func needSchedule(podName string) bool {
return strings.Contains(podName, v1alpha1.GraphdComponentType.String()) ||
strings.Contains(podName, v1alpha1.StoragedComponentType.String())
}

func getPodNameByOrdinal(parentName string, ordinal int) string {
return fmt.Sprintf("%s-%d", parentName, ordinal)
}

func getNamespacedName(obj metav1.Object) string {
return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName())
}

0 comments on commit 7621ef0

Please sign in to comment.