Skip to content

Commit

Permalink
Optimize container launch priority performance
Browse files Browse the repository at this point in the history
Signed-off-by: FillZpp <[email protected]>
  • Loading branch information
FillZpp committed Jan 22, 2024
1 parent 30a660b commit d2ac892
Show file tree
Hide file tree
Showing 6 changed files with 599 additions and 71 deletions.
4 changes: 4 additions & 0 deletions apis/apps/pub/launch_priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ const (
ContainerLaunchPriorityKey = "apps.kruise.io/container-launch-priority"
// ContainerLaunchOrdered is the annotation value that indicates containers in pod should be launched by ordinal.
ContainerLaunchOrdered = "Ordered"

// ContainerLaunchPriorityCompletedKey is the annotation indicates the pod has all its priorities
// patched into its barrier configmap.
ContainerLaunchPriorityCompletedKey = "apps.kruise.io/container-launch-priority-completed"
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ package containerlauchpriority
import (
"context"
"fmt"
"strconv"
"sort"
"time"

"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/util/sets"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -40,6 +38,11 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

appspub "github.com/openkruise/kruise/apis/apps/pub"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority"
)

const (
Expand Down Expand Up @@ -70,15 +73,11 @@ func add(mgr manager.Manager, r *ReconcileContainerLaunchPriority) error {
err = c.Watch(&source.Kind{Type: &v1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
pod := e.Object.(*v1.Pod)
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
// If in vk scenario, there will be not containerReady condition
return utilcontainerlaunchpriority.ExistsPriorities(pod) && (containersReady == nil || containersReady.Status != v1.ConditionTrue)
return shouldEnqueue(pod, mgr.GetCache())

Check warning on line 76 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L76

Added line #L76 was not covered by tests
},
UpdateFunc: func(e event.UpdateEvent) bool {
pod := e.ObjectNew.(*v1.Pod)
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
// If in vk scenario, there will be not containerReady condition
return utilcontainerlaunchpriority.ExistsPriorities(pod) && (containersReady == nil || containersReady.Status != v1.ConditionTrue)
return shouldEnqueue(pod, mgr.GetCache())

Check warning on line 80 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L80

Added line #L80 was not covered by tests
},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
Expand All @@ -94,6 +93,30 @@ func add(mgr manager.Manager, r *ReconcileContainerLaunchPriority) error {
return nil
}

func shouldEnqueue(pod *v1.Pod, r client.Reader) bool {
if pod.Annotations[appspub.ContainerLaunchPriorityCompletedKey] == "true" {
return false
}
if _, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady); containersReady != nil && containersReady.Status == v1.ConditionTrue {
return false
}

Check warning on line 102 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L101-L102

Added lines #L101 - L102 were not covered by tests

nextPriorities := findNextPriorities(pod)
if len(nextPriorities) == 0 {
return false
}

Check warning on line 107 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L106-L107

Added lines #L106 - L107 were not covered by tests

var barrier = &v1.ConfigMap{}
var barrierNamespacedName = types.NamespacedName{
Namespace: pod.GetNamespace(),
Name: pod.Name + "-barrier",
}
if err := r.Get(context.TODO(), barrierNamespacedName, barrier); err != nil {
return true
}

Check warning on line 116 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L115-L116

Added lines #L115 - L116 were not covered by tests
return !isExistsInBarrier(nextPriorities[len(nextPriorities)-1], barrier)
}

var _ reconcile.Reconciler = &ReconcileContainerLaunchPriority{}

// ReconcileContainerLaunchPriority reconciles a Pod object
Expand Down Expand Up @@ -151,52 +174,73 @@ func (r *ReconcileContainerLaunchPriority) Reconcile(_ context.Context, request
return reconcile.Result{}, err
}

// set next starting containers
_, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady)
if containersReady != nil && containersReady.Status != v1.ConditionTrue {
patchKey := r.findNextPatchKey(pod)
if patchKey == nil {
return reconcile.Result{}, nil
}
key := "p_" + strconv.Itoa(*patchKey)
if err = r.patchOnKeyNotExist(barrier, key); err != nil {
return reconcile.Result{}, err
}
// handle the pod and barrier
if err = r.handle(pod, barrier); err != nil {
return reconcile.Result{}, err

Check warning on line 179 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L179

Added line #L179 was not covered by tests
}

return reconcile.Result{}, nil
}

func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int {
var priority *int
var containerPendingSet = make(map[string]bool)
func (r *ReconcileContainerLaunchPriority) handle(pod *v1.Pod, barrier *v1.ConfigMap) error {
nextPriorities := findNextPriorities(pod)

// If there is no more priorities, or the lowest priority exists in barrier, mask as completed.
if len(nextPriorities) == 0 || isExistsInBarrier(nextPriorities[0], barrier) {
return r.patchCompleted(pod)
}

// Try to add the current priority if not exists.
if !isExistsInBarrier(nextPriorities[len(nextPriorities)-1], barrier) {
if err := r.addPriorityIntoBarrier(barrier, nextPriorities[len(nextPriorities)-1]); err != nil {
return err
}

Check warning on line 197 in pkg/controller/containerlaunchpriority/container_launch_priority_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/containerlaunchpriority/container_launch_priority_controller.go#L196-L197

Added lines #L196 - L197 were not covered by tests
}

// After adding the current priority, if the lowest priority is same to the current one, mark as completed.
if nextPriorities[len(nextPriorities)-1] == nextPriorities[0] {
return r.patchCompleted(pod)
}
return nil
}

func (r *ReconcileContainerLaunchPriority) addPriorityIntoBarrier(barrier *v1.ConfigMap, priority int) error {
klog.V(3).Infof("Adding priority %d into barrier %s/%s", priority, barrier.Namespace, barrier.Name)
body := fmt.Sprintf(`{"data":{"%s":"true"}}`, utilcontainerlaunchpriority.GetKey(priority))
return r.Client.Patch(context.TODO(), barrier, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

func (r *ReconcileContainerLaunchPriority) patchCompleted(pod *v1.Pod) error {
klog.V(3).Infof("Marking pod %s/%s as launch priority completed", pod.Namespace, pod.Name)
body := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"true"}}}`, appspub.ContainerLaunchPriorityCompletedKey)
return r.Client.Patch(context.TODO(), pod, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}

func findNextPriorities(pod *v1.Pod) (priorities []int) {
containerReadySet := sets.NewString()
for _, status := range pod.Status.ContainerStatuses {
if status.Ready {
continue
containerReadySet.Insert(status.Name)
}
containerPendingSet[status.Name] = true
}
for _, c := range pod.Spec.Containers {
if _, ok := containerPendingSet[c.Name]; ok {
p := utilcontainerlaunchpriority.GetContainerPriority(&c)
if p == nil {
continue
}
if priority == nil || *p > *priority {
priority = p
}
if containerReadySet.Has(c.Name) {
continue
}
priority := utilcontainerlaunchpriority.GetContainerPriority(&c)
if priority == nil {
continue
}

priorities = append(priorities, *priority)
}
if len(priorities) > 0 {
sort.Ints(priorities)
}
return priority
return
}

func (r *ReconcileContainerLaunchPriority) patchOnKeyNotExist(barrier *v1.ConfigMap, key string) error {
if _, ok := barrier.Data[key]; !ok {
body := fmt.Sprintf(
`{"data":{"%s":"true"}}`,
key,
)
return r.Client.Patch(context.TODO(), barrier, client.RawPatch(types.StrategicMergePatchType, []byte(body)))
}
return nil
func isExistsInBarrier(priority int, barrier *v1.ConfigMap) bool {
_, exists := barrier.Data[utilcontainerlaunchpriority.GetKey(priority)]
return exists
}
Loading

0 comments on commit d2ac892

Please sign in to comment.