Skip to content

Commit

Permalink
allocator: Less aggressive retry
Browse files Browse the repository at this point in the history
Instead of retrying unallocated tasks, services, and networks every time
data changes in the store, limit these retries to every 5 minutes.

When a repeated attempt to allocate one of these objects fails, log it
at the debug log level, to reduce noise in the logs.

Signed-off-by: Aaron Lehmann <[email protected]>
  • Loading branch information
aaronlehmann committed Mar 8, 2017
1 parent e4762bc commit 6e78fc2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 19 deletions.
5 changes: 5 additions & 0 deletions manager/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"github.com/stretchr/testify/require"
)

func init() {
	// Tests exercise the retry path for unallocated objects; shrink the
	// interval so they are not held up by the much longer default.
	const testRetryInterval = 5 * time.Millisecond
	retryInterval = testRetryInterval
}

func TestAllocator(t *testing.T) {
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
Expand Down
68 changes: 49 additions & 19 deletions manager/allocator/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ const (
allocatedStatusMessage = "pending task scheduling"
)

var errNoChanges = errors.New("task unchanged")
var (
// errNoChanges is the sentinel compared against the error returned by
// allocateTask; when it matches, the result is not logged as a task
// allocation failure.
errNoChanges = errors.New("task unchanged")

// retryInterval is the time that must have elapsed since the last retry
// before unallocated networks, services, and tasks are retried on a
// commit event. It is a variable rather than a constant so that tests
// can lower it.
retryInterval = 5 * time.Minute
)

func newIngressNetwork() *api.Network {
return &api.Network{
Expand Down Expand Up @@ -57,19 +61,28 @@ type networkContext struct {
// the actual network allocation.
nwkAllocator *networkallocator.NetworkAllocator

// A table of unallocated tasks which will be revisited if any thing
// A set of tasks which are ready to be allocated as a batch. This is
// distinct from "unallocatedTasks" which are tasks that failed to
// allocate on the first try, being held for a future retry.
pendingTasks map[string]*api.Task

// A set of unallocated tasks which will be revisited if anything
// changes in system state that might help task allocation.
unallocatedTasks map[string]*api.Task

// A table of unallocated services which will be revisited if
// A set of unallocated services which will be revisited if
// anything changes in system state that might help service
// allocation.
unallocatedServices map[string]*api.Service

// A table of unallocated networks which will be revisited if
// A set of unallocated networks which will be revisited if
// anything changes in system state that might help network
// allocation.
unallocatedNetworks map[string]*api.Network

// lastRetry is the last timestamp when unallocated
// tasks/services/networks were retried.
lastRetry time.Time
}

func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
Expand All @@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {

nc := &networkContext{
nwkAllocator: na,
pendingTasks: make(map[string]*api.Task),
unallocatedTasks: make(map[string]*api.Task),
unallocatedServices: make(map[string]*api.Service),
unallocatedNetworks: make(map[string]*api.Network),
ingressNetwork: newIngressNetwork(),
lastRetry: time.Now(),
}
a.netCtx = nc
defer func() {
Expand Down Expand Up @@ -401,12 +416,22 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
case state.EventCreateNode, state.EventUpdateNode, state.EventDeleteNode:
a.doNodeAlloc(ctx, ev)
case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
a.doTaskAlloc(ctx, ev)
a.doTaskAlloc(ctx, ev, nc.pendingTasks)
case state.EventCommit:
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procUnallocatedTasksNetwork(ctx)
return
a.procTasksNetwork(ctx, nc.pendingTasks, false)

if time.Since(nc.lastRetry) > retryInterval {
a.procUnallocatedNetworks(ctx)
a.procUnallocatedServices(ctx)
a.procTasksNetwork(ctx, nc.unallocatedTasks, true)
nc.lastRetry = time.Now()
}

// Any left over tasks are moved to the unallocated set
for _, t := range nc.pendingTasks {
nc.unallocatedTasks[t.ID] = t
}
nc.pendingTasks = make(map[string]*api.Task)
}
}

Expand Down Expand Up @@ -551,7 +576,7 @@ func (a *Allocator) taskCreateNetworkAttachments(t *api.Task, s *api.Service) {
taskUpdateNetworks(t, networks)
}

func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event, toAllocate map[string]*api.Task) {
var (
isDelete bool
t *api.Task
Expand Down Expand Up @@ -579,14 +604,16 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
}
}

// Cleanup any task references that might exist in unallocatedTasks
// Cleanup any task references that might exist
delete(toAllocate, t.ID)
delete(nc.unallocatedTasks, t.ID)
return
}

// If we are already in allocated state, there is
// absolutely nothing else to do.
if t.Status.State >= api.TaskStatePending {
delete(toAllocate, t.ID)
delete(nc.unallocatedTasks, t.ID)
return
}
Expand Down Expand Up @@ -616,7 +643,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
// based on service spec.
a.taskCreateNetworkAttachments(t, s)

nc.unallocatedTasks[t.ID] = t
toAllocate[t.ID] = t
}

func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
Expand Down Expand Up @@ -948,15 +975,18 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
}
}

func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
nc := a.netCtx
allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
func (a *Allocator) procTasksNetwork(ctx context.Context, toAllocate map[string]*api.Task, quiet bool) {
allocatedTasks := make([]*api.Task, 0, len(toAllocate))

for _, t := range nc.unallocatedTasks {
for _, t := range toAllocate {
if err := a.allocateTask(ctx, t); err == nil {
allocatedTasks = append(allocatedTasks, t)
} else if err != errNoChanges {
log.G(ctx).WithError(err).Error("task allocation failure")
if quiet {
log.G(ctx).WithError(err).Debug("task allocation failure")
} else {
log.G(ctx).WithError(err).Error("task allocation failure")
}
}
}

Expand All @@ -978,11 +1008,11 @@ func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
})

if err != nil {
log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
}

for _, t := range allocatedTasks[:committed] {
delete(nc.unallocatedTasks, t.ID)
delete(toAllocate, t.ID)
}
}

Expand Down

0 comments on commit 6e78fc2

Please sign in to comment.