Skip to content

Commit

Permalink
host resource manager initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Yiyuanzzz committed May 12, 2023
1 parent 0202ad2 commit 69e7dfe
Show file tree
Hide file tree
Showing 23 changed files with 345 additions and 64 deletions.
12 changes: 6 additions & 6 deletions agent/acs/update_handler/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) {
Reason: ptr("Updates are disabled").(*string),
}})

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) {
MessageId: ptr("mid").(*string),
}})

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) {

require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down
12 changes: 12 additions & 0 deletions agent/api/ecsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,18 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) {
return []*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
}

func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
resources, err := client.getResources()
if err != nil {
return nil, err
}
resourceMap := make(map[string]*ecs.Resource)
for _, resource := range resources {
resourceMap[*resource.Name] = resource
}
return resourceMap, nil
}

func getCpuAndMemory() (int64, int64) {
memInfo, err := system.ReadMemInfo()
mem := int64(0)
Expand Down
2 changes: 2 additions & 0 deletions agent/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type ECSClient interface {
// UpdateContainerInstancesState updates the given container Instance ID with
// the given status. Only valid statuses are ACTIVE and DRAINING.
UpdateContainerInstancesState(instanceARN, status string) error
// GetHostResources retrieves a map that map the resource name to the corresponding resource
GetHostResources() (map[string]*ecs.Resource, error)
}

// ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS
Expand Down
15 changes: 15 additions & 0 deletions agent/api/mocks/api_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 24 additions & 4 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,36 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
return exitcodes.ExitTerminal
}
}
numGPUs := int64(0)
if agent.cfg.GPUSupportEnabled {
err := agent.initializeGPUManager()
if err != nil {
seelog.Criticalf("Could not initialize Nvidia GPU Manager: %v", err)
return exitcodes.ExitError
}
// Find number of GPUs instance has
platformDevices := agent.getPlatformDevices()
for _, device := range platformDevices {
if *device.Type == ecs.PlatformDeviceTypeGpu {
numGPUs++
}
}
}

hostResources, err := client.GetHostResources()
if err != nil {
seelog.Critical("Unable to fetch host resources")
return exitcodes.ExitError
}
hostResources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &numGPUs,
}

// Create the task engine
taskEngine, currentEC2InstanceID, err := agent.newTaskEngine(
containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, agent.serviceconnectManager)
containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, agent.serviceconnectManager)
if err != nil {
seelog.Criticalf("Unable to initialize new task engine: %v", err)
return exitcodes.ExitTerminal
Expand Down Expand Up @@ -515,6 +534,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
credentialsManager credentials.Manager,
state dockerstate.TaskEngineState,
imageManager engine.ImageManager,
hostResources map[string]*ecs.Resource,
execCmdMgr execcmd.Manager,
serviceConnectManager engineserviceconnect.Manager) (engine.TaskEngine, string, error) {

Expand All @@ -523,11 +543,11 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
if !agent.cfg.Checkpoint.Enabled() {
seelog.Info("Checkpointing not enabled; a new container instance will be created each time the agent is run")
return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager,
containerChangeEventStream, imageManager, state,
containerChangeEventStream, imageManager, hostResources, state,
agent.metadataManager, agent.resourceFields, execCmdMgr, serviceConnectManager), "", nil
}

savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager)
savedData, err := agent.loadData(containerChangeEventStream, credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager)
if err != nil {
seelog.Criticalf("Error loading previously saved state: %v", err)
return nil, "", err
Expand All @@ -552,7 +572,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
state.Reset()
// Reset taskEngine; all the other values are still default
return engine.NewTaskEngine(agent.cfg, agent.dockerClient, credentialsManager,
containerChangeEventStream, imageManager, state, agent.metadataManager,
containerChangeEventStream, imageManager, hostResources, state, agent.metadataManager,
agent.resourceFields, execCmdMgr, serviceConnectManager), currentEC2InstanceID, nil
}

Expand Down
9 changes: 6 additions & 3 deletions agent/app/agent_compatibility_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func TestCompatibilityEnabledSuccess(t *testing.T) {
defer cancel()

containerChangeEventStream := eventstream.NewEventStream("events", ctx)
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
hostResources := getTestHostResources()
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)

assert.NoError(t, err)
assert.True(t, cfg.TaskCPUMemLimit.Enabled())
Expand Down Expand Up @@ -106,7 +107,8 @@ func TestCompatibilityNotSetFail(t *testing.T) {
defer cancel()

containerChangeEventStream := eventstream.NewEventStream("events", ctx)
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
hostResources := getTestHostResources()
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)

assert.NoError(t, err)
assert.False(t, cfg.TaskCPUMemLimit.Enabled())
Expand Down Expand Up @@ -146,7 +148,8 @@ func TestCompatibilityExplicitlyEnabledFail(t *testing.T) {
defer cancel()

containerChangeEventStream := eventstream.NewEventStream("events", ctx)
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, execCmdMgr, serviceConnectManager)
hostResources := getTestHostResources()
_, _, err := agent.newTaskEngine(containerChangeEventStream, creds, dockerstate.NewTaskEngineState(), images, hostResources, execCmdMgr, serviceConnectManager)

assert.Error(t, err)
}
Expand Down
84 changes: 78 additions & 6 deletions agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/sighandlers/exitcodes"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
mock_statemanager "github.com/aws/amazon-ecs-agent/agent/statemanager/mocks"
"github.com/aws/amazon-ecs-agent/agent/utils"
mock_loader "github.com/aws/amazon-ecs-agent/agent/utils/loader/mocks"
mock_mobypkgwrapper "github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper/mocks"
"github.com/aws/amazon-ecs-agent/agent/version"
Expand Down Expand Up @@ -78,6 +79,20 @@ var apiVersions = []dockerclient.DockerVersion{
dockerclient.Version_1_22,
dockerclient.Version_1_23}
var capabilities []*ecs.Attribute
var testHostCPU = int64(1024)
var testHostMEMORY = int64(1024)
var testHostResource = map[string]*ecs.Resource{
"CPU": &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &testHostCPU,
},
"MEMORY": &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &testHostMEMORY,
},
}

func setup(t *testing.T) (*gomock.Controller,
*mock_credentials.MockManager,
Expand Down Expand Up @@ -169,6 +184,7 @@ func TestDoStartNewTaskEngineError(t *testing.T) {
ec2MetadataClient := mock_ec2.NewMockEC2MetadataClient(ctrl)
gomock.InOrder(
dockerClient.EXPECT().SupportedVersions().Return(apiVersions),
client.EXPECT().GetHostResources().Return(testHostResource, nil),
saveableOptionFactory.EXPECT().AddSaveable("TaskEngine", gomock.Any()).Return(nil),
saveableOptionFactory.EXPECT().AddSaveable("ContainerInstanceArn", gomock.Any()).Return(nil),
saveableOptionFactory.EXPECT().AddSaveable("Cluster", gomock.Any()).Return(nil),
Expand Down Expand Up @@ -225,6 +241,7 @@ func TestDoStartRegisterContainerInstanceErrorTerminal(t *testing.T) {
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
gomock.InOrder(
dockerClient.EXPECT().SupportedVersions().Return(apiVersions),
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil),
dockerClient.EXPECT().SupportedVersions().Return(nil),
dockerClient.EXPECT().KnownVersions().Return(nil),
Expand Down Expand Up @@ -281,6 +298,7 @@ func TestDoStartRegisterContainerInstanceErrorNonTerminal(t *testing.T) {
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
gomock.InOrder(
dockerClient.EXPECT().SupportedVersions().Return(apiVersions),
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil),
dockerClient.EXPECT().SupportedVersions().Return(nil),
dockerClient.EXPECT().KnownVersions().Return(nil),
Expand Down Expand Up @@ -322,6 +340,7 @@ func TestDoStartWarmPoolsError(t *testing.T) {
mockEC2Metadata := mock_ec2.NewMockEC2MetadataClient(ctrl)
gomock.InOrder(
dockerClient.EXPECT().SupportedVersions().Return(apiVersions),
client.EXPECT().GetHostResources().Return(testHostResource, nil),
)

cfg := getTestConfig()
Expand Down Expand Up @@ -441,6 +460,7 @@ func testDoStartHappyPathWithConditions(t *testing.T, blackholed bool, warmPools

gomock.InOrder(
dockerClient.EXPECT().SupportedVersions().Return(apiVersions),
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil),
dockerClient.EXPECT().SupportedVersions().Return(nil),
dockerClient.EXPECT().KnownVersions().Return(nil),
Expand Down Expand Up @@ -562,8 +582,10 @@ func TestNewTaskEngineRestoreFromCheckpointNoEC2InstanceIDToLoadHappyPath(t *tes
saveableOptionFactory: saveableOptionFactory,
}

hostResources := getTestHostResources()

_, instanceID, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager)
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager)
assert.NoError(t, err)
assert.Equal(t, expectedInstanceID, instanceID)
assert.Equal(t, "prev-container-inst", agent.containerInstanceARN)
Expand Down Expand Up @@ -624,9 +646,10 @@ func TestNewTaskEngineRestoreFromCheckpointPreviousEC2InstanceIDLoadedHappyPath(
ec2MetadataClient: ec2MetadataClient,
saveableOptionFactory: saveableOptionFactory,
}
hostResources := getTestHostResources()

_, instanceID, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager)
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager)
assert.NoError(t, err)
assert.Equal(t, expectedInstanceID, instanceID)
assert.NotEqual(t, "prev-container-inst", agent.containerInstanceARN)
Expand Down Expand Up @@ -686,8 +709,10 @@ func TestNewTaskEngineRestoreFromCheckpointClusterIDMismatch(t *testing.T) {
saveableOptionFactory: saveableOptionFactory,
}

hostResources := getTestHostResources()

_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager)
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager)
assert.Error(t, err)
assert.IsType(t, clusterMismatchError{}, err)
}
Expand Down Expand Up @@ -731,8 +756,10 @@ func TestNewTaskEngineRestoreFromCheckpointNewStateManagerError(t *testing.T) {
saveableOptionFactory: saveableOptionFactory,
}

hostResources := getTestHostResources()

_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager)
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager)
assert.Error(t, err)
assert.False(t, isTransient(err))
}
Expand Down Expand Up @@ -777,8 +804,10 @@ func TestNewTaskEngineRestoreFromCheckpointStateLoadError(t *testing.T) {
saveableOptionFactory: saveableOptionFactory,
}

hostResources := getTestHostResources()

_, _, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, execCmdMgr, serviceConnectManager)
credentialsManager, dockerstate.NewTaskEngineState(), imageManager, hostResources, execCmdMgr, serviceConnectManager)
assert.Error(t, err)
assert.False(t, isTransient(err))
}
Expand Down Expand Up @@ -816,8 +845,10 @@ func TestNewTaskEngineRestoreFromCheckpoint(t *testing.T) {
}

state := dockerstate.NewTaskEngineState()
hostResources := getTestHostResources()

_, instanceID, err := agent.newTaskEngine(eventstream.NewEventStream("events", ctx),
credentialsManager, state, imageManager, execCmdMgr, serviceConnectManager)
credentialsManager, state, imageManager, hostResources, execCmdMgr, serviceConnectManager)
assert.NoError(t, err)
assert.Equal(t, testEC2InstanceID, instanceID)

Expand Down Expand Up @@ -1346,6 +1377,7 @@ func TestRegisterContainerInstanceInvalidParameterTerminalError(t *testing.T) {
mockServiceConnectManager.EXPECT().SetECSClient(gomock.Any(), gomock.Any()).AnyTimes()
gomock.InOrder(
dockerClient.EXPECT().SupportedVersions().Return(apiVersions),
client.EXPECT().GetHostResources().Return(testHostResource, nil),
mockCredentialsProvider.EXPECT().Retrieve().Return(aws_credentials.Value{}, nil),
dockerClient.EXPECT().SupportedVersions().Return(nil),
dockerClient.EXPECT().KnownVersions().Return(nil),
Expand Down Expand Up @@ -1637,6 +1669,46 @@ func getTestConfig() config.Config {
return cfg
}

func getTestHostResources() map[string]*ecs.Resource {
hostResources := make(map[string]*ecs.Resource)
CPUs := int64(1024)
hostResources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &CPUs,
}
//MEMORY
memory := int64(1024)
hostResources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &memory,
}
//PORTS
ports := []*string{}
hostResources["PORTS"] = &ecs.Resource{
Name: utils.Strptr("PORTS"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: ports,
}

//PORTS_UDP
ports_udp := []*string{}
hostResources["PORTS_UDP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_UDP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: ports_udp,
}
//GPUs
numGPUs := int64(3)
hostResources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &numGPUs,
}
return hostResources
}

func newTestDataClient(t *testing.T) data.Client {
testDir := t.TempDir()

Expand Down
Loading

0 comments on commit 69e7dfe

Please sign in to comment.