consider whether patch field is matched when assigning existing pods to subset

Signed-off-by: mingzhou.swx <[email protected]>
mingzhou.swx committed Sep 22, 2022
1 parent 4edba53 commit f92d1d4
Showing 3 changed files with 296 additions and 16 deletions.
23 changes: 12 additions & 11 deletions pkg/controller/workloadspread/workloadspread_controller.go
@@ -364,7 +364,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
// count managed pods for each subset
for i := range pods {
injectWS := getInjectWorkloadSpreadFromPod(pods[i])
if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
if isNotMatchedWS(injectWS, ws) {
continue
}
if _, exist := podMap[injectWS.Subset]; !exist {
@@ -393,7 +393,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
// getSuitableSubsetNameForPod will return (FakeSubsetName, nil) if no suitable subset is found for the pod
func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) {
injectWS := getInjectWorkloadSpreadFromPod(pod)
if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
if isNotMatchedWS(injectWS, ws) {
// process the pods that were created before workloadSpread
matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas)
if err != nil {
@@ -426,23 +426,17 @@ func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha
for i := range ws.Spec.Subsets {
subset := &ws.Spec.Subsets[i]
// in case this pod was scheduled to a node which matches a subset of the workloadSpread
matched, preferredScore, err := matchesSubset(pod, node, subset)
matched, preferredScore, err := matchesSubset(pod, node, subset, subsetMissingReplicas[subset.Name])
if err != nil {
// requiredSelectorTerm field was validated at webhook stage, so this error should not occur
// this error should not be returned, because it is a non-transient error
klog.Errorf("unexpected error occurred when matching pod (%s/%s) with subset, please check requiredSelectorTerm field of subset (%s) in WorkloadSpread (%s/%s), err: %s",
pod.Namespace, pod.Name, subset.Name, ws.Namespace, ws.Name, err.Error())
}
quotaScore := int64(0)
// we prefer the subset that still has room for more replicas
if subsetMissingReplicas[subset.Name] > 0 {
quotaScore = int64(1)
}
finalScore := preferredScore*10 + quotaScore
// select the pod's most preferred subset by subset.PreferredNodeSelectorTerms
if matched && finalScore > maxPreferredScore {
if matched && preferredScore > maxPreferredScore {
favoriteSubset = subset
maxPreferredScore = finalScore
maxPreferredScore = preferredScore
}
}

@@ -753,3 +747,10 @@ func (r *ReconcileWorkloadSpread) writeWorkloadSpreadStatus(ws *appsv1alpha1.Wor
func getWorkloadSpreadKey(o metav1.Object) string {
return o.GetNamespace() + "/" + o.GetName()
}

func isNotMatchedWS(injectWS *wsutil.InjectWorkloadSpread, ws *appsv1alpha1.WorkloadSpread) bool {
if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
return true
}
return false
}
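For reference, a minimal standalone sketch (not part of this commit) of the guard's truth table; the struct below mirrors only the Name/Subset fields of wsutil.InjectWorkloadSpread that the guard reads, and the cases are illustrative:

package main

import "fmt"

// InjectWorkloadSpread mirrors the Name/Subset fields of wsutil.InjectWorkloadSpread
// that the guard inspects.
type InjectWorkloadSpread struct {
	Name   string
	Subset string
}

// isNotMatchedWS reports whether the injected record does NOT refer to the given WorkloadSpread.
func isNotMatchedWS(injectWS *InjectWorkloadSpread, wsName string) bool {
	return injectWS == nil || injectWS.Name != wsName || injectWS.Subset == ""
}

func main() {
	fmt.Println(isNotMatchedWS(nil, "ws"))                                                   // true: pod predates the WorkloadSpread
	fmt.Println(isNotMatchedWS(&InjectWorkloadSpread{Name: "other", Subset: "a"}, "ws"))     // true: injected by another WorkloadSpread
	fmt.Println(isNotMatchedWS(&InjectWorkloadSpread{Name: "ws"}, "ws"))                     // true: no subset recorded
	fmt.Println(isNotMatchedWS(&InjectWorkloadSpread{Name: "ws", Subset: "subset-a"}, "ws")) // false: managed by this WorkloadSpread
}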
50 changes: 45 additions & 5 deletions pkg/controller/workloadspread/workloadspread_controller_utils.go
@@ -17,11 +17,15 @@ limitations under the License.
package workloadspread

import (
"encoding/json"
"reflect"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/strategicpatch"
schedulecorev1 "k8s.io/component-helpers/scheduling/corev1"
v1helper "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/klog/v2"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)
@@ -85,18 +89,54 @@ func filterOutCondition(conditions []appsv1alpha1.WorkloadSpreadSubsetCondition,
return newConditions
}

func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, int64, error) {
func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset, missingReplicas int) (bool, int64, error) {
// necessary condition
matched, err := matchesSubsetRequiredAndToleration(pod, node, subset)
if err != nil || !matched {
return false, -1, err
}

if len(subset.PreferredNodeSelectorTerms) == 0 {
return matched, 0, nil
// preferredNodeScore is in [0, total_prefer_weight]
preferredNodeScore := int64(0)
if subset.PreferredNodeSelectorTerms != nil {
nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms)
preferredNodeScore = nodePreferredTerms.Score(node)
}

nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms)
return matched, nodePreferredTerms.Score(node), nil
// preferredPodScore is in {0, 1}
preferredPodScore := int64(0)
if subset.Patch.Raw != nil {
preferredPodScore = podPreferredScore(subset, pod)
}

// we prefer the subset that still has room for more replicas
quotaScore := int64(0)
if missingReplicas > 0 {
quotaScore = int64(1)
}

// preferredPodScore and quotaScore are both in {0, 1}, so with these weights neither can outrank preferredNodeScore in the following expression
preferredScore := preferredNodeScore*100 + preferredPodScore*10 + quotaScore
return matched, preferredScore, nil
}

func podPreferredScore(subset *appsv1alpha1.WorkloadSpreadSubset, pod *corev1.Pod) int64 {
podBytes, _ := json.Marshal(pod)
modified, err := strategicpatch.StrategicMergePatch(podBytes, subset.Patch.Raw, &corev1.Pod{})
if err != nil {
klog.Errorf("failed to merge patch raw for pod %v and subset %v, err: %v", klog.KObj(pod), subset.Name, err)
return 0
}
patchedPod := &corev1.Pod{}
err = json.Unmarshal(modified, patchedPod)
if err != nil {
klog.Errorf("failed to unmarshal patched bytes for pod %v and subset %v, err: %v", klog.KObj(pod), subset.Name, err)
return 0
}
if reflect.DeepEqual(pod, patchedPod) {
return 1
}
return 0
}

func matchesSubsetRequiredAndToleration(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, error) {
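Taken together, the new scoring has two parts: podPreferredScore applies the subset's strategic merge patch to the pod and awards one point only when the patch is a no-op (the pod already carries the subset's labels/annotations), and matchesSubset then folds node, pod, and quota scores into separate decimal tiers so a lower tier can never outrank a higher one. A minimal standalone sketch of both steps; the pod, patch, and weights are illustrative, while strategicpatch.StrategicMergePatch is the same upstream apimachinery helper the commit uses:

package main

import (
	"encoding/json"
	"fmt"
	"reflect"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// podPreferredScore returns 1 when applying the subset patch leaves the pod unchanged.
func podPreferredScore(patch []byte, pod *corev1.Pod) int64 {
	podBytes, _ := json.Marshal(pod)
	modified, err := strategicpatch.StrategicMergePatch(podBytes, patch, &corev1.Pod{})
	if err != nil {
		return 0
	}
	patched := &corev1.Pod{}
	if err := json.Unmarshal(modified, patched); err != nil {
		return 0
	}
	if reflect.DeepEqual(pod, patched) {
		return 1
	}
	return 0
}

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{"apps.kruise.io/patch-label": "true"},
	}}
	patch := []byte(`{"metadata":{"labels":{"apps.kruise.io/patch-label":"true"}}}`)

	// Tiered composition as in matchesSubset: the node score (x100) dominates the
	// pod-patch score (x10), which in turn dominates the quota score (x1).
	preferredNodeScore := int64(100) // e.g. one matching preferred term with weight 100
	quotaScore := int64(0)           // e.g. the subset has no missing replicas left
	preferredScore := preferredNodeScore*100 + podPreferredScore(patch, pod)*10 + quotaScore

	fmt.Println(preferredScore) // 10010, the value the "match=true, Score=10010" test case expects
}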
239 changes: 239 additions & 0 deletions pkg/controller/workloadspread/workloadspread_controller_utils_test.go
@@ -0,0 +1,239 @@
/*
Copyright 2021 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workloadspread

import (
"fmt"
"testing"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
testScheduleRequiredKey = "apps.kruise.io/schedule-required"
testSchedulePreferredKey = "apps.kruise.io/schedule-preferred"
testNodeTestLabelKey = "apps.kruise.io/is-test"
testNodeUnitTestLabelKey = "apps.kruise.io/is-unit-test"
testNodeTolerationKey = "apps.kruise.io/toleration"
testSubsetPatchLabelKey = "apps.kruise.io/patch-label"
testSubsetPatchAnnoKey = "apps.kruise.io/patch-annotation"
)

var (
matchSubsetDemo = appsv1alpha1.WorkloadSpreadSubset{
Name: "subset-a",
MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 5},
Tolerations: []corev1.Toleration{
{
Key: testNodeTolerationKey,
Operator: corev1.TolerationOpEqual,
Value: "true",
},
},
RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: testScheduleRequiredKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
PreferredNodeSelectorTerms: []corev1.PreferredSchedulingTerm{
{
Weight: 100,
Preference: corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: testSchedulePreferredKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
},
},
Patch: runtime.RawExtension{
Raw: []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"true"},"annotations":{"%s":"true"}}}`, testSubsetPatchLabelKey, testSubsetPatchAnnoKey)),
},
}

matchPodDemo = corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
testSubsetPatchAnnoKey: "true",
},
Labels: map[string]string{
testSubsetPatchLabelKey: "true",
},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: testNodeTestLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
{
Key: testNodeUnitTestLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
},
},
},
},
},
}

matchNodeDemo = corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
testNodeTestLabelKey: "true",
testNodeUnitTestLabelKey: "true",
testScheduleRequiredKey: "true",
testSchedulePreferredKey: "true",
},
},
Spec: corev1.NodeSpec{
Taints: []corev1.Taint{
{
Key: testNodeTolerationKey,
Value: "true",
Effect: corev1.TaintEffectNoSchedule,
},
},
},
}
)

func TestMatchSubset(t *testing.T) {
cases := []struct {
name string
isMatch bool
score int64
getSubset func() *appsv1alpha1.WorkloadSpreadSubset
getNode func() *corev1.Node
getPod func() *corev1.Pod
}{
{
name: "match=true, Score=10010",
isMatch: true,
score: 10010,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
return matchNodeDemo.DeepCopy()
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
{
name: "match=true, Score=10000",
isMatch: true,
score: 10000,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
return matchNodeDemo.DeepCopy()
},
getPod: func() *corev1.Pod {
pod := matchPodDemo.DeepCopy()
pod.Labels[testSubsetPatchLabelKey] = "false"
pod.Annotations[testSubsetPatchAnnoKey] = "false"
return pod
},
},
{
name: "match=true, Score=10, preferred key not match",
isMatch: true,
score: 10,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
node := matchNodeDemo.DeepCopy()
node.Labels[testSchedulePreferredKey] = "false"
return node
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
{
name: "match=false, Score=-1, required key not match",
isMatch: false,
score: -1,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
node := matchNodeDemo.DeepCopy()
node.Labels[testScheduleRequiredKey] = "false"
return node
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
{
name: "match=false, Score=-1, toleration key not match",
isMatch: false,
score: -1,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
subset := matchSubsetDemo.DeepCopy()
subset.Tolerations[0].Value = "false"
return subset
},
getNode: func() *corev1.Node {
return matchNodeDemo.DeepCopy()
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
}

for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
subset := cs.getSubset()
node := cs.getNode()
pod := cs.getPod()
isMatch, score, err := matchesSubset(pod, node, subset, 0)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if isMatch != cs.isMatch || score != cs.score {
t.Fatalf("expect: %v %v, but got %v %v ", cs.isMatch, cs.score, isMatch, score)
}
})
}
}
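The new cases are table-driven, so they can be run on their own with standard Go tooling, e.g. go test ./pkg/controller/workloadspread/ -run TestMatchSubset.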
