Skip to content

Commit

Permalink
consider whether patch field is matched when assign existing pods to …
Browse files Browse the repository at this point in the history
…subset

Signed-off-by: mingzhou.swx <[email protected]>
  • Loading branch information
mingzhou.swx committed Sep 21, 2022
1 parent 4edba53 commit 60a745a
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 14 deletions.
47 changes: 39 additions & 8 deletions pkg/controller/workloadspread/workloadspread_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,16 +355,27 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
podMap := make(map[string][]*corev1.Pod, len(ws.Spec.Subsets)+1)
podMap[FakeSubsetName] = []*corev1.Pod{}
subsetMissingReplicas := make(map[string]int)
subsetPatchFields := make(map[string]*corev1.Pod)
// is there any pod that has wrong matched-workloadSpread annotation?
hasSolitaryPods := existsSolitaryPods(pods, ws)
for _, subset := range ws.Spec.Subsets {
podMap[subset.Name] = []*corev1.Pod{}
subsetMissingReplicas[subset.Name], _ = intstr.GetScaledValueFromIntOrPercent(
intstr.ValueOrDefault(subset.MaxReplicas, intstr.FromInt(math.MaxInt32)), int(replicas), true)
// we may use subset patch field to selector suitable subset for solitary pods.
// hence, we should unmarshal them at first, to void repeat unmarshal operations.
if subset.Patch.Raw != nil && hasSolitaryPods {
subsetPatchFields[subset.Name] = &corev1.Pod{}
if err := json.Unmarshal(subset.Patch.Raw, subsetPatchFields[subset.Name]); err != nil {
klog.Errorf("Cannot unmarshal patch for subset %v of WorkloadSpread %v", subset.Name, klog.KObj(ws))
}
}
}

// count managed pods for each subset
for i := range pods {
injectWS := getInjectWorkloadSpreadFromPod(pods[i])
if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
if isMatchedWS(injectWS, ws) {
continue
}
if _, exist := podMap[injectWS.Subset]; !exist {
Expand All @@ -374,7 +385,7 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
}

for i := range pods {
subsetName, err := r.getSuitableSubsetNameForPod(ws, pods[i], subsetMissingReplicas)
subsetName, err := r.getSuitableSubsetNameForPod(ws, pods[i], subsetMissingReplicas, subsetPatchFields)
if err != nil {
return nil, err
}
Expand All @@ -391,11 +402,12 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods
}

// getSuitableSubsetNameForPod will return (FakeSubsetName, nil) if not found suitable subset for pod
func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) {
func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod,
subsetMissingReplicas map[string]int, subsetPatchFields map[string]*corev1.Pod) (string, error) {
injectWS := getInjectWorkloadSpreadFromPod(pod)
if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
if isMatchedWS(injectWS, ws) {
// process the pods that were created before workloadSpread
matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas)
matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas, subsetPatchFields)
if err != nil {
return "", err
} else if matchedSubset == nil {
Expand All @@ -408,7 +420,8 @@ func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.W

// getSuitableSubsetForOldPod returns a suitable subset for the pod which was created before workloadSpread.
// getSuitableSubsetForOldPod will return (nil, nil) if there is no suitable subset for the pod.
func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (*appsv1alpha1.WorkloadSpreadSubset, error) {
func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod,
subsetMissingReplicas map[string]int, subsetPatchFields map[string]*corev1.Pod) (*appsv1alpha1.WorkloadSpreadSubset, error) {
if len(pod.Spec.NodeName) == 0 {
return nil, nil
}
Expand All @@ -426,7 +439,7 @@ func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha
for i := range ws.Spec.Subsets {
subset := &ws.Spec.Subsets[i]
// in case of that this pod was scheduled to the node which matches a subset of workloadSpread
matched, preferredScore, err := matchesSubset(pod, node, subset)
matched, preferredNodeScore, preferredPodScore, err := matchesSubset(pod, node, subset, subsetPatchFields[subset.Name])
if err != nil {
// requiredSelectorTerm field was validated at webhook stage, so this error should not occur
// this error should not be returned, because it is a non-transient error
Expand All @@ -438,7 +451,8 @@ func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha
if subsetMissingReplicas[subset.Name] > 0 {
quotaScore = int64(1)
}
finalScore := preferredScore*10 + quotaScore
// preferredPodScore is in [0, 2], so it cannot affect preferredNodeScore in the following expression
finalScore := preferredNodeScore*100 + preferredPodScore*10 + quotaScore
// select the most favorite subsets for the pod by subset.PreferredNodeSelectorTerms
if matched && finalScore > maxPreferredScore {
favoriteSubset = subset
Expand Down Expand Up @@ -753,3 +767,20 @@ func (r *ReconcileWorkloadSpread) writeWorkloadSpreadStatus(ws *appsv1alpha1.Wor
func getWorkloadSpreadKey(o metav1.Object) string {
return o.GetNamespace() + "/" + o.GetName()
}

func existsSolitaryPods(pods []*corev1.Pod, ws *appsv1alpha1.WorkloadSpread) bool {
for _, pod := range pods {
injectWS := getInjectWorkloadSpreadFromPod(pod)
if !isMatchedWS(injectWS, ws) {
return true
}
}
return false
}

func isMatchedWS(injectWS *wsutil.InjectWorkloadSpread, ws *appsv1alpha1.WorkloadSpread) bool {
if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" {
return false
}
return true
}
47 changes: 41 additions & 6 deletions pkg/controller/workloadspread/workloadspread_controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,53 @@ func filterOutCondition(conditions []appsv1alpha1.WorkloadSpreadSubsetCondition,
return newConditions
}

func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, int64, error) {
func matchesSubset(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset, patch *corev1.Pod) (bool, int64, int64, error) {
matched, err := matchesSubsetRequiredAndToleration(pod, node, subset)
if err != nil || !matched {
return false, -1, err
return false, -1, -1, err
}

if len(subset.PreferredNodeSelectorTerms) == 0 {
return matched, 0, nil
// preferredNodeScore is in [0, total_prefer_weight]
preferredNodeScore := int64(0)
if subset.PreferredNodeSelectorTerms != nil {
nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms)
preferredNodeScore = nodePreferredTerms.Score(node)
}

nodePreferredTerms, _ := nodeaffinity.NewPreferredSchedulingTerms(subset.PreferredNodeSelectorTerms)
return matched, nodePreferredTerms.Score(node), nil
// preferredPodScore is in [0, 2]
preferredPodScore := int64(0)
if subset.Patch.Raw != nil {
preferredPodScore = podPreferredScore(subset, patch, pod)
}

return matched, preferredNodeScore, preferredPodScore, nil
}

func podPreferredScore(subset *appsv1alpha1.WorkloadSpreadSubset, patch, pod *corev1.Pod) int64 {
if subset.Patch.Raw == nil || patch == nil {
return 0
}
// return true if a contains b
isContained := func(a, b map[string]string) bool {
for k, v := range b {
if a[k] != v {
return false
}
}
return true
}

// currently, we only consider labels and annotations
preferredPodScore := int64(0)
if isContained(pod.Labels, patch.Labels) {
preferredPodScore++
}

if isContained(pod.Annotations, patch.Annotations) {
preferredPodScore++
}

return preferredPodScore
}

func matchesSubsetRequiredAndToleration(pod *corev1.Pod, node *corev1.Node, subset *appsv1alpha1.WorkloadSpreadSubset) (bool, error) {
Expand Down
Loading

0 comments on commit 60a745a

Please sign in to comment.