WorkloadSpread: consider whether the patch field is matched when assigning existing pods to a subset #1083

Merged 1 commit on Sep 23, 2022
23 changes: 12 additions & 11 deletions pkg/controller/workloadspread/workloadspread_controller.go
@@ -364,7 +364,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
// count managed pods for each subset
for i := range pods {
injectWS := getInjectWorkloadSpreadFromPod(pods[i])
-if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
+if isNotMatchedWS(injectWS, ws) {
continue
}
if _, exist := podMap[injectWS.Subset]; !exist {
@@ -393,7 +393,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
// getSuitableSubsetNameForPod returns (FakeSubsetName, nil) if no suitable subset is found for the pod
func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) {
injectWS := getInjectWorkloadSpreadFromPod(pod)
-if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
+if isNotMatchedWS(injectWS, ws) {
// process the pods that were created before workloadSpread
matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas)
if err != nil {
@@ -426,23 +426,17 @@ func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha
for i := range ws.Spec.Subsets {
subset := &ws.Spec.Subsets[i]
// in case this pod was scheduled to a node that matches one of the workloadSpread's subsets
-matched, preferredScore, err := matchesSubset(pod, node, subset)
+matched, preferredScore, err := matchesSubset(pod, node, subset, subsetMissingReplicas[subset.Name])
if err != nil {
// requiredSelectorTerm field was validated at webhook stage, so this error should not occur
// this error should not be returned, because it is a non-transient error
klog.Errorf("unexpected error occurred when matching pod (%s/%s) with subset, please check requiredSelectorTerm field of subset (%s) in WorkloadSpread (%s/%s), err: %s",
pod.Namespace, pod.Name, subset.Name, ws.Namespace, ws.Name, err.Error())
}
-quotaScore := int64(0)
-// we prefer the subset that still has room for more replicas
-if subsetMissingReplicas[subset.Name] > 0 {
-quotaScore = int64(1)
-}
-finalScore := preferredScore*10 + quotaScore
// select the most favorite subsets for the pod by subset.PreferredNodeSelectorTerms
-if matched && finalScore > maxPreferredScore {
+if matched && preferredScore > maxPreferredScore {
favoriteSubset = subset
-maxPreferredScore = finalScore
+maxPreferredScore = preferredScore
}
}

@@ -753,3 +747,10 @@ func (r *ReconcileWorkloadSpread) writeWorkloadSpreadStatus(ws *appsv1alpha1.Wor
func getWorkloadSpreadKey(o metav1.Object) string {
return o.GetNamespace() + "/" + o.GetName()
}

+func isNotMatchedWS(injectWS *wsutil.InjectWorkloadSpread, ws *appsv1alpha1.WorkloadSpread) bool {
+if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
+return true
+}
+return false
+}
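
The helper's contract can be pinned down with a small table-driven check. This is a hypothetical test, not part of the PR, assuming it sits next to the controller in package workloadspread; `wsutil.InjectWorkloadSpread` is the marker that getInjectWorkloadSpreadFromPod reads from a pod:

package workloadspread

import (
	"testing"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	wsutil "github.com/openkruise/kruise/pkg/util/workloadspread"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestIsNotMatchedWS(t *testing.T) {
	ws := &appsv1alpha1.WorkloadSpread{ObjectMeta: metav1.ObjectMeta{Name: "ws-demo"}}
	cases := []struct {
		name     string
		injectWS *wsutil.InjectWorkloadSpread
		expect   bool
	}{
		// no marker on the pod at all
		{"nil injection", nil, true},
		// pod belongs to a different WorkloadSpread
		{"other ws", &wsutil.InjectWorkloadSpread{Name: "other", Subset: "subset-a"}, true},
		// marker present but the subset was never filled in
		{"empty subset", &wsutil.InjectWorkloadSpread{Name: "ws-demo", Subset: ""}, true},
		// the only case treated as already matched
		{"matched", &wsutil.InjectWorkloadSpread{Name: "ws-demo", Subset: "subset-a"}, false},
	}
	for _, cs := range cases {
		if got := isNotMatchedWS(cs.injectWS, ws); got != cs.expect {
			t.Errorf("%s: expect %v, got %v", cs.name, cs.expect, got)
		}
	}
}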
@@ -1426,7 +1426,7 @@ func TestWorkloadSpreadReconcile(t *testing.T) {
if !reflect.DeepEqual(annotation1, annotation2) {
fmt.Println(annotation1)
fmt.Println(annotation2)
t.Fatalf("set Pod deletion-coset annotation failed")
t.Fatalf("set Pod deletion-cost annotation failed")
}
}

60 changes: 53 additions & 7 deletions pkg/controller/workloadspread/workloadspread_controller_utils.go
@@ -17,13 +17,17 @@ limitations under the License.
package workloadspread

import (
+"encoding/json"
"reflect"

-appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/util/strategicpatch"
-schedulecorev1 "k8s.io/component-helpers/scheduling/corev1"
+v1helper "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"

+appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+"k8s.io/klog/v2"
)

// NewWorkloadSpreadSubsetCondition creates a new WorkloadSpreadSubset condition.
@@ -85,18 +89,60 @@ func filterOutCondition(conditions []appsv1alpha1.WorkloadSpreadSubsetCondition,
return newConditions
}

-func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, int64, error) {
+func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset, missingReplicas int) (bool, int64, error) {
// necessary condition
matched, err := matchesSubsetRequiredAndToleration(pod, node, subset)
if err != nil || !matched {
return false, -1, err
}

-if len(subset.PreferredNodeSelectorTerms) == 0 {
-return matched, 0, nil
+// preferredNodeScore is in [0, total_prefer_weight]
+preferredNodeScore := int64(0)
+if subset.PreferredNodeSelectorTerms != nil {
+nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms)
+preferredNodeScore = nodePreferredTerms.Score(node)
}

-nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms)
-return matched, nodePreferredTerms.Score(node), nil
+// preferredPodScore is in [0, 1]
+preferredPodScore := int64(0)
+if subset.Patch.Raw != nil {
+preferredPodScore = podPreferredScore(subset, pod)
+}

+// we prefer the subset that still has room for more replicas
+quotaScore := int64(0)
+if missingReplicas > 0 {
+quotaScore = int64(1)
+}

+// preferredPodScore is in [0, 1], so it cannot affect preferredNodeScore in the following expression
+preferredScore := preferredNodeScore*100 + preferredPodScore*10 + quotaScore
+return matched, preferredScore, nil
}
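
A quick arithmetic check of the composite score. This is a minimal sketch, assuming a single preferred node term with weight 100, a pod that already satisfies the subset's patch, and missingReplicas == 0; it reproduces the Score=10010 case in the new unit test below:

package main

import "fmt"

func main() {
	preferredNodeScore := int64(100) // node matches the single weight-100 preferred term
	preferredPodScore := int64(1)    // patched pod equals the original pod
	quotaScore := int64(0)           // missingReplicas == 0, so no quota bonus
	// same expression as in matchesSubset above
	fmt.Println(preferredNodeScore*100 + preferredPodScore*10 + quotaScore) // prints 10010
}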

+func podPreferredScore(subset *appsv1alpha1.WorkloadSpreadSubset, pod *corev1.Pod) int64 {
+podBytes, _ := json.Marshal(pod)
+modified, err := strategicpatch.StrategicMergePatch(podBytes, subset.Patch.Raw, &corev1.Pod{})
+if err != nil {
+klog.Errorf("failed to merge patch raw for pod %v and subset %v", klog.KObj(pod), subset.Name)
+return 0
+}
+patchedPod := &corev1.Pod{}
+err = json.Unmarshal(modified, patchedPod)
+if err != nil {
+klog.Errorf("failed to unmarshal for pod %v and subset %v", klog.KObj(pod), subset.Name)
+return 0
+}
+// TODO: consider JSON annotations like `{"json_key": ["value1", "value2"]}`.
+// Currently we exclude the annotations field, because annotations may contain fields we cannot handle.
+// For example, we cannot judge whether the following two annotations are equal via the DeepEqual method:
+//   example.com/list: '["a", "b", "c"]'
+//   example.com/list: '["b", "a", "c"]'
+patchedPod.Annotations = pod.Annotations
+if reflect.DeepEqual(pod, patchedPod) {
+return 1
+}
+return 0
+}
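
The idempotence trick above — apply the subset's patch to the pod and see whether anything changed — can be reproduced standalone. A minimal sketch, assuming a label-only patch (so the annotation caveat from the TODO does not apply); the label key is borrowed from the test fixtures below:

package main

import (
	"encoding/json"
	"fmt"
	"reflect"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{"apps.kruise.io/patch-label": "true"},
	}}
	patch := []byte(`{"metadata":{"labels":{"apps.kruise.io/patch-label":"true"}}}`)

	podBytes, _ := json.Marshal(pod)
	modified, err := strategicpatch.StrategicMergePatch(podBytes, patch, &corev1.Pod{})
	if err != nil {
		panic(err)
	}
	patchedPod := &corev1.Pod{}
	_ = json.Unmarshal(modified, patchedPod)

	// The patch is a no-op for this pod, so the pod already matches the
	// subset and would get preferredPodScore = 1.
	fmt.Println(reflect.DeepEqual(pod, patchedPod)) // prints true
}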

func matchesSubsetRequiredAndToleration(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, error) {
239 changes: 239 additions & 0 deletions pkg/controller/workloadspread/workloadspread_controller_utils_test.go
@@ -0,0 +1,239 @@
/*
Copyright 2021 The Kruise Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workloadspread

import (
"fmt"
"testing"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
testScheduleRequiredKey = "apps.kruise.io/schedule-required"
testSchedulePreferredKey = "apps.kruise.io/schedule-preferred"
testNodeTestLabelKey = "apps.kruise.io/is-test"
testNodeUnitTestLabelKey = "apps.kruise.io/is-unit-test"
testNodeTolerationKey = "apps.kruise.io/toleration"
testSubsetPatchLabelKey = "apps.kruise.io/patch-label"
testSubsetPatchAnnoKey = "apps.kruise.io/patch-annotation"
)

var (
matchSubsetDemo = appsv1alpha1.WorkloadSpreadSubset{
Name: "subset-a",
MaxReplicas: &intstr.IntOrString{Type: intstr.Int, IntVal: 5},
Tolerations: []corev1.Toleration{
{
Key: testNodeTolerationKey,
Operator: corev1.TolerationOpEqual,
Value: "true",
},
},
RequiredNodeSelectorTerm: &corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: testScheduleRequiredKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
PreferredNodeSelectorTerms: []corev1.PreferredSchedulingTerm{
{
Weight: 100,
Preference: corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: testSchedulePreferredKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
},
},
Patch: runtime.RawExtension{
Raw: []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"true"},"annotations":{"%s":"true"}}}`, testSubsetPatchLabelKey, testSubsetPatchAnnoKey)),
},
}

matchPodDemo = corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
testSubsetPatchAnnoKey: "true",
},
Labels: map[string]string{
testSubsetPatchLabelKey: "true",
},
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: testNodeTestLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
{
Key: testNodeUnitTestLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{"true"},
},
},
},
},
},
},
},
},
}

matchNodeDemo = corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
testNodeTestLabelKey: "true",
testNodeUnitTestLabelKey: "true",
testScheduleRequiredKey: "true",
testSchedulePreferredKey: "true",
},
},
Spec: corev1.NodeSpec{
Taints: []corev1.Taint{
{
Key: testNodeTolerationKey,
Value: "true",
Effect: corev1.TaintEffectNoSchedule,
},
},
},
}
)

func TestMatchSubset(t *testing.T) {
cases := []struct {
name string
isMatch bool
score int64
getSubset func() *appsv1alpha1.WorkloadSpreadSubset
getNode func() *corev1.Node
getPod func() *corev1.Pod
}{
{
name: "match=true, Score=10010",
isMatch: true,
score: 10010,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
return matchNodeDemo.DeepCopy()
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
{
name: "match=true, Score=10000",
isMatch: true,
score: 10000,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
return matchNodeDemo.DeepCopy()
},
getPod: func() *corev1.Pod {
pod := matchPodDemo.DeepCopy()
pod.Labels[testSubsetPatchLabelKey] = "false"
pod.Annotations[testSubsetPatchAnnoKey] = "false"
return pod
},
},
{
name: "match=true, Score=10, preferred key not match",
isMatch: true,
score: 10,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
node := matchNodeDemo.DeepCopy()
node.Labels[testSchedulePreferredKey] = "false"
return node
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
{
name: "match=false, Score=-1, required key not match",
isMatch: false,
score: -1,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
return matchSubsetDemo.DeepCopy()
},
getNode: func() *corev1.Node {
node := matchNodeDemo.DeepCopy()
node.Labels[testScheduleRequiredKey] = "false"
return node
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
{
name: "match=false, Score=-1, toleration key not match",
isMatch: false,
score: -1,
getSubset: func() *appsv1alpha1.WorkloadSpreadSubset {
subset := matchSubsetDemo.DeepCopy()
subset.Tolerations[0].Value = "false"
return subset
},
getNode: func() *corev1.Node {
return matchNodeDemo.DeepCopy()
},
getPod: func() *corev1.Pod {
return matchPodDemo.DeepCopy()
},
},
}

for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
subset := cs.getSubset()
node := cs.getNode()
pod := cs.getPod()
isMatch, score, err := matchesSubset(pod, node, subset, 0)
if err != nil {
t.Fatal("unexpected err occurred")
}
if isMatch != cs.isMatch || score != cs.score {
t.Fatalf("expect: %v %v, but got %v %v ", cs.isMatch, cs.score, isMatch, score)
}
})
}
}