diff --git a/pkg/scheduler/nodezone/node_zone.go b/pkg/scheduler/nodezone/node_zone.go index 41b4a9ec..e9dbfff9 100644 --- a/pkg/scheduler/nodezone/node_zone.go +++ b/pkg/scheduler/nodezone/node_zone.go @@ -19,10 +19,13 @@ package nodezone import ( "context" "fmt" + "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -30,19 +33,28 @@ import ( // NodeZone is a plugin that checks node zone. type NodeZone struct { - cmLister corelisters.ConfigMapLister + handler framework.Handle + cmLister corelisters.ConfigMapLister + podLister corelisters.PodLister + nodeLister corelisters.NodeLister } var _ framework.PreFilterPlugin = &NodeZone{} var _ framework.FilterPlugin = &NodeZone{} +var _ framework.PermitPlugin = &NodeZone{} +var _ framework.ReservePlugin = &NodeZone{} var _ framework.EnqueueExtensions = &NodeZone{} const ( - ConfigMap framework.GVK = "ConfigMap" - // Name is the name of the plugin used in the plugin registry and configurations. Name = "NodeZone" + // AvailableZones is the number failure domains over which we should spread. + AvailableZones = 3 + + // WaitTime is the max wait time in Permit Stage. + WaitTime = time.Second * 10 + // preFilterStateKey is the key in CycleState to NodeZone pre-computed data. // Using the name of the plugin will likely help us avoid collisions with other plugins. preFilterStateKey = "PreFilter" + Name @@ -129,6 +141,74 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) return s, nil } +func (pl *NodeZone) getPod(podName, namespace string) (*corev1.Pod, error) { + pod, err := pl.podLister.Pods(namespace).Get(podName) + if err != nil { + return nil, err + } + return pod, nil +} + +func (pl *NodeZone) getNode(nodeName string) (*corev1.Node, error) { + node, err := pl.nodeLister.Get(nodeName) + if err != nil { + return nil, err + } + return node, nil +} + +// getTopologyZones find all zones that represents a logical failure domain. +func (pl *NodeZone) getTopologyZones() ([]string, error) { + nodeInfo, err := pl.handler.SnapshotSharedLister().NodeInfos().List() + if err != nil { + return nil, fmt.Errorf("listing NodeInfos: %v", err) + } + + zones := sets.New[string]() + for _, node := range nodeInfo { + if len(zones) == AvailableZones { + break + } + nodeZone, ok := node.Node().GetLabels()[corev1.LabelTopologyZone] + if !ok { + continue + } + zones.Insert(nodeZone) + } + sortedZones := sets.List(zones) + return sortedZones, nil +} + +// activateSiblings stashes the pods belonging to the same workload of the given pod +// in the given state, with a reserved key "kubernetes.io/pods-to-activate". +func (pl *NodeZone) activateSiblings(pod *corev1.Pod, state *framework.CycleState) { + pods, err := pl.podLister.Pods(pod.Namespace).List(labels.SelectorFromSet(pod.GetLabels())) + if err != nil { + klog.ErrorS(err, "failed to list pods belong to a workload: %v", err) + return + } + + for i := range pods { + if pods[i].UID == pod.UID { + pods = append(pods[:i], pods[i+1:]...) + break + } + } + + if len(pods) != 0 { + if c, err := state.Read(framework.PodsToActivateKey); err == nil { + if s, ok := c.(*framework.PodsToActivate); ok { + s.Lock() + for _, pod := range pods { + namespacedName := getNamespacedName(pod) + s.Map[namespacedName] = pod + } + s.Unlock() + } + } + } +} + // Filter invoked at the filter extension point. func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { if len(pod.Spec.TopologySpreadConstraints) == 0 { @@ -143,25 +223,130 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState return framework.AsStatus(err) } - zone, ok := nodeInfo.Node().GetLabels()[corev1.LabelTopologyZone] + nodeZone, ok := nodeInfo.Node().GetLabels()[corev1.LabelTopologyZone] if !ok { return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone) } stateZone := state[pod.Name] - if stateZone != "" && stateZone != zone { - klog.Infof("pod [%s/%s] not fit node %s", pod.Namespace, pod.Name, nodeInfo.Node().Name) - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch) + if stateZone != "" { + if stateZone != nodeZone { + klog.V(5).Infof("pod [%s/%s] not fit node %s due to zone mapping exists", pod.Namespace, pod.Name, nodeInfo.Node().Name) + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch) + } + return nil } + zones, err := pl.getTopologyZones() + if err != nil { + return framework.AsStatus(err) + } + zoneIndex := make(map[string]int) + for index, zone := range zones { + zoneIndex[zone] = index + } + + klog.V(5).Infof("available zones: %v", zones) + + parentName, ordinal := getParentNameAndOrdinal(pod) + m := ordinal % AvailableZones + if m == 0 { + return nil + } + + anchorOrdinal := ordinal - m + anchorName := getPodNameByOrdinal(parentName, anchorOrdinal) + anchorPod, err := pl.getPod(anchorName, pod.Namespace) + if err != nil { + if !apierrors.IsNotFound(err) { + return framework.AsStatus(err) + } + } + + if anchorPod != nil && anchorPod.Spec.NodeName != "" { + anchorNode, err := pl.getNode(anchorPod.Spec.NodeName) + if s := getErrorAsStatus(err); !s.IsSuccess() { + return s + } + anchorZone, ok := anchorNode.GetLabels()[corev1.LabelTopologyZone] + if !ok { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone) + } + shift := zoneIndex[anchorZone] + klog.V(5).Infof("anchor pod %s zone shift %d", anchorName, shift) + idealZone := zones[(shift+m)%AvailableZones] + if idealZone != nodeZone { + klog.V(5).Infof("pod [%s/%s] not fit node %s in zone %s, ideal zone %s", pod.Namespace, pod.Name, nodeInfo.Node().Name, nodeZone, idealZone) + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch) + } + } + + return nil +} + +// Permit is the functions invoked by the framework at "Permit" extension point. +func (pl *NodeZone) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) { + if !needSchedule(pod.Name) { + return framework.NewStatus(framework.Success), 0 + } + + parentName, ordinal := getParentNameAndOrdinal(pod) + m := ordinal % AvailableZones + if m == 0 { + return framework.NewStatus(framework.Success), 0 + } + + anchorOrdinal := ordinal - m + anchorName := getPodNameByOrdinal(parentName, anchorOrdinal) + anchorPod, err := pl.getPod(anchorName, pod.Namespace) + if err != nil { + if !apierrors.IsNotFound(err) { + return framework.AsStatus(err), 0 + } + } + if anchorPod == nil || anchorPod.Spec.NodeName == "" { + // We will also request to move the sibling pods back to activeQ. + pl.activateSiblings(pod, state) + return framework.NewStatus(framework.Wait), WaitTime + } + pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod()) + if wParentName == parentName && ordinal == wOrdinal { + klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod())) + waitingPod.Allow(pl.Name()) + } + }) + klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod)) + return framework.NewStatus(framework.Success), 0 +} + +// Reserve is the functions invoked by the framework at "reserve" extension point. +func (pl *NodeZone) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { return nil } +// Unreserve rejects all other adjacent Pods times out. +func (pl *NodeZone) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { + parentName, ordinal := getParentNameAndOrdinal(pod) + m := ordinal % AvailableZones + if m == 0 { + return + } + + pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod()) + if wParentName == parentName && ordinal == wOrdinal { + klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod())) + waitingPod.Reject(pl.Name(), "rejection in Unreserve") + } + }) +} + // EventsToRegister returns the possible events that may make a Pod // failed by this plugin schedulable. func (pl *NodeZone) EventsToRegister() []framework.ClusterEvent { return []framework.ClusterEvent{ - {Resource: ConfigMap, ActionType: framework.Add}, + {Resource: framework.Pod, ActionType: framework.All}, {Resource: framework.Node, ActionType: framework.All}, } } @@ -170,7 +355,12 @@ func (pl *NodeZone) EventsToRegister() []framework.ClusterEvent { func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) { informerFactory := handle.SharedInformerFactory() cmLister := informerFactory.Core().V1().ConfigMaps().Lister() + podLister := informerFactory.Core().V1().Pods().Lister() + nodeLister := informerFactory.Core().V1().Nodes().Lister() return &NodeZone{ + handle, cmLister, + podLister, + nodeLister, }, nil } diff --git a/pkg/scheduler/nodezone/node_zone_utils.go b/pkg/scheduler/nodezone/node_zone_utils.go index f42a8677..08df549d 100644 --- a/pkg/scheduler/nodezone/node_zone_utils.go +++ b/pkg/scheduler/nodezone/node_zone_utils.go @@ -22,9 +22,9 @@ import ( "strconv" "strings" - corev1 "k8s.io/api/core/v1" - "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$") @@ -59,3 +59,11 @@ func needSchedule(podName string) bool { return strings.Contains(podName, v1alpha1.GraphdComponentType.String()) || strings.Contains(podName, v1alpha1.StoragedComponentType.String()) } + +func getPodNameByOrdinal(parentName string, ordinal int) string { + return fmt.Sprintf("%s-%d", parentName, ordinal) +} + +func getNamespacedName(obj metav1.Object) string { + return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName()) +}