Commit
Merge pull request #3684 from Yiyuanzzz/feature/task-resource-accounting
Host Resource Manager initialization
Yiyuanzzz authored May 16, 2023
2 parents 0202ad2 + fb28a48 commit 6caa2c2
Showing 23 changed files with 352 additions and 64 deletions.
12 changes: 6 additions & 6 deletions agent/acs/update_handler/updater_test.go
@@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) {
Reason: ptr("Updates are disabled").(*string),
}})

-	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

-	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) {
MessageId: ptr("mid").(*string),
}})

-	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

-	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

-	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
@@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) {

require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written")

-	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
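The extra nil in each of the six calls above is the new hostResources parameter threaded into engine.NewTaskEngine. A sketch of the updated call shape, inferred from the agent/app/agent.go call sites later in this diff (parameter names are illustrative, not the authoritative signature):

	taskEngine := engine.NewTaskEngine(
		cfg,                        // *config.Config
		dockerClient,               // Docker API client
		credentialsManager,         // credentials.Manager
		containerChangeEventStream, // *eventstream.EventStream
		imageManager,               // engine.ImageManager
		hostResources,              // map[string]*ecs.Resource, new in this PR
		state,                      // dockerstate.TaskEngineState
		metadataManager,            // container metadata manager
		resourceFields,             // task resource fields
		execCmdMgr,                 // execcmd.Manager
		serviceConnectManager,      // service connect manager
	)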
14 changes: 14 additions & 0 deletions agent/api/ecsclient/client.go
@@ -306,6 +306,20 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) {
return []*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
}

+// GetHostResources calls getResources to get a list of CPU, MEMORY, PORTS, and PORTS_UDP resources,
+// and returns a resourceMap that maps each resource name to its resource.
+func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
+	resources, err := client.getResources()
+	if err != nil {
+		return nil, err
+	}
+	resourceMap := make(map[string]*ecs.Resource)
+	for _, resource := range resources {
+		resourceMap[*resource.Name] = resource
+	}
+	return resourceMap, nil
+}

func getCpuAndMemory() (int64, int64) {
memInfo, err := system.ReadMemInfo()
mem := int64(0)
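For illustration, a minimal sketch of a caller consuming the new API (hypothetical code, not part of this diff; logHostCPU is an invented name, and aws.Int64Value is the aws-sdk-go pointer helper):

	// Reads the host CPU entry from the map returned by GetHostResources.
	// Keys follow the doc comment above: CPU, MEMORY, PORTS, PORTS_UDP.
	func logHostCPU(client api.ECSClient) error {
		resources, err := client.GetHostResources()
		if err != nil {
			return err
		}
		if cpu, ok := resources["CPU"]; ok {
			seelog.Infof("host CPU units: %d", aws.Int64Value(cpu.IntegerValue))
		}
		return nil
	}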
2 changes: 2 additions & 0 deletions agent/api/interface.go
@@ -58,6 +58,8 @@ type ECSClient interface {
// UpdateContainerInstancesState updates the given container Instance ID with
// the given status. Only valid statuses are ACTIVE and DRAINING.
UpdateContainerInstancesState(instanceARN, status string) error
+	// GetHostResources retrieves a map that maps each resource name to the corresponding resource
+	GetHostResources() (map[string]*ecs.Resource, error)
}

// ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS
15 changes: 15 additions & 0 deletions agent/api/mocks/api_mocks.go

(Generated file; diff not rendered.)

28 changes: 24 additions & 4 deletions agent/app/agent.go
@@ -298,17 +298,36 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStream
return exitcodes.ExitTerminal
}
}
+	hostResources, err := client.GetHostResources()
+	if err != nil {
+		seelog.Critical("Unable to fetch host resources")
+		return exitcodes.ExitError
+	}
+	numGPUs := int64(0)
if agent.cfg.GPUSupportEnabled {
err := agent.initializeGPUManager()
if err != nil {
seelog.Criticalf("Could not initialize Nvidia GPU Manager: %v", err)
return exitcodes.ExitError
}
+		// Find the number of GPUs the instance has
+		platformDevices := agent.getPlatformDevices()
+		for _, device := range platformDevices {
+			if *device.Type == ecs.PlatformDeviceTypeGpu {
+				numGPUs++
+			}
+		}
}

hostResources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &numGPUs,
}

// Create the task engine
taskEngine, currentEC2InstanceID, err := agent.newTaskEngine(
-		containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, agent.serviceconnectManager)
+		containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, agent.serviceconnectManager)
if err != nil {
seelog.Criticalf("Unable to initialize new task engine: %v", err)
return exitcodes.ExitTerminal
@@ -515,6 +534,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.EventStream
credentialsManager credentials.Manager,
state dockerstate.TaskEngineState,
imageManager engine.ImageManager,
+	hostResources map[string]*ecs.Resource,
execCmdMgr execcmd.Manager,
serviceConnectManager engineserviceconnect.Manager) (engine.TaskEngine, string, error) {

@@ -523,11 +543,11 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.EventStream
if !agent.cfg.Checkpoint.Enabled() {
seelog.Info("Checkpointing not enabled; a new container instance will be created each time the agent is run")
return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager,
-			containerChangeEventStream, imageManager, state,
+			containerChangeEventStream, imageManager, hostResources, state,
agent.metadataManager, agent.resourceFields, execCmdMgr, serviceConnectManager), "", nil
}

-	savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager)
+	savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager)
if err != nil {
seelog.Criticalf("Error loading previously saved state: %v", err)
return nil, "", err
@@ -552,7 +572,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.EventStream
state.Reset()
// Reset taskEngine; all the other values are still default
return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager,
-		containerChangeEventStream, imageManager, state, agent.metadataManager,
+		containerChangeEventStream, imageManager, hostResources, state, agent.metadataManager,
agent.resourceFields, execCmdMgr, serviceConnectManager), currentEC2InstanceID, nil
}

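Taken together, the map that doStart now hands to newTaskEngine covers five resource types: the four returned by GetHostResources plus the GPU entry added above. An illustrative sketch of its final shape, with made-up values for a hypothetical 2-vCPU, GPU-less instance (utils.Strptr is the helper already used in this diff; the PORTS types and values are assumptions):

	func exampleHostResources() map[string]*ecs.Resource {
		cpu, mem, gpus := int64(2048), int64(7974), int64(0)
		return map[string]*ecs.Resource{
			"CPU":       {Name: utils.Strptr("CPU"), Type: utils.Strptr("INTEGER"), IntegerValue: &cpu},
			"MEMORY":    {Name: utils.Strptr("MEMORY"), Type: utils.Strptr("INTEGER"), IntegerValue: &mem},
			"PORTS":     {Name: utils.Strptr("PORTS"), Type: utils.Strptr("STRINGSET"), StringSetValue: []*string{utils.Strptr("22")}},
			"PORTS_UDP": {Name: utils.Strptr("PORTS_UDP"), Type: utils.Strptr("STRINGSET"), StringSetValue: []*string{}},
			"GPU":       {Name: utils.Strptr("GPU"), Type: utils.Strptr("INTEGER"), IntegerValue: &gpus},
		}
	}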
9 changes: 6 additions & 3 deletions agent/app/agent_compatibility_linux_test.go
@@ -65,7 +65,8 @@ func TestCompatibilityEnabledSuccess(t *testing.T) {
defer cancel()

containerChangeEventStream := eventstream.NewEventStream("events", ctx)
-	_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
+	hostResources := getTestHostResources()
+	_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)

assert.NoError(t, err)
assert.True(t, cfg.TaskCPUMemLimit.Enabled())
@@ -106,7 +107,8 @@ func TestCompatibilityNotSetFail(t *testing.T) {
defer cancel()

containerChangeEventStream := eventstream.NewEventStream("events", ctx)
-	_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
+	hostResources := getTestHostResources()
+	_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)

assert.NoError(t, err)
assert.False(t, cfg.TaskCPUMemLimit.Enabled())
@@ -146,7 +148,8 @@ func TestCompatibilityExplicitlyEnabledFail(t *testing.T) {
defer cancel()

containerChangeEventStream := eventstream.NewEventStream("events", ctx)
-	_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
+	hostResources := getTestHostResources()
+	_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)

assert.Error(t, err)
}
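The three tests above call a getTestHostResources helper whose definition lives in a part of the diff not shown here. A minimal sketch of what such a helper might look like (hypothetical; the real definition may differ):

	func getTestHostResources() map[string]*ecs.Resource {
		cpu, mem := int64(1024), int64(1024)
		return map[string]*ecs.Resource{
			"CPU":    {Name: utils.Strptr("CPU"), Type: utils.Strptr("INTEGER"), IntegerValue: &cpu},
			"MEMORY": {Name: utils.Strptr("MEMORY"), Type: utils.Strptr("INTEGER"), IntegerValue: &mem},
		}
	}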
(Diffs for the remaining changed files are not shown in this excerpt.)
