Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Added name when register plugin. #549

Merged
merged 1 commit into from
Jan 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type Action interface {
}

type Plugin interface {
// The unique name of Plugin.
Name() string

OnSessionOpen(ssn *Session)
OnSessionClose(ssn *Session)
}
Expand Down
96 changes: 58 additions & 38 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ type Session struct {

plugins []Plugin
eventHandlers []*EventHandler
jobOrderFns []api.CompareFn
queueOrderFns []api.CompareFn
taskOrderFns []api.CompareFn
predicateFns []api.PredicateFn
preemptableFns []api.PreemptableFn
reclaimableFns []api.ReclaimableFn
overusedFns []api.ValidateFn
jobReadyFns []api.ValidateFn
jobOrderFns map[string]api.CompareFn
queueOrderFns map[string]api.CompareFn
taskOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
preemptableFns map[string]api.PreemptableFn
reclaimableFns map[string]api.ReclaimableFn
overusedFns map[string]api.ValidateFn
jobReadyFns map[string]api.ValidateFn
}

func openSession(cache cache.Cache) *Session {
Expand All @@ -62,6 +62,15 @@ func openSession(cache cache.Cache) *Session {
JobIndex: map[api.JobID]*api.JobInfo{},
NodeIndex: map[string]*api.NodeInfo{},
QueueIndex: map[api.QueueID]*api.QueueInfo{},

jobOrderFns: map[string]api.CompareFn{},
queueOrderFns: map[string]api.CompareFn{},
taskOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
preemptableFns: map[string]api.PreemptableFn{},
reclaimableFns: map[string]api.ReclaimableFn{},
overusedFns: map[string]api.ValidateFn{},
jobReadyFns: map[string]api.ValidateFn{},
}

snapshot := cache.Snapshot()
Expand Down Expand Up @@ -213,13 +222,15 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {

func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
var init bool

for _, rf := range ssn.reclaimableFns {
candidates := rf(reclaimer, reclaimees)
if victims == nil {
if !init {
victims = candidates
init = true
} else {
intersection := []*api.TaskInfo{}
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
Expand Down Expand Up @@ -256,7 +267,10 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {

// Update task in node.
if node, found := ssn.NodeIndex[reclaimee.NodeName]; found {
node.UpdateTask(reclaimee)
if err := node.UpdateTask(reclaimee); err != nil {
glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v",
reclaimee.Namespace, reclaimee.Name, ssn.UID, err)
}
}

for _, eh := range ssn.eventHandlers {
Expand All @@ -275,22 +289,28 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI
return nil
}

victims := ssn.preemptableFns[0](preemptor, preemptees)
for _, pf := range ssn.preemptableFns[1:] {
intersection := []*api.TaskInfo{}
var victims []*api.TaskInfo
var init bool

for _, pf := range ssn.preemptableFns {
candidates := pf(preemptor, preemptees)
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
if !init {
victims = candidates
init = true
} else {
var intersection []*api.TaskInfo
// Get intersection of victims and candidates.
for _, v := range victims {
for _, c := range candidates {
if v.UID == c.UID {
intersection = append(intersection, v)
}
}
}
}

// Update victims to intersection
victims = intersection
// Update victims to intersection
victims = intersection
}
}

return victims
Expand Down Expand Up @@ -324,36 +344,36 @@ func (ssn *Session) AddEventHandler(eh *EventHandler) {
ssn.eventHandlers = append(ssn.eventHandlers, eh)
}

func (ssn *Session) AddJobOrderFn(cf api.CompareFn) {
ssn.jobOrderFns = append(ssn.jobOrderFns, cf)
func (ssn *Session) AddJobOrderFn(name string, cf api.CompareFn) {
ssn.jobOrderFns[name] = cf
}

func (ssn *Session) AddQueueOrderFn(qf api.CompareFn) {
ssn.queueOrderFns = append(ssn.queueOrderFns, qf)
func (ssn *Session) AddQueueOrderFn(name string, qf api.CompareFn) {
ssn.queueOrderFns[name] = qf
}

func (ssn *Session) AddTaskOrderFn(cf api.CompareFn) {
ssn.taskOrderFns = append(ssn.taskOrderFns, cf)
func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) {
ssn.taskOrderFns[name] = cf
}

func (ssn *Session) AddPreemptableFn(cf api.PreemptableFn) {
ssn.preemptableFns = append(ssn.preemptableFns, cf)
func (ssn *Session) AddPreemptableFn(name string, cf api.PreemptableFn) {
ssn.preemptableFns[name] = cf
}

func (ssn *Session) AddReclaimableFn(rf api.ReclaimableFn) {
ssn.reclaimableFns = append(ssn.reclaimableFns, rf)
func (ssn *Session) AddReclaimableFn(name string, rf api.ReclaimableFn) {
ssn.reclaimableFns[name] = rf
}

func (ssn *Session) AddJobReadyFn(vf api.ValidateFn) {
ssn.jobReadyFns = append(ssn.jobReadyFns, vf)
func (ssn *Session) AddJobReadyFn(name string, vf api.ValidateFn) {
ssn.jobReadyFns[name] = vf
}

func (ssn *Session) AddPredicateFn(pf api.PredicateFn) {
ssn.predicateFns = append(ssn.predicateFns, pf)
func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
ssn.predicateFns[name] = pf
}

func (ssn *Session) AddOverusedFn(fn api.ValidateFn) {
ssn.overusedFns = append(ssn.overusedFns, fn)
func (ssn *Session) AddOverusedFn(name string, fn api.ValidateFn) {
ssn.overusedFns[name] = fn
}

func (ssn *Session) Overused(queue *api.QueueInfo) bool {
Expand Down
10 changes: 2 additions & 8 deletions pkg/scheduler/plugins/drf/drf.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
return victims
}

if drf.args.PreemptableFnEnabled {
// Add Preemptable function.
ssn.AddPreemptableFn(preemptableFn)
}
ssn.AddPreemptableFn(drf.Name(), preemptableFn)

jobOrderFn := func(l interface{}, r interface{}) int {
lv := l.(*api.JobInfo)
Expand All @@ -130,10 +127,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) {
return 1
}

if drf.args.JobOrderFnEnabled {
// Add Job Order function.
ssn.AddJobOrderFn(jobOrderFn)
}
ssn.AddJobOrderFn(drf.Name(), jobOrderFn)

// Register event handlers.
ssn.AddEventHandler(&framework.EventHandler{
Expand Down
20 changes: 8 additions & 12 deletions pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

func (gp *gangPlugin) Name() string {
return "gang"
}

// readyTaskNum return the number of tasks that are ready to run.
func readyTaskNum(job *api.JobInfo) int32 {
occupid := 0
Expand Down Expand Up @@ -102,11 +106,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
}

// TODO(k82cn): Support preempt/reclaim batch job.
ssn.AddReclaimableFn(preemptableFn)

if gp.args.PreemptableFnEnabled {
ssn.AddPreemptableFn(preemptableFn)
}
ssn.AddReclaimableFn(gp.Name(), preemptableFn)
ssn.AddPreemptableFn(gp.Name(), preemptableFn)

jobOrderFn := func(l, r interface{}) int {
lv := l.(*api.JobInfo)
Expand Down Expand Up @@ -144,13 +145,8 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
return 0
}

if gp.args.JobOrderFnEnabled {
ssn.AddJobOrderFn(jobOrderFn)
}

if gp.args.JobReadyFnEnabled {
ssn.AddJobReadyFn(jobReady)
}
ssn.AddJobOrderFn(gp.Name(), jobOrderFn)
ssn.AddJobReadyFn(gp.Name(), jobReady)
}

func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
Expand Down
14 changes: 9 additions & 5 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
)

type nodeAffinityPlugin struct {
type predicatesPlugin struct {
args *framework.PluginArgs
}

func New(args *framework.PluginArgs) framework.Plugin {
return &nodeAffinityPlugin{
return &predicatesPlugin{
args: args,
}
}

func (pp *predicatesPlugin) Name() string {
return "predicates"
}

type podLister struct {
session *framework.Session
}
Expand Down Expand Up @@ -108,7 +112,7 @@ func CheckNodeUnschedulable(pod *v1.Pod, nodeInfo *cache.NodeInfo) (bool, []algo
return true, nil, nil
}

func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
pl := &podLister{
session: ssn,
}
Expand All @@ -117,7 +121,7 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
session: ssn,
}

ssn.AddPredicateFn(func(task *api.TaskInfo, node *api.NodeInfo) error {
ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo := cache.NewNodeInfo(node.Pods()...)
nodeInfo.SetNode(node.Node)

Expand Down Expand Up @@ -200,4 +204,4 @@ func (pp *nodeAffinityPlugin) OnSessionOpen(ssn *framework.Session) {
})
}

func (pp *nodeAffinityPlugin) OnSessionClose(ssn *framework.Session) {}
func (pp *predicatesPlugin) OnSessionClose(ssn *framework.Session) {}
13 changes: 6 additions & 7 deletions pkg/scheduler/plugins/priority/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

func (pp *priorityPlugin) Name() string {
return "priority"
}

func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
taskOrderFn := func(l interface{}, r interface{}) int {
lv := l.(*api.TaskInfo)
Expand All @@ -52,9 +56,7 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
}

// Add Task Order function
if pp.args.TaskOrderFnEnabled {
ssn.AddTaskOrderFn(taskOrderFn)
}
ssn.AddTaskOrderFn(pp.Name(), taskOrderFn)

jobOrderFn := func(l, r interface{}) int {
lv := l.(*api.JobInfo)
Expand All @@ -74,10 +76,7 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
return 0
}

if pp.args.JobOrderFnEnabled {
// Add Job Order function
ssn.AddJobOrderFn(jobOrderFn)
}
ssn.AddJobOrderFn(pp.Name(), jobOrderFn)
}

func (pp *priorityPlugin) OnSessionClose(ssn *framework.Session) {}
10 changes: 7 additions & 3 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func New(args *framework.PluginArgs) framework.Plugin {
}
}

func (pp *proportionPlugin) Name() string {
return "proportion"
}

func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
// Prepare scheduling data for this session.
for _, n := range ssn.Nodes {
Expand Down Expand Up @@ -141,7 +145,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}
}

ssn.AddQueueOrderFn(func(l, r interface{}) int {
ssn.AddQueueOrderFn(pp.Name(), func(l, r interface{}) int {
lv := l.(*api.QueueInfo)
rv := r.(*api.QueueInfo)

Expand All @@ -156,7 +160,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return 1
})

ssn.AddReclaimableFn(func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
ssn.AddReclaimableFn(pp.Name(), func(reclaimer *api.TaskInfo, reclaimees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
allocations := map[api.QueueID]*api.Resource{}

Expand All @@ -183,7 +187,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
return victims
})

ssn.AddOverusedFn(func(obj interface{}) bool {
ssn.AddOverusedFn(pp.Name(), func(obj interface{}) bool {
queue := obj.(*api.QueueInfo)
attr := pp.queueOpts[queue.UID]

Expand Down