From 83d36526a6e588d630a6365756a05a6f09d166ca Mon Sep 17 00:00:00 2001
From: Aaron Lehmann
Date: Mon, 13 Mar 2017 13:54:43 -0700
Subject: [PATCH] scheduler: Fix accounting when task ends up in multiple groups

Tasks are split between task groups based on common specs. This allows
nodes to be ranked only once per group, not once per task.

This logic doesn't work correctly because maps are marshalled in a
random order. Currently, the same task can end up in multiple groups
(say, if it's updated multiple times and the marshalling ends up
different each time). To make sure we don't try to schedule the same
task twice within the same batch, use a map for unassignedTasks instead
of a linked list.

Note this doesn't fix the underlying brokenness of deduplicating task
specs by marshalling the protobuf. This is a fix for the symptom that
can be backported; I'm going to replace the marshalling approach in a
separate PR.

Signed-off-by: Aaron Lehmann
---
 manager/scheduler/scheduler.go | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/manager/scheduler/scheduler.go b/manager/scheduler/scheduler.go
index 55420a2d42..7935187e93 100644
--- a/manager/scheduler/scheduler.go
+++ b/manager/scheduler/scheduler.go
@@ -1,7 +1,6 @@
 package scheduler
 
 import (
-	"container/list"
 	"time"
 
 	"github.com/docker/swarmkit/api"
@@ -30,7 +29,7 @@ type schedulingDecision struct {
 // Scheduler assigns tasks to nodes.
 type Scheduler struct {
 	store           *store.MemoryStore
-	unassignedTasks *list.List
+	unassignedTasks map[string]*api.Task
 	// preassignedTasks already have NodeID, need resource validation
 	preassignedTasks map[string]*api.Task
 	nodeSet          nodeSet
@@ -47,7 +46,7 @@ type Scheduler struct {
 func New(store *store.MemoryStore) *Scheduler {
 	return &Scheduler{
 		store:            store,
-		unassignedTasks:  list.New(),
+		unassignedTasks:  make(map[string]*api.Task),
 		preassignedTasks: make(map[string]*api.Task),
 		allTasks:         make(map[string]*api.Task),
 		stopChan:         make(chan struct{}),
@@ -191,7 +190,7 @@ func (s *Scheduler) Stop() {
 
 // enqueue queues a task for scheduling.
 func (s *Scheduler) enqueue(t *api.Task) {
-	s.unassignedTasks.PushBack(t)
+	s.unassignedTasks[t.ID] = t
 }
 
 func (s *Scheduler) createTask(ctx context.Context, t *api.Task) int {
@@ -333,15 +332,12 @@ func (s *Scheduler) processPreassignedTasks(ctx context.Context) {
 
 // tick attempts to schedule the queue.
 func (s *Scheduler) tick(ctx context.Context) {
 	tasksByCommonSpec := make(map[string]map[string]*api.Task)
-	schedulingDecisions := make(map[string]schedulingDecision, s.unassignedTasks.Len())
+	schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))
 
-	var next *list.Element
-	for e := s.unassignedTasks.Front(); e != nil; e = next {
-		next = e.Next()
-		t := s.allTasks[e.Value.(*api.Task).ID]
+	for taskID, t := range s.unassignedTasks {
 		if t == nil || t.NodeID != "" {
 			// task deleted or already assigned
-			s.unassignedTasks.Remove(e)
+			delete(s.unassignedTasks, taskID)
 			continue
 		}
@@ -362,8 +358,8 @@ func (s *Scheduler) tick(ctx context.Context) {
 		if tasksByCommonSpec[taskGroupKey] == nil {
 			tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
 		}
-		tasksByCommonSpec[taskGroupKey][t.ID] = t
-		s.unassignedTasks.Remove(e)
+		tasksByCommonSpec[taskGroupKey][taskID] = t
+		delete(s.unassignedTasks, taskID)
 	}
 
 	for _, taskGroup := range tasksByCommonSpec {
@@ -602,6 +598,12 @@ func (s *Scheduler) scheduleNTasksOnNodes(ctx context.Context, n int, taskGroup
 	nodeIter := 0
 	nodeCount := len(nodes)
 	for taskID, t := range taskGroup {
+		// Skip tasks which were already scheduled because they ended
+		// up in two groups at once.
+		if _, exists := schedulingDecisions[taskID]; exists {
+			continue
+		}
+
 		node := &nodes[nodeIter%nodeCount]
 		log.G(ctx).WithField("task.id", t.ID).Debugf("assigning to node %s", node.ID)
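
For readers following the change without the surrounding file, the standalone Go sketch below (not swarmkit code; toyScheduler, task, and scheduleGroup are invented names, and the round-robin node choice is simplified) illustrates the two halves of the fix under those assumptions: unassigned tasks keyed by task ID so enqueueing is idempotent, and a per-batch decisions map checked before assignment so a task that lands in two spec groups is only scheduled once.

package main

import "fmt"

// task stands in for api.Task: just an ID and an assigned node.
type task struct {
	ID     string
	NodeID string
}

// toyScheduler mirrors the shape of the fix: unassigned tasks live in a map
// keyed by task ID, and per-batch decisions are recorded by task ID, so a
// task that shows up in two groups is only assigned once.
type toyScheduler struct {
	unassignedTasks     map[string]*task
	schedulingDecisions map[string]string // task ID -> chosen node
}

func newToyScheduler() *toyScheduler {
	return &toyScheduler{
		unassignedTasks:     make(map[string]*task),
		schedulingDecisions: make(map[string]string),
	}
}

// enqueue is naturally idempotent with a map; with a linked list, enqueueing
// the same task twice would create two entries.
func (s *toyScheduler) enqueue(t *task) {
	s.unassignedTasks[t.ID] = t
}

// scheduleGroup assigns nodes round-robin, skipping any task that already
// received a decision in this batch (i.e. it appeared in an earlier group).
func (s *toyScheduler) scheduleGroup(group map[string]*task, nodes []string) {
	i := 0
	for taskID, t := range group {
		if _, exists := s.schedulingDecisions[taskID]; exists {
			continue
		}
		node := nodes[i%len(nodes)]
		i++
		t.NodeID = node
		s.schedulingDecisions[taskID] = node
		delete(s.unassignedTasks, taskID)
	}
}

func main() {
	s := newToyScheduler()
	t1 := &task{ID: "task1"}
	s.enqueue(t1)
	s.enqueue(t1) // duplicate enqueue is harmless

	// The same task appears under two group keys, as can happen when the
	// group key is derived from unstable marshalling.
	groups := map[string]map[string]*task{
		"specA": {"task1": t1},
		"specB": {"task1": t1},
	}
	for _, group := range groups {
		s.scheduleGroup(group, []string{"node1", "node2"})
	}

	// Prints: task1 assigned once, to node1 (decisions recorded: 1)
	fmt.Printf("task1 assigned once, to %s (decisions recorded: %d)\n",
		t1.NodeID, len(s.schedulingDecisions))
}

In this sketch the schedulingDecisions check plays the same role as the skip added in scheduleNTasksOnNodes above, and the map-based unassignedTasks corresponds to the enqueue/tick changes in the patch.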