diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go index a3fe0308b3..716de82849 100644 --- a/pkg/controller/workloadspread/workloadspread_controller.go +++ b/pkg/controller/workloadspread/workloadspread_controller.go @@ -364,7 +364,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods // count managed pods for each subset for i := range pods { injectWS := getInjectWorkloadSpreadFromPod(pods[i]) - if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" { + if isNotMatchedWS(injectWS, ws) { continue } if _, exist := podMap[injectWS.Subset]; !exist { @@ -393,7 +393,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods // getSuitableSubsetNameForPod will return (FakeSubsetName, nil) if not found suitable subset for pod func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) { injectWS := getInjectWorkloadSpreadFromPod(pod) - if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" { + if isNotMatchedWS(injectWS, ws) { // process the pods that were created before workloadSpread matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas) if err != nil { @@ -426,23 +426,17 @@ func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha for i := range ws.Spec.Subsets { subset := &ws.Spec.Subsets[i] // in case of that this pod was scheduled to the node which matches a subset of workloadSpread - matched, preferredScore, err := matchesSubset(pod, node, subset) + matched, preferredScore, err := matchesSubset(pod, node, subset, subsetMissingReplicas[subset.Name]) if err != nil { // requiredSelectorTerm field was validated at webhook stage, so this error should not occur // this error should not be returned, because it is a non-transient error klog.Errorf("unexpected error occurred when matching pod (%s/%s) with subset, please check requiredSelectorTerm field of subset (%s) in WorkloadSpread (%s/%s), err: %s", pod.Namespace, pod.Name, subset.Name, ws.Namespace, ws.Name, err.Error()) } - quotaScore := int64(0) - // we prefer the subset that still has room for more replicas - if subsetMissingReplicas[subset.Name] > 0 { - quotaScore = int64(1) - } - finalScore := preferredScore*10 + quotaScore // select the most favorite subsets for the pod by subset.PreferredNodeSelectorTerms - if matched && finalScore > maxPreferredScore { + if matched && preferredScore > maxPreferredScore { favoriteSubset = subset - maxPreferredScore = finalScore + maxPreferredScore = preferredScore } } @@ -753,3 +747,10 @@ func (r *ReconcileWorkloadSpread) writeWorkloadSpreadStatus(ws *appsv1alpha1.Wor func getWorkloadSpreadKey(o metav1.Object) string { return o.GetNamespace() + "/" + o.GetName() } + +func isNotMatchedWS(injectWS *wsutil.InjectWorkloadSpread, ws *appsv1alpha1.WorkloadSpread) bool { + if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" { + return true + } + return false +} diff --git a/pkg/controller/workloadspread/workloadspread_controller_test.go b/pkg/controller/workloadspread/workloadspread_controller_test.go index b9254ba04a..f589a6d794 100644 --- a/pkg/controller/workloadspread/workloadspread_controller_test.go +++ b/pkg/controller/workloadspread/workloadspread_controller_test.go @@ -1426,7 +1426,7 @@ func TestWorkloadSpreadReconcile(t *testing.T) { if 
!reflect.DeepEqual(annotation1, annotation2) { fmt.Println(annotation1) fmt.Println(annotation2) - t.Fatalf("set Pod deletion-coset annotation failed") + t.Fatalf("set Pod deletion-cost annotation failed") } } diff --git a/pkg/controller/workloadspread/workloadspread_controller_utils.go b/pkg/controller/workloadspread/workloadspread_controller_utils.go index 718859f742..eeb175bd15 100644 --- a/pkg/controller/workloadspread/workloadspread_controller_utils.go +++ b/pkg/controller/workloadspread/workloadspread_controller_utils.go @@ -17,13 +17,17 @@ limitations under the License. package workloadspread import ( + "encoding/json" + "reflect" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/strategicpatch" schedulecorev1 "k8s.io/component-helpers/scheduling/corev1" v1helper "k8s.io/component-helpers/scheduling/corev1" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "k8s.io/klog/v2" ) // NewWorkloadSpreadSubsetCondition creates a new WorkloadSpreadSubset condition. @@ -85,18 +89,60 @@ func filterOutCondition(conditions []appsv1alpha1.WorkloadSpreadSubsetCondition, return newConditions } -func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, int64, error) { +func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset, missingReplicas int) (bool, int64, error) { + // necessary condition matched, err := matchesSubsetRequiredAndToleration(pod, node, subset) if err != nil || !matched { return false, -1, err } - if len(subset.PreferredNodeSelectorTerms) == 0 { - return matched, 0, nil + // preferredNodeScore is in [0, total_prefer_weight] + preferredNodeScore := int64(0) + if subset.PreferredNodeSelectorTerms != nil { + nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms) + preferredNodeScore = nodePreferredTerms.Score(node) } - nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms) - return matched, nodePreferredTerms.Score(node), nil + // preferredPodScore is in [0, 1] + preferredPodScore := int64(0) + if subset.Patch.Raw != nil { + preferredPodScore = podPreferredScore(subset, pod) + } + + // we prefer the subset that still has room for more replicas + quotaScore := int64(0) + if missingReplicas > 0 { + quotaScore = int64(1) + } + + // preferredPodScore is in [0, 1], so it cannot affect preferredNodeScore in the following expression + preferredScore := preferredNodeScore*100 + preferredPodScore*10 + quotaScore + return matched, preferredScore, nil +} + +func podPreferredScore(subset *appsv1alpha1.WorkloadSpreadSubset, pod *corev1.Pod) int64 { + podBytes, _ := json.Marshal(pod) + modified, err := strategicpatch.StrategicMergePatch(podBytes, subset.Patch.Raw, &corev1.Pod{}) + if err != nil { + klog.Errorf("failed to merge patch raw for pod %v and subset %v", klog.KObj(pod), subset.Name) + return 0 + } + patchedPod := &corev1.Pod{} + err = json.Unmarshal(modified, patchedPod) + if err != nil { + klog.Errorf("failed to unmarshal for pod %v and subset %v", klog.KObj(pod), subset.Name) + return 0 + } + // TODO: consider JSON-valued annotations such as `{"json_key": ["value1", "value2"]}`. + // Currently, we exclude the annotations field because annotations may contain some fields we cannot handle.
+ // For example, we cannot judge whether the following two annotations are equal via DeepEqual method: + // example.com/list: '["a", "b", "c"]' + // example.com/list: '["b", "a", "c"]' + patchedPod.Annotations = pod.Annotations + if reflect.DeepEqual(pod, patchedPod) { + return 1 + } + return 0 } func matchesSubsetRequiredAndToleration(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, error) { diff --git a/pkg/controller/workloadspread/workloadspread_controller_utils_test.go b/pkg/controller/workloadspread/workloadspread_controller_utils_test.go new file mode 100644 index 0000000000..966ec426fb --- /dev/null +++ b/pkg/controller/workloadspread/workloadspread_controller_utils_test.go @@ -0,0 +1,239 @@ +/* +Copyright 2021 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloadspread + +import ( + "fmt" + "testing" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const ( + testScheduleRequiredKey = "apps.kruise.io/schedule-required" + testSchedulePreferredKey = "apps.kruise.io/schedule-preferred" + testNodeTestLabelKey = "apps.kruise.io/is-test" + testNodeUnitTestLabelKey = "apps.kruise.io/is-unit-test" + testNodeTolerationKey = "apps.kruise.io/toleration" + testSubsetPatchLabelKey = "apps.kruise.io/patch-label" + testSubsetPatchAnnoKey = "apps.kruise.io/patch-annotation" +) + +var ( + matchSubsetDemo = appsv1alpha1.WorkloadSpreadSubset{ + Name: "subset-a", + MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 5}, + Tolerations: []corev1.Toleration{ + { + Key: testNodeTolerationKey, + Operator: corev1.TolerationOpEqual, + Value: "true", + }, + }, + RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: testScheduleRequiredKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + PreferredNodeSelectorTerms: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: testSchedulePreferredKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + Patch: runtime.RawExtension{ + Raw: []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"true"},"annotations":{"%s":"true"}}}`, testSubsetPatchLabelKey, testSubsetPatchAnnoKey)), + }, + } + + matchPodDemo = corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + testSubsetPatchAnnoKey: "true", + }, + Labels: map[string]string{ + testSubsetPatchLabelKey: "true", + }, + }, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: testNodeTestLabelKey, + Operator: 
corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + { + Key: testNodeUnitTestLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + } + + matchNodeDemo = corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + testNodeTestLabelKey: "true", + testNodeUnitTestLabelKey: "true", + testScheduleRequiredKey: "true", + testSchedulePreferredKey: "true", + }, + }, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: testNodeTolerationKey, + Value: "true", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + } +) + +func TestMatchSubset(t *testing.T) { + cases := []struct { + name string + isMatch bool + score int64 + getSubset func() *appsv1alpha1.WorkloadSpreadSubset + getNode func() *corev1.Node + getPod func() *corev1.Pod + }{ + { + name: "match=true, Score=10010", + isMatch: true, + score: 10010, + getSubset: func() *appsv1alpha1.WorkloadSpreadSubset { + return matchSubsetDemo.DeepCopy() + }, + getNode: func() *corev1.Node { + return matchNodeDemo.DeepCopy() + }, + getPod: func() *corev1.Pod { + return matchPodDemo.DeepCopy() + }, + }, + { + name: "match=true, Score=10000", + isMatch: true, + score: 10000, + getSubset: func() *appsv1alpha1.WorkloadSpreadSubset { + return matchSubsetDemo.DeepCopy() + }, + getNode: func() *corev1.Node { + return matchNodeDemo.DeepCopy() + }, + getPod: func() *corev1.Pod { + pod := matchPodDemo.DeepCopy() + pod.Labels[testSubsetPatchLabelKey] = "false" + pod.Annotations[testSubsetPatchAnnoKey] = "false" + return pod + }, + }, + { + name: "match=true, Score=10, preferred key not match", + isMatch: true, + score: 10, + getSubset: func() *appsv1alpha1.WorkloadSpreadSubset { + return matchSubsetDemo.DeepCopy() + }, + getNode: func() *corev1.Node { + node := matchNodeDemo.DeepCopy() + node.Labels[testSchedulePreferredKey] = "false" + return node + }, + getPod: func() *corev1.Pod { + return matchPodDemo.DeepCopy() + }, + }, + { + name: "match=false, Score=-1, required key not match", + isMatch: false, + score: -1, + getSubset: func() *appsv1alpha1.WorkloadSpreadSubset { + return matchSubsetDemo.DeepCopy() + }, + getNode: func() *corev1.Node { + node := matchNodeDemo.DeepCopy() + node.Labels[testScheduleRequiredKey] = "false" + return node + }, + getPod: func() *corev1.Pod { + return matchPodDemo.DeepCopy() + }, + }, + { + name: "match=false, Score=-1, toleration key not match", + isMatch: false, + score: -1, + getSubset: func() *appsv1alpha1.WorkloadSpreadSubset { + subset := matchSubsetDemo.DeepCopy() + subset.Tolerations[0].Value = "false" + return subset + }, + getNode: func() *corev1.Node { + return matchNodeDemo.DeepCopy() + }, + getPod: func() *corev1.Pod { + return matchPodDemo.DeepCopy() + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + subset := cs.getSubset() + node := cs.getNode() + pod := cs.getPod() + isMatch, score, err := matchesSubset(pod, node, subset, 0) + if err != nil { + t.Fatalf("unexpected err occurred: %v", err) + } + if isMatch != cs.isMatch || score != cs.score { + t.Fatalf("expect: %v %v, but got %v %v", cs.isMatch, cs.score, isMatch, score) + } + }) + } +}
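Note (not part of the patch): a minimal, self-contained sketch of how the composite score in matchesSubset plays out, using values that mirror the "match=true, Score=10010" test case above; the standalone main program is purely illustrative.

```go
package main

import "fmt"

func main() {
	// Assumed inputs, mirroring the "match=true, Score=10010" fixture:
	preferredNodeScore := int64(100) // the node satisfies the single weight-100 preferred term
	preferredPodScore := int64(1)    // podPreferredScore: the subset patch is already reflected on the pod
	quotaScore := int64(0)           // the test calls matchesSubset with missingReplicas = 0

	// Same weighting as matchesSubset: node-affinity preference dominates,
	// then the pod-patch match, and a subset with missingReplicas > 0 would
	// gain one more point as the final tie-breaker.
	preferredScore := preferredNodeScore*100 + preferredPodScore*10 + quotaScore
	fmt.Println(preferredScore) // prints 10010
}
```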