diff --git a/client/allocrunner/state/state.go b/client/allocrunner/state/state.go index 9754e369c49..5951d61a900 100644 --- a/client/allocrunner/state/state.go +++ b/client/allocrunner/state/state.go @@ -59,3 +59,13 @@ func (s *State) Copy() *State { TaskStates: taskStates, } } + +// ClientTerminalStatus returns if the client status is terminal and will no longer transition +func (a *State) ClientTerminalStatus() bool { + switch a.ClientStatus { + case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed, structs.AllocClientStatusLost: + return true + default: + return false + } +} diff --git a/client/client.go b/client/client.go index c8bed1e95a2..d7fb839de49 100644 --- a/client/client.go +++ b/client/client.go @@ -14,11 +14,11 @@ import ( "sync" "time" - metrics "github.com/armon/go-metrics" + "github.com/armon/go-metrics" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" - hclog "github.com/hashicorp/go-hclog" - multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/allocrunner/interfaces" @@ -2709,11 +2709,11 @@ func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Comparab } // Sum the allocated resources - allocs := c.allAllocs() var allocated structs.ComparableResources allocatedDeviceMbits := make(map[string]int) - for _, alloc := range allocs { - if alloc.TerminalStatus() { + for _, ar := range c.getAllocRunners() { + alloc := ar.Alloc() + if alloc.ServerTerminalStatus() || ar.AllocState().ClientTerminalStatus() { continue } @@ -2760,17 +2760,6 @@ func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Comparab return &allocated } -// allAllocs returns all the allocations managed by the client -func (c *Client) allAllocs() map[string]*structs.Allocation { - ars := c.getAllocRunners() - allocs := make(map[string]*structs.Allocation, len(ars)) - for _, ar := range ars { - a := ar.Alloc() - allocs[a.ID] = a - } - return allocs -} - // GetTaskEventHandler returns an event handler for the given allocID and task name func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.EventHandler { c.allocLock.RLock() diff --git a/client/client_test.go b/client/client_test.go index ea034ddf58f..9a9428bad0c 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/client/config" consulApi "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/fingerprint" @@ -26,7 +26,7 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" - hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-hclog" cstate "github.com/hashicorp/nomad/client/state" ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/stretchr/testify/require" @@ -893,7 +893,7 @@ func TestClient_BlockedAllocations(t *testing.T) { t.Fatalf("err: %v", err) } - // Enusre that the chained allocation is being tracked as blocked + // Ensure that the chained allocation is being tracked as blocked testutil.WaitForResult(func() (bool, error) { ar := c1.getAllocRunners()[alloc2.ID] if ar == nil { @@ -1414,6 +1414,116 @@ func TestClient_computeAllocatedDeviceStats(t *testing.T) { assert.EqualValues(t, expected, result) } +func TestClient_getAllocatedResources(t *testing.T) { + t.Parallel() + require := require.New(t) + client, cleanup := TestClient(t, nil) + defer cleanup() + + allocStops := mock.BatchAlloc() + allocStops.Job.TaskGroups[0].Count = 1 + allocStops.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + allocStops.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "1ms", + "exit_code": "0", + } + allocStops.Job.TaskGroups[0].RestartPolicy.Attempts = 0 + allocStops.AllocatedResources.Shared.DiskMB = 64 + allocStops.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 64} + allocStops.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 64} + require.Nil(client.addAlloc(allocStops, "")) + + allocFails := mock.BatchAlloc() + allocFails.Job.TaskGroups[0].Count = 1 + allocFails.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + allocFails.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "1ms", + "exit_code": "1", + } + allocFails.Job.TaskGroups[0].RestartPolicy.Attempts = 0 + allocFails.AllocatedResources.Shared.DiskMB = 128 + allocFails.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 128} + allocFails.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 128} + require.Nil(client.addAlloc(allocFails, "")) + + allocRuns := mock.Alloc() + allocRuns.Job.TaskGroups[0].Count = 1 + allocRuns.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + allocRuns.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "3s", + } + allocRuns.AllocatedResources.Shared.DiskMB = 256 + allocRuns.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 256} + allocRuns.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 256} + require.Nil(client.addAlloc(allocRuns, "")) + + allocPends := mock.Alloc() + allocPends.Job.TaskGroups[0].Count = 1 + allocPends.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + allocPends.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ + "run_for": "5s", + "start_block_for": "10s", + } + allocPends.AllocatedResources.Shared.DiskMB = 512 + allocPends.AllocatedResources.Tasks["web"].Cpu = structs.AllocatedCpuResources{CpuShares: 512} + allocPends.AllocatedResources.Tasks["web"].Memory = structs.AllocatedMemoryResources{MemoryMB: 512} + require.Nil(client.addAlloc(allocPends, "")) + + // wait for allocStops to stop running and for allocRuns to be pending/running + testutil.WaitForResult(func() (bool, error) { + as, err := client.GetAllocState(allocPends.ID) + if err != nil { + return false, err + } else if as.ClientStatus != structs.AllocClientStatusPending { + return false, fmt.Errorf("allocPends not yet pending: %#v", as) + } + + as, err = client.GetAllocState(allocRuns.ID) + if as.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("allocRuns not yet running: %#v", as) + } else if err != nil { + return false, err + } + + as, err = client.GetAllocState(allocStops.ID) + if err != nil { + return false, err + } else if as.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("allocStops not yet complete: %#v", as) + } + + as, err = client.GetAllocState(allocFails.ID) + if err != nil { + return false, err + } else if as.ClientStatus != structs.AllocClientStatusFailed { + return false, fmt.Errorf("allocFails not yet failed: %#v", as) + } + + return true, nil + }, func(err error) { + require.NoError(err) + }) + + result := client.getAllocatedResources(client.config.Node) + + expected := structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 768, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 768, + }, + Networks: nil, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 768, + }, + } + + assert.EqualValues(t, expected, *result) +} + func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) { t.Parallel() client, cleanup := TestClient(t, nil)