Skip to content

Commit

Permalink
Merge pull request #207 from Huang-Wei/cleanup-coscheduling
Browse files Browse the repository at this point in the history
standardize error and state of return values of Permit
  • Loading branch information
k8s-ci-robot authored Nov 17, 2021
2 parents 6e58632 + 7de4102 commit 478a9cb
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 62 deletions.
29 changes: 20 additions & 9 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,22 @@ import (
"sigs.k8s.io/scheduler-plugins/pkg/util"
)

type Status string

const (
// PodGroupNotSpecified denotes no PodGroup is specified in the Pod spec.
PodGroupNotSpecified Status = "PodGroup not specified"
// PodGroupNotFound denotes the specified PodGroup in the Pod spec is
// not found in API server.
PodGroupNotFound Status = "PodGroup not found"
Success Status = "Success"
Wait Status = "Wait"
)

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod, string) (bool, error)
Permit(context.Context, *corev1.Pod) Status
PostBind(context.Context, *corev1.Pod, string)
GetPodGroup(*corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
Expand Down Expand Up @@ -147,24 +159,23 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
}

// Permit permits a pod to run, if the minMember match, it would send a signal to chan.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod, nodeName string) (bool, error) {
func (pgMgr *PodGroupManager) Permit(ctx context.Context, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(pod)
if pgFullName == "" {
return true, util.ErrorNotMatched
return PodGroupNotSpecified
}
if pg == nil {
// A Pod with a podGroup name but without a PodGroup found is denied.
return false, fmt.Errorf("PodGroup not found")
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod in not included in the snapshot during the current scheduling cycle.
ready := int32(assigned)+1 >= pg.Spec.MinMember
if ready {
return true, nil
if int32(assigned)+1 >= pg.Spec.MinMember {
return Success
}
return false, util.ErrorWaiting
return Wait
}

// PostBind updates a PodGroup's status.
Expand Down Expand Up @@ -225,7 +236,7 @@ func (pgMgr *PodGroupManager) AddDeniedPodGroup(pgFullName string) {
pgMgr.lastDeniedPG.Add(pgFullName, "", *pgMgr.lastDeniedPGExpirationTime)
}

// DeletePodGroup delete a podGroup that pass Pre-Filter but reach PostFilter.
// DeletePermittedPodGroup deletes a podGroup that pass Pre-Filter but reach PostFilter.
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
}
Expand Down
23 changes: 11 additions & 12 deletions pkg/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,37 +198,36 @@ func TestPermit(t *testing.T) {
name string
pod *corev1.Pod
snapshot framework.SharedLister
allow bool
want Status
}{
{
name: "pod does not belong to any pg, allow",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Obj(),
allow: true,
name: "pod does not belong to any pg, allow",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Obj(),
want: PodGroupNotSpecified,
},
{
name: "pod belongs to pg, a non-existing pg",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(util.PodGroupLabel, "pg-noexist").Obj(),
allow: false,
name: "pod belongs to a non-existing pg",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(util.PodGroupLabel, "pg-noexist").Obj(),
want: PodGroupNotFound,
},
{
name: "pod belongs to a pg that doesn't have enough pods",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(util.PodGroupLabel, "pg1").Obj(),
snapshot: testutil.NewFakeSharedLister([]*corev1.Pod{}, []*corev1.Node{}),
allow: false,
want: Wait,
},
{
name: "pod belongs to a pg that has enough pods",
pod: st.MakePod().Name("p").UID("p").Namespace("ns1").Label(util.PodGroupLabel, "pg1").Obj(),
snapshot: snapshot,
allow: true,
want: Success,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pgMgr := &PodGroupManager{pgClient: fakeClient, pgLister: pgLister, scheduleTimeout: &timeout, snapshotSharedLister: tt.snapshot}
allow, err := pgMgr.Permit(ctx, tt.pod, "test")
if allow != tt.allow {
t.Errorf("want %v, but got %v. err: %v", tt.allow, allow, err)
if got := pgMgr.Permit(ctx, tt.pod); got != tt.want {
t.Errorf("Expect %v, but got %v", tt.want, got)
}
})
}
Expand Down
53 changes: 23 additions & 30 deletions pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,41 +200,35 @@ func (cs *Coscheduling) PreFilterExtensions() framework.PreFilterExtensions {

// Permit is the functions invoked by the framework at "Permit" extension point.
func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
fullName := util.GetPodGroupFullName(pod)
if len(fullName) == 0 {
return framework.NewStatus(framework.Success, ""), 0
}
waitTime := *cs.scheduleTimeout
ready, err := cs.pgMgr.Permit(ctx, pod, nodeName)
if err != nil {
s := cs.pgMgr.Permit(ctx, pod)
var retStatus *framework.Status
switch s {
case core.PodGroupNotSpecified:
return framework.NewStatus(framework.Success, ""), 0
case core.PodGroupNotFound:
return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0
case core.Wait:
klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName)
_, pg := cs.pgMgr.GetPodGroup(pod)
if pg == nil {
return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0
}
if wait := util.GetWaitTimeDuration(pg, cs.scheduleTimeout); wait != 0 {
waitTime = wait
}
if err == util.ErrorWaiting {
klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "node", nodeName)
return framework.NewStatus(framework.Wait, ""), waitTime
}
klog.ErrorS(err, "Permit error")
return framework.NewStatus(framework.Unschedulable, err.Error()), 0
retStatus = framework.NewStatus(framework.Wait)
case core.Success:
pgFullName := util.GetPodGroupFullName(pod)
cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
if util.GetPodGroupFullName(waitingPod.GetPod()) == pgFullName {
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Allow(cs.Name())
}
})
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod))
retStatus = framework.NewStatus(framework.Success)
waitTime = 0
}

klog.V(5).InfoS("Pod requires pgName", "pod", klog.KObj(pod), "podGroup", fullName)
if !ready {
return framework.NewStatus(framework.Wait, ""), waitTime
}

cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
if util.GetPodGroupFullName(waitingPod.GetPod()) == fullName {
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Allow(cs.Name())
}
})
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod))
return framework.NewStatus(framework.Success, ""), 0
return retStatus, waitTime
}

// Reserve is the functions invoked by the framework at "reserve" extension point.
Expand Down Expand Up @@ -277,8 +271,7 @@ func (cs *Coscheduling) getStateKey() framework.StateKey {
return framework.StateKey(fmt.Sprintf("Prefilter-%v", cs.Name()))
}

type noopStateData struct {
}
type noopStateData struct{}

func NewNoopStateData() framework.StateData {
return &noopStateData{}
Expand Down
11 changes: 0 additions & 11 deletions pkg/util/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,7 @@ limitations under the License.

package util

import "fmt"

const (
// PodGroupLabel is the default label of coscheduling
PodGroupLabel = "pod-group.scheduling.sigs.k8s.io"
)

var (
// ErrorNotMatched means pod does not match coscheduling
ErrorNotMatched = fmt.Errorf("not match coscheduling")
// ErrorWaiting means pod number does not match the min pods required
ErrorWaiting = fmt.Errorf("waiting")
// ErrorResourceNotEnough means cluster resource is not enough, mainly used in Pre-Filter
ErrorResourceNotEnough = fmt.Errorf("resource not enough")
)

0 comments on commit 478a9cb

Please sign in to comment.