diff --git a/agent/acs/handler/acs_handler_test.go b/agent/acs/handler/acs_handler_test.go index bbd80f18e08..1bfacf8c9bb 100644 --- a/agent/acs/handler/acs_handler_test.go +++ b/agent/acs/handler/acs_handler_test.go @@ -1313,7 +1313,6 @@ func validateAddedTask(expectedTask apitask.Task, addedTask apitask.Task) error Family: addedTask.Family, Version: addedTask.Version, DesiredStatusUnsafe: addedTask.GetDesiredStatus(), - StartSequenceNumber: addedTask.StartSequenceNumber, } if !reflect.DeepEqual(expectedTask, taskToCompareFromAdded) { diff --git a/agent/api/ecsclient/client.go b/agent/api/ecsclient/client.go index 94bfd4a6f47..6e6911de76b 100644 --- a/agent/api/ecsclient/client.go +++ b/agent/api/ecsclient/client.go @@ -315,6 +315,10 @@ func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) } resourceMap := make(map[string]*ecs.Resource) for _, resource := range resources { + if *resource.Name == "PORTS" { + // Except for RCI, TCP Ports are named as PORTS_TCP in agent for Host Resources purpose + resource.Name = utils.Strptr("PORTS_TCP") + } resourceMap[*resource.Name] = resource } return resourceMap, nil diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 9b46e297340..e9c47d42dc0 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -234,9 +234,6 @@ type Task struct { // is handled properly so that the state storage continues to work. SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"` - StartSequenceNumber int64 - StopSequenceNumber int64 - // ExecutionCredentialsID is the ID of credentials that are used by agent to // perform some action at the task level, such as pulling image from ECR ExecutionCredentialsID string `json:"executionCredentialsID"` @@ -312,11 +309,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task, if err := json.Unmarshal(data, task); err != nil { return nil, err } - if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil { - task.StartSequenceNumber = *envelope.SeqNum - } else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil { - task.StopSequenceNumber = *envelope.SeqNum - } // Overrides the container command if it's set for _, container := range task.Containers { @@ -2831,22 +2823,6 @@ func (task *Task) GetAppMesh() *apiappmesh.AppMesh { return task.AppMesh } -// GetStopSequenceNumber returns the stop sequence number of a task -func (task *Task) GetStopSequenceNumber() int64 { - task.lock.RLock() - defer task.lock.RUnlock() - - return task.StopSequenceNumber -} - -// SetStopSequenceNumber sets the stop seqence number of a task -func (task *Task) SetStopSequenceNumber(seqnum int64) { - task.lock.Lock() - defer task.lock.Unlock() - - task.StopSequenceNumber = seqnum -} - // SetPullStartedAt sets the task pullstartedat timestamp and returns whether // this field was updated or not func (task *Task) SetPullStartedAt(timestamp time.Time) bool { @@ -3522,10 +3498,6 @@ func (task *Task) IsServiceConnectConnectionDraining() bool { // // * GPU // - Return num of gpus requested (len of GPUIDs field) -// -// TODO remove this once ToHostResources is used -// -//lint:file-ignore U1000 Ignore all unused code func (task *Task) ToHostResources() map[string]*ecs.Resource { resources := make(map[string]*ecs.Resource) // CPU @@ -3639,3 +3611,13 @@ func (task *Task) ToHostResources() map[string]*ecs.Resource { }) return resources } + +func (task *Task) HasActiveContainers() bool { + for _, container := range task.Containers { + 
containerStatus := container.GetKnownStatus() + if containerStatus >= apicontainerstatus.ContainerPulled && containerStatus <= apicontainerstatus.ContainerResourcesProvisioned { + return true + } + } + return false +} diff --git a/agent/api/task/task_test.go b/agent/api/task/task_test.go index 30fbf8d1824..08123e684e0 100644 --- a/agent/api/task/task_test.go +++ b/agent/api/task/task_test.go @@ -1860,10 +1860,9 @@ func TestTaskFromACS(t *testing.T) { Type: "elastic-inference", }, }, - StartSequenceNumber: 42, - CPU: 2.0, - Memory: 512, - ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), + CPU: 2.0, + Memory: 512, + ResourcesMapUnsafe: make(map[string][]taskresource.TaskResource), } seqNum := int64(42) diff --git a/agent/api/task/taskvolume_test.go b/agent/api/task/taskvolume_test.go index ed0f8ad9a91..c0602bf7705 100644 --- a/agent/api/task/taskvolume_test.go +++ b/agent/api/task/taskvolume_test.go @@ -119,8 +119,6 @@ func TestMarshalTaskVolumesEFS(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, @@ -168,8 +166,6 @@ func TestUnmarshalTaskVolumesEFS(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, diff --git a/agent/api/task/taskvolume_windows_test.go b/agent/api/task/taskvolume_windows_test.go index 98070de3f3f..85e6d954bde 100644 --- a/agent/api/task/taskvolume_windows_test.go +++ b/agent/api/task/taskvolume_windows_test.go @@ -77,8 +77,6 @@ func TestMarshalTaskVolumeFSxWindowsFileServer(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, @@ -118,8 +116,6 @@ func TestUnmarshalTaskVolumeFSxWindowsFileServer(t *testing.T) { "PullStoppedAt": "0001-01-01T00:00:00Z", "ExecutionStoppedAt": "0001-01-01T00:00:00Z", "SentStatus": "NONE", - "StartSequenceNumber": 0, - "StopSequenceNumber": 0, "executionCredentialsID": "", "ENI": null, "AppMesh": null, diff --git a/agent/app/agent_unix_test.go b/agent/app/agent_unix_test.go index 1dbeb40ce82..bc290a7cc42 100644 --- a/agent/app/agent_unix_test.go +++ b/agent/app/agent_unix_test.go @@ -478,6 +478,7 @@ func TestDoStartCgroupInitHappyPath(t *testing.T) { state.EXPECT().AllImageStates().Return(nil), state.EXPECT().AllENIAttachments().Return(nil), state.EXPECT().AllTasks().Return(nil), + state.EXPECT().AllTasks().Return(nil), client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Do(func(x interface{}) { // Ensures that the test waits until acs session has bee started discoverEndpointsInvoked.Done() @@ -646,6 +647,7 @@ func TestDoStartGPUManagerHappyPath(t *testing.T) { state.EXPECT().AllImageStates().Return(nil), state.EXPECT().AllENIAttachments().Return(nil), state.EXPECT().AllTasks().Return(nil), + state.EXPECT().AllTasks().Return(nil), client.EXPECT().DiscoverPollEndpoint(gomock.Any()).Do(func(x interface{}) { // Ensures that the test waits until acs session has been started discoverEndpointsInvoked.Done() diff --git a/agent/engine/common_integ_test.go b/agent/engine/common_integ_test.go index d190e375b63..9dd5ce08d96 100644 --- a/agent/engine/common_integ_test.go +++ 
b/agent/engine/common_integ_test.go @@ -225,6 +225,8 @@ func skipIntegTestIfApplicable(t *testing.T) { } } +// Values in host resources from getTestHostResources() should be looked at and CPU/Memory assigned +// accordingly func createTestContainerWithImageAndName(image string, name string) *apicontainer.Container { return &apicontainer.Container{ Name: name, @@ -232,7 +234,7 @@ func createTestContainerWithImageAndName(image string, name string) *apicontaine Command: []string{}, Essential: true, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 1024, + CPU: 256, Memory: 128, } } diff --git a/agent/engine/docker_image_manager_integ_test.go b/agent/engine/docker_image_manager_integ_test.go index 031a7bd1415..938a77a3c53 100644 --- a/agent/engine/docker_image_manager_integ_test.go +++ b/agent/engine/docker_image_manager_integ_test.go @@ -568,7 +568,7 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task { Image: test1Image1Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -576,7 +576,7 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task { Image: test1Image2Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -584,7 +584,7 @@ func createImageCleanupHappyTestTask(taskName string) *apitask.Task { Image: test1Image3Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, }, @@ -603,7 +603,7 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task { Image: test2Image1Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -611,7 +611,7 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task { Image: test2Image2Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, { @@ -619,7 +619,7 @@ func createImageCleanupThresholdTestTask(taskName string) *apitask.Task { Image: test2Image3Name, Essential: false, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, }, }, diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 36b40b01ee3..4e3128b89c6 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -53,7 +53,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/taskresource/firelens" "github.com/aws/amazon-ecs-agent/agent/utils" "github.com/aws/amazon-ecs-agent/agent/utils/retry" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/agent/utils/ttime" "github.com/aws/aws-sdk-go/aws" "github.com/docker/docker/api/types" @@ -135,10 +134,12 @@ type DockerTaskEngine struct { state dockerstate.TaskEngineState managedTasks map[string]*managedTask - taskStopGroup *utilsync.SequentialWaitGroup + // waitingTasksQueue is a FIFO queue of tasks waiting to acquire host resources + waitingTaskQueue []*managedTask - events <-chan dockerapi.DockerContainerChangeEvent - stateChangeEvents chan statechange.Event + events <-chan dockerapi.DockerContainerChangeEvent + monitorQueuedTaskEvent chan struct{} + stateChangeEvents chan statechange.Event client dockerapi.DockerClient dataClient data.Client @@ -154,6 +155,8 @@ type DockerTaskEngine struct { // all tasks, it must not acquire it for any significant duration // The write mutex should be 
taken when adding and removing tasks from managedTasks. tasksLock sync.RWMutex + // waitingTasksLock is a mutex for operations on waitingTaskQueue + waitingTasksLock sync.RWMutex credentialsManager credentials.Manager _time ttime.Time @@ -207,10 +210,10 @@ func NewDockerTaskEngine(cfg *config.Config, client: client, dataClient: data.NewNoopClient(), - state: state, - managedTasks: make(map[string]*managedTask), - taskStopGroup: utilsync.NewSequentialWaitGroup(), - stateChangeEvents: make(chan statechange.Event), + state: state, + managedTasks: make(map[string]*managedTask), + stateChangeEvents: make(chan statechange.Event), + monitorQueuedTaskEvent: make(chan struct{}, 1), credentialsManager: credentialsManager, @@ -238,6 +241,37 @@ func NewDockerTaskEngine(cfg *config.Config, return dockerTaskEngine } +// Reconcile state of host resource manager with task status in managedTasks slice +// Done on agent restarts +func (engine *DockerTaskEngine) reconcileHostResources() { + logger.Info("Reconciling host resources") + for _, task := range engine.state.AllTasks() { + taskStatus := task.GetKnownStatus() + resources := task.ToHostResources() + + // Release stopped tasks host resources + // Call to release here for stopped tasks should always succeed + // Idempotent release call + if taskStatus.Terminal() { + err := engine.hostResourceManager.release(task.Arn, resources) + if err != nil { + logger.Critical("Failed to release resources during reconciliation", logger.Fields{field.TaskARN: task.Arn}) + } + continue + } + + // Consume host resources if task has progressed (check if any container has progressed) + // Call to consume here should always succeed + // Idempotent consume call + if !task.IsInternal && task.HasActiveContainers() { + consumed, err := engine.hostResourceManager.consume(task.Arn, resources) + if err != nil || !consumed { + logger.Critical("Failed to consume resources for created/running tasks during reconciliation", logger.Fields{field.TaskARN: task.Arn}) + } + } + } +} + func (engine *DockerTaskEngine) initializeContainerStatusToTransitionFunction() { containerStatusToTransitionFunction := map[apicontainerstatus.ContainerStatus]transitionApplyFunc{ apicontainerstatus.ContainerPulled: engine.pullContainer, @@ -280,6 +314,7 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error { return err } engine.synchronizeState() + go engine.monitorQueuedTasks(derivedCtx) // Now catch up and start processing new events per normal go engine.handleDockerEvents(derivedCtx) engine.initialized = true @@ -288,6 +323,96 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error { return nil } +// Always wakes up when at least one event arrives on buffered channel monitorQueuedTaskEvent +// but does not block if monitorQueuedTasks is already processing queued tasks +func (engine *DockerTaskEngine) wakeUpTaskQueueMonitor() { + select { + case engine.monitorQueuedTaskEvent <- struct{}{}: + default: + // do nothing + } +} + +func (engine *DockerTaskEngine) topTask() (*managedTask, error) { + engine.waitingTasksLock.Lock() + defer engine.waitingTasksLock.Unlock() + if len(engine.waitingTaskQueue) > 0 { + return engine.waitingTaskQueue[0], nil + } + return nil, fmt.Errorf("no tasks in waiting queue") +} + +func (engine *DockerTaskEngine) enqueueTask(task *managedTask) { + engine.waitingTasksLock.Lock() + engine.waitingTaskQueue = append(engine.waitingTaskQueue, task) + engine.waitingTasksLock.Unlock() + logger.Debug("Enqueued task in Waiting Task Queue",
logger.Fields{field.TaskARN: task.Arn}) + engine.wakeUpTaskQueueMonitor() +} + +func (engine *DockerTaskEngine) dequeueTask() (*managedTask, error) { + engine.waitingTasksLock.Lock() + defer engine.waitingTasksLock.Unlock() + if len(engine.waitingTaskQueue) > 0 { + task := engine.waitingTaskQueue[0] + engine.waitingTaskQueue = engine.waitingTaskQueue[1:] + logger.Debug("Dequeued task from Waiting Task Queue", logger.Fields{field.TaskARN: task.Arn}) + return task, nil + } + + return nil, fmt.Errorf("no tasks in waiting queue") +} + +// monitorQueuedTasks starts as many tasks as possible based on FIFO order of waitingTaskQueue +// and availability of host resources. When no more tasks can be started, it will wait on +// the monitorQueuedTaskEvent channel. This channel receives (best effort) messages when +// - a task stops +// - a new task is queued up +// It does not need to receive every message: while the routine is going through the queue, it +// may schedule more than one task for a single 'event' received +func (engine *DockerTaskEngine) monitorQueuedTasks(ctx context.Context) { + logger.Info("Monitoring Task Queue started") + for { + select { + case <-ctx.Done(): + return + case <-engine.monitorQueuedTaskEvent: + // Dequeue as many tasks as possible and wake up their goroutines + for { + task, err := engine.topTask() + if err != nil { + break + } + taskHostResources := task.ToHostResources() + consumed, err := task.engine.hostResourceManager.consume(task.Arn, taskHostResources) + if err != nil { + engine.failWaitingTask(err) + } + if consumed { + engine.startWaitingTask() + } else { + // not consumed, go to wait + break + } + } + logger.Debug("No more tasks could be started at this moment, waiting") + } + } +} + +func (engine *DockerTaskEngine) failWaitingTask(err error) { + task, _ := engine.dequeueTask() + logger.Error(fmt.Sprintf("Error consuming resources due to invalid task config: %s", err.Error()), logger.Fields{field.TaskARN: task.Arn}) + task.SetDesiredStatus(apitaskstatus.TaskStopped) + task.consumedHostResourceEvent <- struct{}{} +} + +func (engine *DockerTaskEngine) startWaitingTask() { + task, _ := engine.dequeueTask() + logger.Info("Host resources consumed, progressing task", logger.Fields{field.TaskARN: task.Arn}) + task.consumedHostResourceEvent <- struct{}{} +} + func (engine *DockerTaskEngine) startPeriodicExecAgentsMonitoring(ctx context.Context) { engine.monitorExecAgentsTicker = time.NewTicker(engine.monitorExecAgentsInterval) for { @@ -469,6 +594,9 @@ func (engine *DockerTaskEngine) synchronizeState() { engine.saveTaskData(task) } + // Before starting managedTask goroutines, pre-allocate resources for already running + // tasks in host resource manager + engine.reconcileHostResources() for _, task := range tasksToStart { engine.startTask(task) } @@ -493,11 +621,6 @@ func (engine *DockerTaskEngine) filterTasksToStartUnsafe(tasks []*apitask.Task) } tasksToStart = append(tasksToStart, task) - - // Put tasks that are stopped by acs but hasn't been stopped in wait group - if task.GetDesiredStatus().Terminal() && task.GetStopSequenceNumber() != 0 { - engine.taskStopGroup.Add(task.GetStopSequenceNumber(), 1) - } } return tasksToStart @@ -785,6 +908,15 @@ func (engine *DockerTaskEngine) deleteTask(task *apitask.Task) { } func (engine *DockerTaskEngine) emitTaskEvent(task *apitask.Task, reason string) { + if task.GetKnownStatus().Terminal() { + // Always release host resources (an idempotent operation) whenever a state change with + // known status == STOPPED is emitted, to keep tasks and the host resource manager in sync
+ resourcesToRelease := task.ToHostResources() + err := engine.hostResourceManager.release(task.Arn, resourcesToRelease) + if err != nil { + logger.Critical("Failed to release resources after task stopped", logger.Fields{field.TaskARN: task.Arn}) + } + } event, err := api.NewTaskStateChangeEvent(task, reason) if err != nil { if _, ok := err.(api.ErrShouldNotSendEvent); ok { @@ -2124,16 +2256,13 @@ func (engine *DockerTaskEngine) updateTaskUnsafe(task *apitask.Task, update *api logger.Debug("Putting update on the acs channel", logger.Fields{ field.TaskID: task.GetID(), field.DesiredStatus: updateDesiredStatus.String(), - field.Sequence: update.StopSequenceNumber, }) managedTask.emitACSTransition(acsTransition{ desiredStatus: updateDesiredStatus, - seqnum: update.StopSequenceNumber, }) logger.Debug("Update taken off the acs channel", logger.Fields{ field.TaskID: task.GetID(), field.DesiredStatus: updateDesiredStatus.String(), - field.Sequence: update.StopSequenceNumber, }) } diff --git a/agent/engine/docker_task_engine_test.go b/agent/engine/docker_task_engine_test.go index 1d29a845afd..9d396cf86e1 100644 --- a/agent/engine/docker_task_engine_test.go +++ b/agent/engine/docker_task_engine_test.go @@ -566,7 +566,6 @@ func TestStopWithPendingStops(t *testing.T) { testTime.EXPECT().After(gomock.Any()).AnyTimes() sleepTask1 := testdata.LoadTask("sleep5") - sleepTask1.StartSequenceNumber = 5 sleepTask2 := testdata.LoadTask("sleep5") sleepTask2.Arn = "arn2" eventStream := make(chan dockerapi.DockerContainerChangeEvent) @@ -594,13 +593,11 @@ func TestStopWithPendingStops(t *testing.T) { stopSleep2 := testdata.LoadTask("sleep5") stopSleep2.Arn = "arn2" stopSleep2.SetDesiredStatus(apitaskstatus.TaskStopped) - stopSleep2.StopSequenceNumber = 4 taskEngine.AddTask(stopSleep2) taskEngine.AddTask(sleepTask1) stopSleep1 := testdata.LoadTask("sleep5") stopSleep1.SetDesiredStatus(apitaskstatus.TaskStopped) - stopSleep1.StopSequenceNumber = 5 taskEngine.AddTask(stopSleep1) pullDone <- true // this means the PullImage is only called once due to the task is stopped before it @@ -1639,11 +1636,11 @@ func TestPullAndUpdateContainerReference(t *testing.T) { // agent starts, container created, metadata file created, agent restarted, container recovered // during task engine init, metadata file updated func TestMetadataFileUpdatedAgentRestart(t *testing.T) { - conf := &defaultConfig + conf := defaultConfig conf.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled} ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - ctrl, client, _, privateTaskEngine, _, imageManager, metadataManager, serviceConnectManager := mocks(t, ctx, conf) + ctrl, client, _, privateTaskEngine, _, imageManager, metadataManager, serviceConnectManager := mocks(t, ctx, &conf) defer ctrl.Finish() var metadataUpdateWG sync.WaitGroup @@ -1869,81 +1866,6 @@ func TestNewTaskTransitionOnRestart(t *testing.T) { assert.True(t, ok, "task wasnot started") } -// TestTaskWaitForHostResourceOnRestart tests task stopped by acs but hasn't -// reached stopped should block the later task to start -func TestTaskWaitForHostResourceOnRestart(t *testing.T) { - // Task 1 stopped by backend - taskStoppedByACS := testdata.LoadTask("sleep5") - taskStoppedByACS.SetDesiredStatus(apitaskstatus.TaskStopped) - taskStoppedByACS.SetStopSequenceNumber(1) - taskStoppedByACS.SetKnownStatus(apitaskstatus.TaskRunning) - // Task 2 has essential container stopped - taskEssentialContainerStopped :=
testdata.LoadTask("sleep5") - taskEssentialContainerStopped.Arn = "task_Essential_Container_Stopped" - taskEssentialContainerStopped.SetDesiredStatus(apitaskstatus.TaskStopped) - taskEssentialContainerStopped.SetKnownStatus(apitaskstatus.TaskRunning) - // Normal task 3 needs to be started - taskNotStarted := testdata.LoadTask("sleep5") - taskNotStarted.Arn = "task_Not_started" - - conf := &defaultConfig - conf.ContainerMetadataEnabled = config.BooleanDefaultFalse{Value: config.ExplicitlyDisabled} - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - ctrl, client, _, privateTaskEngine, _, imageManager, _, serviceConnectManager := mocks(t, ctx, conf) - defer ctrl.Finish() - - client.EXPECT().Version(gomock.Any(), gomock.Any()).MaxTimes(1) - client.EXPECT().ContainerEvents(gomock.Any()).MaxTimes(1) - serviceConnectManager.EXPECT().GetAppnetContainerTarballDir().AnyTimes() - - err := privateTaskEngine.Init(ctx) - assert.NoError(t, err) - - taskEngine := privateTaskEngine.(*DockerTaskEngine) - taskEngine.State().AddTask(taskStoppedByACS) - taskEngine.State().AddTask(taskNotStarted) - taskEngine.State().AddTask(taskEssentialContainerStopped) - - taskEngine.State().AddContainer(&apicontainer.DockerContainer{ - Container: taskStoppedByACS.Containers[0], - DockerID: containerID + "1", - DockerName: dockerContainerName + "1", - }, taskStoppedByACS) - taskEngine.State().AddContainer(&apicontainer.DockerContainer{ - Container: taskNotStarted.Containers[0], - DockerID: containerID + "2", - DockerName: dockerContainerName + "2", - }, taskNotStarted) - taskEngine.State().AddContainer(&apicontainer.DockerContainer{ - Container: taskEssentialContainerStopped.Containers[0], - DockerID: containerID + "3", - DockerName: dockerContainerName + "3", - }, taskEssentialContainerStopped) - - // these are performed in synchronizeState on restart - client.EXPECT().DescribeContainer(gomock.Any(), gomock.Any()).Return(apicontainerstatus.ContainerRunning, dockerapi.DockerContainerMetadata{ - DockerID: containerID, - }).Times(3) - imageManager.EXPECT().RecordContainerReference(gomock.Any()).Times(3) - // start the two tasks - taskEngine.synchronizeState() - - var waitStopWG sync.WaitGroup - waitStopWG.Add(1) - go func() { - // This is to confirm the other task is waiting - time.Sleep(1 * time.Second) - // Remove the task sequence number 1 from waitgroup - taskEngine.taskStopGroup.Done(1) - waitStopWG.Done() - }() - - // task with sequence number 2 should wait until 1 is removed from the waitgroup - taskEngine.taskStopGroup.Wait(2) - waitStopWG.Wait() -} - // TestPullStartedStoppedAtWasSetCorrectly tests the PullStartedAt and PullStoppedAt // was set correctly func TestPullStartedStoppedAtWasSetCorrectly(t *testing.T) { diff --git a/agent/engine/dockerstate/json_test.go b/agent/engine/dockerstate/json_test.go index c584c35787a..dd8f8222642 100644 --- a/agent/engine/dockerstate/json_test.go +++ b/agent/engine/dockerstate/json_test.go @@ -124,8 +124,6 @@ const ( "KnownStatus": "RUNNING", "KnownTime": "2017-11-01T20:24:21.449897483Z", "SentStatus": "RUNNING", - "StartSequenceNumber": 9, - "StopSequenceNumber": 0, "ENI": { "ec2Id": "eni-abcd", "IPV4Addresses": [ diff --git a/agent/engine/engine_windows_integ_test.go b/agent/engine/engine_windows_integ_test.go index fbc05a3d1c4..406915b995d 100644 --- a/agent/engine/engine_windows_integ_test.go +++ b/agent/engine/engine_windows_integ_test.go @@ -75,13 +75,15 @@ var endpoint = utils.DefaultIfBlank(os.Getenv(DockerEndpointEnvVariable), docker // TODO implement 
this func isDockerRunning() bool { return true } +// Values in host resources from getTestHostResources() should be looked at and CPU/Memory assigned +// accordingly func createTestContainer() *apicontainer.Container { return &apicontainer.Container{ Name: "windows", Image: testBaseImage, Essential: true, DesiredStatusUnsafe: apicontainerstatus.ContainerRunning, - CPU: 512, + CPU: 256, Memory: 256, } } diff --git a/agent/engine/host_resource_manager.go b/agent/engine/host_resource_manager.go index 50ae60c0d51..49f3c7b27f6 100644 --- a/agent/engine/host_resource_manager.go +++ b/agent/engine/host_resource_manager.go @@ -25,9 +25,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/utils" ) -// TODO remove this once resource, consume are used -//lint:file-ignore U1000 Ignore all unused code - const ( CPU = "CPU" GPU = "GPU" @@ -92,6 +89,13 @@ func (h *HostResourceManager) consumeStringSetType(resourceType string, resource } } +func (h *HostResourceManager) checkTaskConsumed(taskArn string) bool { + h.hostResourceManagerRWLock.Lock() + defer h.hostResourceManagerRWLock.Unlock() + _, ok := h.taskConsumed[taskArn] + return ok +} + // Returns if resources consumed or not and error status // false, nil -> did not consume, task should stay pending // false, err -> resources map has errors, task should fail as cannot schedule with 'wrong' resource map (this basically never happens) @@ -195,7 +199,7 @@ func (h *HostResourceManager) checkResourcesHealth(resources map[string]*ecs.Res for resourceKey, resourceVal := range resources { _, ok := h.initialHostResource[resourceKey] if !ok { - logger.Error(fmt.Sprintf("resource %s not found in ", resourceKey)) + logger.Error(fmt.Sprintf("resource %s not found in host resources", resourceKey)) return &InvalidHostResource{resourceKey} } @@ -256,6 +260,9 @@ func (h *HostResourceManager) consumable(resources map[string]*ecs.Resource) (bo func removeSubSlice(s1 []*string, s2 []*string) []*string { begin := 0 end := len(s1) - 1 + if len(s2) == 0 { + return s1 + } for ; begin < len(s1); begin++ { if *s1[begin] == *s2[0] { break diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index e8a8a770ee9..0fc59c60b2f 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -42,7 +42,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/taskresource" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" "github.com/aws/amazon-ecs-agent/agent/utils/retry" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/agent/utils/ttime" ) @@ -135,12 +134,12 @@ type managedTask struct { credentialsManager credentials.Manager cniClient ecscni.CNIClient dockerClient dockerapi.DockerClient - taskStopWG *utilsync.SequentialWaitGroup acsMessages chan acsTransition dockerMessages chan dockerContainerChange resourceStateChangeEvent chan resourceStateChange stateChangeEvents chan statechange.Event + consumedHostResourceEvent chan struct{} containerChangeEventStream *eventstream.EventStream // unexpectedStart is a once that controls stopping a container that @@ -177,6 +176,7 @@ func (engine *DockerTaskEngine) newManagedTask(task *apitask.Task) *managedTask acsMessages: make(chan acsTransition), dockerMessages: make(chan dockerContainerChange), resourceStateChangeEvent: make(chan resourceStateChange), + consumedHostResourceEvent: make(chan struct{}, 1), engine: engine, cfg: engine.cfg, stateChangeEvents: engine.stateChangeEvents, @@ -184,7 +184,6 @@ func (engine *DockerTaskEngine)
newManagedTask(task *apitask.Task) *managedTask credentialsManager: engine.credentialsManager, cniClient: engine.cniClient, dockerClient: engine.client, - taskStopWG: engine.taskStopGroup, steadyStatePollInterval: engine.taskSteadyStatePollInterval, steadyStatePollIntervalJitter: engine.taskSteadyStatePollIntervalJitter, } @@ -243,13 +242,8 @@ func (mtask *managedTask) overseeTask() { mtask.engine.checkTearDownPauseContainer(mtask.Task) // TODO [SC]: We need to also tear down pause containets in bridge mode for SC-enabled tasks mtask.cleanupCredentials() - if mtask.StopSequenceNumber != 0 { - logger.Debug("Marking done for this sequence", logger.Fields{ - field.TaskID: mtask.GetID(), - field.Sequence: mtask.StopSequenceNumber, - }) - mtask.taskStopWG.Done(mtask.StopSequenceNumber) - } + // Send event to monitor queue task routine to check for any pending tasks to progress + mtask.engine.wakeUpTaskQueueMonitor() // TODO: make this idempotent on agent restart go mtask.releaseIPInIPAM() mtask.cleanupTask(retry.AddJitter(mtask.cfg.TaskCleanupWaitDuration, mtask.cfg.TaskCleanupWaitDurationJitter)) @@ -275,43 +269,21 @@ func (mtask *managedTask) emitCurrentStatus() { } // waitForHostResources waits for host resources to become available to start -// the task. This involves waiting for previous stops to complete so the -// resources become free. +// the task. It will wait for event on this task's consumedHostResourceEvent +// channel from monitorQueuedTasks routine to wake up func (mtask *managedTask) waitForHostResources() { - if mtask.StartSequenceNumber == 0 { - // This is the first transition on this host. No need to wait - return - } - if mtask.GetDesiredStatus().Terminal() { - // Task's desired status is STOPPED. No need to wait in this case either - return - } - - logger.Info("Waiting for any previous stops to complete", logger.Fields{ - field.TaskID: mtask.GetID(), - field.Sequence: mtask.StartSequenceNumber, - }) - - othersStoppedCtx, cancel := context.WithCancel(mtask.ctx) - defer cancel() - - go func() { - mtask.taskStopWG.Wait(mtask.StartSequenceNumber) - cancel() - }() - - for !mtask.waitEvent(othersStoppedCtx.Done()) { - if mtask.GetDesiredStatus().Terminal() { - // If we end up here, that means we received a start then stop for this - // task before a task that was expected to stop before it could - // actually stop - break + if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) { + // Internal tasks are started right away as their resources are not accounted for + mtask.engine.enqueueTask(mtask) + for !mtask.waitEvent(mtask.consumedHostResourceEvent) { + if mtask.GetDesiredStatus().Terminal() { + // If we end up here, that means we received a start then stop for this + // task before a task that was expected to stop before it could + // actually stop + break + } } } - logger.Info("Wait over; ready to move towards desired status", logger.Fields{ - field.TaskID: mtask.GetID(), - field.DesiredStatus: mtask.GetDesiredStatus().String(), - }) } // waitSteady waits for a task to leave steady-state by waiting for a new @@ -406,26 +378,15 @@ func (mtask *managedTask) handleDesiredStatusChange(desiredStatus apitaskstatus. 
field.TaskID: mtask.GetID(), field.DesiredStatus: desiredStatus.String(), field.Sequence: seqnum, - "StopNumber": mtask.StopSequenceNumber, }) if desiredStatus <= mtask.GetDesiredStatus() { logger.Debug("Redundant task transition; ignoring", logger.Fields{ field.TaskID: mtask.GetID(), field.DesiredStatus: desiredStatus.String(), field.Sequence: seqnum, - "StopNumber": mtask.StopSequenceNumber, }) return } - if desiredStatus == apitaskstatus.TaskStopped && seqnum != 0 && mtask.GetStopSequenceNumber() == 0 { - logger.Info("Managed task moving to stopped, adding to stopgroup with sequence number", - logger.Fields{ - field.TaskID: mtask.GetID(), - field.Sequence: seqnum, - }) - mtask.SetStopSequenceNumber(seqnum) - mtask.taskStopWG.Add(seqnum, 1) - } mtask.SetDesiredStatus(desiredStatus) mtask.UpdateDesiredStatus() mtask.engine.saveTaskData(mtask.Task) @@ -606,6 +567,15 @@ func getContainerEventLogFields(c api.ContainerStateChange) logger.Fields { func (mtask *managedTask) emitTaskEvent(task *apitask.Task, reason string) { taskKnownStatus := task.GetKnownStatus() + // Always release host resources (an idempotent operation) whenever a state change with + // known status == STOPPED is emitted, to keep tasks and the host resource manager in sync + if taskKnownStatus.Terminal() { + resourcesToRelease := mtask.ToHostResources() + err := mtask.engine.hostResourceManager.release(mtask.Arn, resourcesToRelease) + if err != nil { + logger.Critical("Failed to release resources after task stopped", logger.Fields{field.TaskARN: mtask.Arn}) + } + } if !taskKnownStatus.BackendRecognized() { logger.Debug("Skipping event emission for task", logger.Fields{ field.TaskID: mtask.GetID(), diff --git a/agent/engine/task_manager_data_test.go b/agent/engine/task_manager_data_test.go index e4eb48df071..0e7aac8332b 100644 --- a/agent/engine/task_manager_data_test.go +++ b/agent/engine/task_manager_data_test.go @@ -31,7 +31,6 @@ import ( resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" resourcetype "github.com/aws/amazon-ecs-agent/agent/taskresource/types" "github.com/aws/amazon-ecs-agent/agent/taskresource/volume" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -84,7 +83,6 @@ func TestHandleDesiredStatusChangeSaveData(t *testing.T) { engine: &DockerTaskEngine{ dataClient: dataClient, }, - taskStopWG: utilsync.NewSequentialWaitGroup(), } mtask.handleDesiredStatusChange(tc.targetDesiredStatus, int64(1)) tasks, err := dataClient.GetTasks() diff --git a/agent/engine/task_manager_test.go b/agent/engine/task_manager_test.go index 931dbf8c7aa..1d5289d2a71 100644 --- a/agent/engine/task_manager_test.go +++ b/agent/engine/task_manager_test.go @@ -30,7 +30,6 @@ import ( "github.com/aws/amazon-ecs-agent/agent/taskresource" mock_taskresource "github.com/aws/amazon-ecs-agent/agent/taskresource/mocks" resourcestatus "github.com/aws/amazon-ecs-agent/agent/taskresource/status" - utilsync "github.com/aws/amazon-ecs-agent/agent/utils/sync" "github.com/aws/amazon-ecs-agent/agent/api" apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" @@ -831,6 +830,7 @@ func TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) { containerChangeEventStream := eventstream.NewEventStream(eventStreamName, context.Background()) containerChangeEventStream.StartListening() + hostResourceManager := NewHostResourceManager(getTestHostResources()) stateChangeEvents := make(chan statechange.Event) task := &managedTask{ @@ -844,6 +844,7 @@ func
TestStartContainerTransitionsInvokesHandleContainerChange(t *testing.T) { containerChangeEventStream: containerChangeEventStream, stateChangeEvents: stateChangeEvents, dataClient: data.NewNoopClient(), + hostResourceManager: &hostResourceManager, }, stateChangeEvents: stateChangeEvents, containerChangeEventStream: containerChangeEventStream, @@ -964,13 +965,15 @@ func TestWaitForContainerTransitionsForTerminalTask(t *testing.T) { func TestOnContainersUnableToTransitionStateForDesiredStoppedTask(t *testing.T) { stateChangeEvents := make(chan statechange.Event) + hostResourceManager := NewHostResourceManager(getTestHostResources()) task := &managedTask{ Task: &apitask.Task{ Containers: []*apicontainer.Container{}, DesiredStatusUnsafe: apitaskstatus.TaskStopped, }, engine: &DockerTaskEngine{ - stateChangeEvents: stateChangeEvents, + stateChangeEvents: stateChangeEvents, + hostResourceManager: &hostResourceManager, }, stateChangeEvents: stateChangeEvents, ctx: context.TODO(), @@ -1780,31 +1783,6 @@ func TestHandleContainerChangeUpdateMetadataRedundant(t *testing.T) { assert.Equal(t, timeNow, containerCreateTime) } -func TestWaitForHostResources(t *testing.T) { - taskStopWG := utilsync.NewSequentialWaitGroup() - taskStopWG.Add(1, 1) - ctx, cancel := context.WithCancel(context.Background()) - - mtask := &managedTask{ - ctx: ctx, - cancel: cancel, - taskStopWG: taskStopWG, - Task: &apitask.Task{ - StartSequenceNumber: 1, - }, - } - - var waitForHostResourcesWG sync.WaitGroup - waitForHostResourcesWG.Add(1) - go func() { - mtask.waitForHostResources() - waitForHostResourcesWG.Done() - }() - - taskStopWG.Done(1) - waitForHostResourcesWG.Wait() -} - func TestWaitForResourceTransition(t *testing.T) { task := &managedTask{ Task: &apitask.Task{ @@ -2195,3 +2173,56 @@ func TestContainerNextStateDependsStoppedContainer(t *testing.T) { }) } } + +// TestTaskWaitForHostResources tests task queuing behavior based on available host resources +func TestTaskWaitForHostResources(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // 1 vCPU available on host + hostResourceManager := NewHostResourceManager(getTestHostResources()) + taskEngine := &DockerTaskEngine{ + managedTasks: make(map[string]*managedTask), + monitorQueuedTaskEvent: make(chan struct{}, 1), + hostResourceManager: &hostResourceManager, + } + go taskEngine.monitorQueuedTasks(ctx) + // 3 tasks requesting 0.5 vCPUs each + tasks := []*apitask.Task{} + for i := 0; i < 3; i++ { + task := testdata.LoadTask("sleep5") + task.Arn = fmt.Sprintf("arn%d", i) + task.CPU = float64(0.5) + mtask := &managedTask{ + Task: task, + engine: taskEngine, + consumedHostResourceEvent: make(chan struct{}, 1), + } + tasks = append(tasks, task) + taskEngine.managedTasks[task.Arn] = mtask + } + + // acquire for host resources order arn0, arn1, arn2 + go func() { + taskEngine.managedTasks["arn0"].waitForHostResources() + taskEngine.managedTasks["arn1"].waitForHostResources() + taskEngine.managedTasks["arn2"].waitForHostResources() + }() + time.Sleep(500 * time.Millisecond) + + // Verify waiting queue is waiting at arn2 + topTask, err := taskEngine.topTask() + assert.NoError(t, err) + assert.Equal(t, topTask.Arn, "arn2") + + // Remove 1 task + taskResources := taskEngine.managedTasks["arn0"].ToHostResources() + taskEngine.hostResourceManager.release("arn0", taskResources) + taskEngine.wakeUpTaskQueueMonitor() + + time.Sleep(500 * time.Millisecond) + + // Verify arn2 got dequeued + topTask, err = taskEngine.topTask() + assert.Error(t, err) +} 
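Reviewer note: the consume/release calls exercised above depend on the host resource accounting being idempotent per task ARN, so that replays during reconciliation or repeated STOPPED transitions never double-count. Below is a minimal standalone sketch of that idea; the accountant type, its single integer CPU resource, and its function names are simplified assumptions made for this note only, not the agent's HostResourceManager API (which tracks several resource types such as CPU, MEMORY, PORTS_TCP, PORTS_UDP and GPU behind a lock).

// Illustrative only: per-ARN idempotent consume/release over one CPU budget.
package main

import "fmt"

type accountant struct {
	cpuFree  int
	consumed map[string]int // task ARN -> CPU currently reserved
}

func newAccountant(cpu int) *accountant {
	return &accountant{cpuFree: cpu, consumed: map[string]int{}}
}

// consume reserves cpu for taskArn once; a repeat call for the same ARN is a no-op.
func (a *accountant) consume(taskArn string, cpu int) bool {
	if _, ok := a.consumed[taskArn]; ok {
		return true // already accounted for, e.g. reconciliation after an agent restart
	}
	if cpu > a.cpuFree {
		return false // caller keeps the task in the waiting queue
	}
	a.cpuFree -= cpu
	a.consumed[taskArn] = cpu
	return true
}

// release returns whatever taskArn holds; releasing an unknown ARN is a no-op.
func (a *accountant) release(taskArn string) {
	if cpu, ok := a.consumed[taskArn]; ok {
		a.cpuFree += cpu
		delete(a.consumed, taskArn)
	}
}

func main() {
	a := newAccountant(1024)
	fmt.Println(a.consume("arn0", 512)) // true
	fmt.Println(a.consume("arn0", 512)) // true: second call is a no-op, nothing re-reserved
	a.release("arn0")
	a.release("arn0")      // no double credit on repeated release
	fmt.Println(a.cpuFree) // 1024
}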
diff --git a/agent/utils/sync/sequential_waitgroup.go b/agent/utils/sync/sequential_waitgroup.go deleted file mode 100644 index 78b0e266196..00000000000 --- a/agent/utils/sync/sequential_waitgroup.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -// Package sync is an analogue to the stdlib sync package. -// It contains lowlevel synchonization primitives, but not quite as low level as 'sync' does -package sync - -import stdsync "sync" - -// A SequentialWaitGroup waits for a collection of goroutines to finish. Each -// goroutine may add itself to the waitgroup with 'Add', providing a sequence -// number. Each goroutine should then call 'Done' with its sequence number when finished. -// Elsewhere, 'Wait' can be used to wait for all groups at or below the -// provided sequence number to complete. -type SequentialWaitGroup struct { - mutex stdsync.Mutex - // Implement our own semaphore over using sync.WaitGroup so that we can safely GC our map - semaphores map[int64]int - change *stdsync.Cond -} - -func NewSequentialWaitGroup() *SequentialWaitGroup { - return &SequentialWaitGroup{ - semaphores: make(map[int64]int), - change: stdsync.NewCond(&stdsync.Mutex{}), - } -} - -// Add adds the given delta to the waitgroup at the given sequence -func (s *SequentialWaitGroup) Add(sequence int64, delta int) { - s.mutex.Lock() - defer s.mutex.Unlock() - - _, ok := s.semaphores[sequence] - if ok { - s.semaphores[sequence] += delta - } else { - s.semaphores[sequence] = delta - } - if s.semaphores[sequence] <= 0 { - delete(s.semaphores, sequence) - s.change.Broadcast() - } -} - -// Done decrements the waitgroup at the given sequence by one -func (s *SequentialWaitGroup) Done(sequence int64) { - s.mutex.Lock() - defer s.mutex.Unlock() - _, ok := s.semaphores[sequence] - if ok { - s.semaphores[sequence]-- - if s.semaphores[sequence] == 0 { - delete(s.semaphores, sequence) - s.change.Broadcast() - } - } -} - -// Wait waits for all waitgroups at or below the given sequence to complete. -// Please note that this is *INCLUSIVE* of the sequence -func (s *SequentialWaitGroup) Wait(sequence int64) { - waitOver := func() bool { - s.mutex.Lock() - defer s.mutex.Unlock() - for storedSequence := range s.semaphores { - if storedSequence <= sequence { - // At least one non-empty seqnum greater than ours; wait more - return false - } - } - return true - } - - s.change.L.Lock() - defer s.change.L.Unlock() - // Wake up to check if our wait is over after each element being deleted from the map - for !waitOver() { - s.change.Wait() - } -} diff --git a/agent/utils/sync/sequential_waitgroup_test.go b/agent/utils/sync/sequential_waitgroup_test.go deleted file mode 100644 index ff289f364c6..00000000000 --- a/agent/utils/sync/sequential_waitgroup_test.go +++ /dev/null @@ -1,70 +0,0 @@ -//go:build unit -// +build unit - -// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). 
You may -// not use this file except in compliance with the License. A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sync - -import "testing" - -func TestSequentialWaitgroup(t *testing.T) { - wg := NewSequentialWaitGroup() - wg.Add(1, 1) - wg.Add(2, 1) - wg.Add(1, 1) - - // Wait for '0' should not fail, nothing for sequence numbers below it - wg.Wait(0) - - done := make(chan bool) - go func() { - wg.Done(1) - wg.Done(1) - wg.Wait(1) - wg.Done(2) - wg.Wait(2) - done <- true - }() - <-done -} - -func TestManyDones(t *testing.T) { - wg := NewSequentialWaitGroup() - - waitGroupCount := 10 - for i := 1; i < waitGroupCount; i++ { - wg.Add(int64(i), i) - } - - for i := 1; i < waitGroupCount; i++ { - wg.Wait(int64(i - 1)) - - isAwake := make(chan bool) - go func(i int64) { - wg.Wait(i) - isAwake <- true - }(int64(i)) - - for j := 0; j < i; j++ { - if j < i-1 { - select { - case <-isAwake: - t.Fatal("Should not be awake before all dones called") - default: - } - } - wg.Done(int64(i)) - } - } -}
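Editor's appendix: a self-contained sketch of the scheduling pattern that replaces the deleted SequentialWaitGroup above — a FIFO waiting queue drained by a monitor goroutine that reserves host resources and then signals each waiting task's channel. The scheduler and taskUnit types and the single CPU budget are simplified stand-ins invented for this sketch, not agent identifiers; only the roles of the wake and consumed channels mirror monitorQueuedTaskEvent and consumedHostResourceEvent in the diff.

// Simplified model: enqueue nudges the monitor, the monitor starts queued
// tasks in FIFO order while the CPU budget allows, and a release wakes it again.
package main

import (
	"fmt"
	"sync"
	"time"
)

type taskUnit struct {
	arn      string
	cpu      int
	consumed chan struct{} // plays the role of consumedHostResourceEvent
}

type scheduler struct {
	mu      sync.Mutex
	queue   []*taskUnit
	wake    chan struct{} // plays the role of monitorQueuedTaskEvent (buffered, size 1)
	cpuFree int
}

func newScheduler(cpu int) *scheduler {
	return &scheduler{wake: make(chan struct{}, 1), cpuFree: cpu}
}

// nudge wakes the monitor without blocking if a wake-up is already pending.
func (s *scheduler) nudge() {
	select {
	case s.wake <- struct{}{}:
	default:
	}
}

// enqueue appends a task in FIFO order and nudges the monitor.
func (s *scheduler) enqueue(t *taskUnit) {
	s.mu.Lock()
	s.queue = append(s.queue, t)
	s.mu.Unlock()
	s.nudge()
}

// monitor starts tasks from the head of the queue while they fit in the
// remaining budget; one wake-up may start several tasks, extra wake-ups are harmless.
func (s *scheduler) monitor(stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-s.wake:
			s.mu.Lock()
			for len(s.queue) > 0 && s.queue[0].cpu <= s.cpuFree {
				t := s.queue[0]
				s.queue = s.queue[1:]
				s.cpuFree -= t.cpu // reserve ("consume") host resources
				t.consumed <- struct{}{}
			}
			s.mu.Unlock()
		}
	}
}

// release returns a stopped task's resources and wakes the monitor.
func (s *scheduler) release(t *taskUnit) {
	s.mu.Lock()
	s.cpuFree += t.cpu
	s.mu.Unlock()
	s.nudge()
}

func main() {
	s := newScheduler(1024) // pretend the host has 1024 CPU units free
	stop := make(chan struct{})
	go s.monitor(stop)

	// Three tasks of 512 CPU each: two start immediately, the third waits.
	var wg sync.WaitGroup
	tasks := make([]*taskUnit, 3)
	for i := range tasks {
		t := &taskUnit{arn: fmt.Sprintf("arn%d", i), cpu: 512, consumed: make(chan struct{}, 1)}
		tasks[i] = t
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.enqueue(t)
			<-t.consumed // analogous to waitForHostResources
			fmt.Println(t.arn, "progressing")
		}()
	}

	time.Sleep(100 * time.Millisecond)
	s.release(tasks[0]) // a task stops, freeing 512 CPU for the waiting task
	wg.Wait()
	close(stop)
}

As in the diff, the wake channel is buffered with capacity one so that nudges never block and redundant wake-ups collapse into a single pass over the queue.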