Skip to content

Commit

Permalink
simplify wakeup queue monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
prateekchaudhry committed May 31, 2023
1 parent 37f1f72 commit 539ae10
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 33 deletions.
38 changes: 11 additions & 27 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewDockerTaskEngine(cfg *config.Config,
state: state,
managedTasks: make(map[string]*managedTask),
stateChangeEvents: make(chan statechange.Event),
monitorQueuedTaskEvent: make(chan struct{}),
monitorQueuedTaskEvent: make(chan struct{}, 1),

credentialsManager: credentialsManager,

Expand Down Expand Up @@ -263,7 +263,7 @@ func (engine *DockerTaskEngine) reconcileHostResources() {
// Consume host resources if task has progressed
// Call to consume here should always succeed
// Idempotent consume call
if !task.IsInternal && taskStatus > apitaskstatus.TaskCreated {
if !task.IsInternal && (taskStatus == apitaskstatus.TaskCreated || taskStatus == apitaskstatus.TaskPulled) {
consumed, err := engine.hostResourceManager.consume(task.Arn, resources)
if err != nil || !consumed {
logger.Critical("Failed to consume resources for created/running tasks during reconciliation", logger.Fields{field.TaskARN: task.Arn})
Expand Down Expand Up @@ -323,19 +323,13 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
return nil
}

// Does a 'best effort' try to wake up monitorQueuedTasks. Always wakes up when
// length of queue is 1
// Always wakes up when at least on even arrives on buffered channel monitorQueuedTaskEvent
// but does not block if monitorQueuedTasks is already processing queued tasks
func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
select {
case engine.monitorQueuedTaskEvent <- struct{}{}:
default:
waitingTaskQueueSingleLen := false
engine.waitingTasksLock.Lock()
waitingTaskQueueSingleLen = len(engine.waitingTaskQueue) == 1
engine.waitingTasksLock.Unlock()
if waitingTaskQueueSingleLen {
engine.monitorQueuedTaskEvent <- struct{}{}
}
// do nothing
}
}

Expand Down Expand Up @@ -390,28 +384,18 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
break
}
taskHostResources := task.ToHostResources()
consumed := false
consumable, err := engine.hostResourceManager.consumableSafe(taskHostResources)
consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
if err != nil {
engine.failWaitingTask(err)
} else if consumable {
// consume resources and continue
consumed, err = task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
if err != nil {
engine.failWaitingTask(err)
}
if consumed {
engine.startWaitingTask()
} else {
// not consumed
break
}
}
if consumed {
engine.startWaitingTask()
} else {
// not consumable
// not consumed, go to wait
break
}
}
logger.Debug("No more tasks in Waiting Task Queue, waiting for new tasks")
logger.Debug("No more tasks could be started at this moment, waiting")
}
}
}
Expand Down
6 changes: 0 additions & 6 deletions agent/engine/host_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,6 @@ func (h *HostResourceManager) checkResourcesHealth(resources map[string]*ecs.Res
return nil
}

func (h *HostResourceManager) consumableSafe(resources map[string]*ecs.Resource) (bool, error) {
h.hostResourceManagerRWLock.Lock()
defer h.hostResourceManagerRWLock.Unlock()
return h.consumable(resources)
}

// Helper function for consume to check if resources are consumable with the current account
// we have for the host resources. Should not call host resource manager lock in this func
// return values
Expand Down

0 comments on commit 539ae10

Please sign in to comment.