simplify wakeup queue monitor
prateekchaudhry committed May 31, 2023
1 parent 37f1f72 commit a7c2340
Showing 4 changed files with 15 additions and 38 deletions.

agent/engine/common_integ_test.go (2 changes: 1 addition & 1 deletion)

@@ -225,7 +225,7 @@ func skipIntegTestIfApplicable(t *testing.T) {
 	}
 }
 
-// Values in host resources from getTestHoustResources() should be looked at and CPU/Memory assigned
+// Values in host resources from getTestHostResources() should be looked at and CPU/Memory assigned
 // accordingly
 func createTestContainerWithImageAndName(image string, name string) *apicontainer.Container {
 	return &apicontainer.Container{

agent/engine/docker_task_engine.go (40 changes: 12 additions & 28 deletions)

@@ -213,7 +213,7 @@ func NewDockerTaskEngine(cfg *config.Config,
 		state:                  state,
 		managedTasks:           make(map[string]*managedTask),
 		stateChangeEvents:      make(chan statechange.Event),
-		monitorQueuedTaskEvent: make(chan struct{}),
+		monitorQueuedTaskEvent: make(chan struct{}, 1),
 
 		credentialsManager: credentialsManager,
 
@@ -263,7 +263,7 @@ func (engine *DockerTaskEngine) reconcileHostResources() {
 		// Consume host resources if task has progressed
 		// Call to consume here should always succeed
 		// Idempotent consume call
-		if !task.IsInternal && taskStatus > apitaskstatus.TaskCreated {
+		if !task.IsInternal && (taskStatus == apitaskstatus.TaskCreated || taskStatus == apitaskstatus.TaskPulled) {
 			consumed, err := engine.hostResourceManager.consume(task.Arn, resources)
 			if err != nil || !consumed {
 				logger.Critical("Failed to consume resources for created/running tasks during reconciliation", logger.Fields{field.TaskARN: task.Arn})
@@ -323,19 +323,13 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
 	return nil
 }
 
-// Does a 'best effort' try to wake up monitorQueuedTasks. Always wakes up when
-// length of queue is 1
+// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent
+// but does not block if monitorQueuedTasks is already processing queued tasks
 func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() {
 	select {
 	case engine.monitorQueuedTaskEvent <- struct{}{}:
 	default:
-		waitingTaskQueueSingleLen := false
-		engine.waitingTasksLock.Lock()
-		waitingTaskQueueSingleLen = len(engine.waitingTaskQueue) == 1
-		engine.waitingTasksLock.Unlock()
-		if waitingTaskQueueSingleLen {
-			engine.monitorQueuedTaskEvent <- struct{}{}
-		}
+		// do nothing
 	}
 }
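
The simplification rests on a standard Go pattern: with a capacity-1 channel, a non-blocking send either deposits a wakeup or finds one already pending, so repeated signals coalesce and the sender never blocks. A runnable sketch of the pattern in isolation (notify and the queue-draining goroutine are illustrative, not the agent's code):

package main

import (
	"fmt"
	"time"
)

// notify performs a non-blocking send. With a capacity-1 channel, one
// pending wakeup is always retained and any further signals coalesce
// into it, so callers never block on a busy monitor.
func notify(wake chan struct{}) {
	select {
	case wake <- struct{}{}:
	default:
		// a wakeup is already pending; the monitor will run regardless
	}
}

func main() {
	wake := make(chan struct{}, 1)
	go func() {
		for range wake {
			fmt.Println("monitor: draining queue")
			time.Sleep(10 * time.Millisecond)
		}
	}()
	for i := 0; i < 5; i++ {
		notify(wake) // never blocks, even while the monitor is busy
	}
	time.Sleep(100 * time.Millisecond)
}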

@@ -390,28 +384,18 @@ func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) {
 					break
 				}
 				taskHostResources := task.ToHostResources()
-				consumed := false
-				consumable, err := engine.hostResourceManager.consumableSafe(taskHostResources)
+				consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
 				if err != nil {
 					engine.failWaitingTask(err)
-				} else if consumable {
-					// consume resources and continue
-					consumed, err = task.engine.hostResourceManager.consume(task.Arn, taskHostResources)
-					if err != nil {
-						engine.failWaitingTask(err)
-					}
-					if consumed {
-						engine.startWaitingTask()
-					} else {
-						// not consumed
-						break
-					}
+				}
+				if consumed {
+					engine.startWaitingTask()
 				} else {
-					// not consumable
+					// not consumed, go to wait
 					break
 				}
 			}
-			logger.Debug("No more tasks in Waiting Task Queue, waiting for new tasks")
+			logger.Debug("No more tasks could be started at this moment, waiting")
 		}
 	}
 }
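
The loop above now drains the queue head-first: try to consume resources for the task at the head, start it on success, and stop at the first task that does not fit. For illustration, a self-contained sketch of that head-of-line policy, with task, tryConsume, and the slice-backed queue as hypothetical stand-ins for the agent's types:

package main

import "fmt"

type task struct {
	arn string
	cpu int64
}

// tryConsume stands in for hostResourceManager.consume: it checks and
// reserves CPU for one task (single goroutine here, so no lock needed).
func tryConsume(used *int64, total int64, t task) bool {
	if *used+t.cpu > total {
		return false
	}
	*used += t.cpu
	return true
}

func main() {
	queue := []task{{"task/1", 512}, {"task/2", 512}, {"task/3", 512}}
	var used int64
	const total = 1024

	// Head-of-line policy, as in monitorQueuedTasks: start tasks from the
	// front of the queue while they fit, stop at the first that does not.
	for len(queue) > 0 {
		head := queue[0]
		if !tryConsume(&used, total, head) {
			break // head does not fit; wait until resources are released
		}
		queue = queue[1:]
		fmt.Println("started", head.arn)
	}
	fmt.Printf("waiting; %d task(s) still queued\n", len(queue))
}
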
@@ -930,7 +914,7 @@ func (engine *DockerTaskEngine) emitTaskEvent(task *apitask.Task, reason string)
 		resourcesToRelease := task.ToHostResources()
 		err := engine.hostResourceManager.release(task.Arn, resourcesToRelease)
 		if err != nil {
-			logger.Critical("Failed to release resources after tast stopped", logger.Fields{field.TaskARN: task.Arn})
+			logger.Critical("Failed to release resources after task stopped", logger.Fields{field.TaskARN: task.Arn})
 		}
 	}
 	event, err := api.NewTaskStateChangeEvent(task, reason)

agent/engine/host_resource_manager.go (6 changes: 0 additions & 6 deletions)

@@ -226,12 +226,6 @@ func (h *HostResourceManager) checkResourcesHealth(resources map[string]*ecs.Resource
 	return nil
 }
 
-func (h *HostResourceManager) consumableSafe(resources map[string]*ecs.Resource) (bool, error) {
-	h.hostResourceManagerRWLock.Lock()
-	defer h.hostResourceManagerRWLock.Unlock()
-	return h.consumable(resources)
-}
-
 // Helper function for consume to check if resources are consumable with the current account
 // we have for the host resources. Should not call host resource manager lock in this func
 // return values
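
consumableSafe can be deleted because, as the comment kept above indicates, consumable is a lock-free helper that consume calls while holding the manager's lock: check-and-consume is already a single atomic step, so a caller-side pre-check adds nothing the consume call won't redo. A small sketch of that shape (hostAccount and its CPU-only accounting are hypothetical simplifications, not the agent's HostResourceManager):

package main

import (
	"fmt"
	"sync"
)

type hostAccount struct {
	mu    sync.Mutex
	used  int64
	total int64
}

// consumable mirrors the lock-free helper above: it must only be called
// while mu is held.
func (h *hostAccount) consumable(cpu int64) bool {
	return h.used+cpu <= h.total
}

// consume checks and reserves under one lock, so callers need no separate
// pre-check (the role the removed consumableSafe used to play).
func (h *hostAccount) consume(cpu int64) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if !h.consumable(cpu) {
		return false
	}
	h.used += cpu
	return true
}

func main() {
	h := &hostAccount{total: 1024}
	fmt.Println(h.consume(512), h.consume(512), h.consume(512)) // true true false
}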

agent/engine/task_manager_test.go (5 changes: 2 additions & 3 deletions)

@@ -2174,8 +2174,7 @@ func TestContainerNextStateDependsStoppedContainer(t *testing.T) {
 	}
 }
 
-// TestTaskWaitForHostResourceOnRestart tests task stopped by acs but hasn't
-// reached stopped should block the later task to start
+// TestTaskWaitForHostResources tests task queuing behavior based on available host resources
 func TestTaskWaitForHostResources(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
@@ -2184,7 +2183,7 @@ func TestTaskWaitForHostResources(t *testing.T) {
 	hostResourceManager := NewHostResourceManager(getTestHostResources())
 	taskEngine := &DockerTaskEngine{
 		managedTasks:           make(map[string]*managedTask),
-		monitorQueuedTaskEvent: make(chan struct{}),
+		monitorQueuedTaskEvent: make(chan struct{}, 1),
 		hostResourceManager:    &hostResourceManager,
 	}
 	go taskEngine.monitorQueuedTasks(ctx)
