client: de-duplicate alloc updates and gate during restore
When client nodes are restarted, all allocations that have been scheduled on the
node have their modify index updated, including terminal allocations. There are
several contributing factors:

* The `allocSync` method that updates the servers isn't gated on first contact
  with the servers. This means that if a server updates the desired state while
  the client is down, `allocSync` races with the `Node.ClientGetAlloc`
  RPC. This typically results in the client updating the server with "running"
  and then immediately thereafter "complete". (The gate is sketched just after
  this list.)

* The `allocSync` method unconditionally sends the `Node.UpdateAlloc` RPC even
  if it's possible to assert that the server has definitely seen the client
  state. The allocrunner may queue up updates even if we gate sending them, so
  we end up with a race between the allocrunner updating its internal state to
  overwrite the previous update and `allocSync` sending the bogus or duplicate
  update.
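
For illustration, the gate is just a non-blocking check against the channel the
client closes on first server contact, roughly as in the `client.go` hunk below.
This standalone sketch wraps the idiom in a helper; the helper name is made up:

```go
package main

import "fmt"

// shouldSync is a sketch (not the real client method) of the gating idiom:
// a channel that is closed exactly once signals "servers contacted", and a
// non-blocking receive checks it without ever blocking the sync loop.
func shouldSync(serversContactedCh <-chan struct{}) bool {
	select {
	case <-serversContactedCh:
		return true // first contact happened; safe to send client updates
	default:
		return false // no contact yet; skip this round and keep batching
	}
}

func main() {
	serversContactedCh := make(chan struct{})
	fmt.Println(shouldSync(serversContactedCh)) // false: not yet contacted

	close(serversContactedCh) // the client closes this after first contact
	fmt.Println(shouldSync(serversContactedCh)) // true: gate is open
}
```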

This changeset adds tracking of server-acknowledged state to the
allocrunner. This state gets checked in the `allocSync` before adding the update
to the batch, and updated when `Node.UpdateAlloc` returns successfully. To
implement this we need to be able to equality-check the updates against the last
acknowledged state. We also need to add the last acknowledged state to the
client state DB, otherwise we'd drop unacknowledged updates across restarts.
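
The state DB changes themselves aren't in the hunks shown here, but based on the
calls in this diff (`PutAcknowledgedState` / `GetAcknowledgedState`), the new
accessors look roughly like the sketch below; the interface name, exact
signatures, options, and storage layout are assumptions:

```go
// Sketch of the assumed client state DB additions, inferred from the calls
// in this changeset; the real interface may take extra options and handle
// encoding and bucket layout internally.
package state

import arstate "github.com/hashicorp/nomad/client/allocrunner/state"

type AcknowledgedStateStore interface {
	// PutAcknowledgedState stores the alloc runner state that the servers
	// last acknowledged, keyed by allocation ID.
	PutAcknowledgedState(allocID string, state *arstate.State) error

	// GetAcknowledgedState returns the last server-acknowledged state for
	// the allocation, or nil if none has been persisted.
	GetAcknowledgedState(allocID string) (*arstate.State, error)
}
```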

The client restart test has been expanded to cover a variety of allocation
states, including allocs stopped before shutdown, allocs stopped by the server
while the client is down, and allocs that have been completely GC'd on the
server while the client is down. I've also bench tested scenarios where the task
workload is killed while the client is down, resulting in a failed restore.

Fixes #16381
tgross committed May 10, 2023
1 parent 0c1b2a9 commit ee9c41b
Showing 12 changed files with 632 additions and 87 deletions.
3 changes: 3 additions & 0 deletions .changelog/17074.txt
@@ -0,0 +1,3 @@
```release-note:improvement
client: de-duplicate allocation client status updates and prevent allocation client status updates from being sent until clients have first synchronized with the server
```
57 changes: 55 additions & 2 deletions client/allocrunner/alloc_runner.go
@@ -11,6 +11,7 @@ import (

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"golang.org/x/exp/maps"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -123,6 +124,10 @@ type allocRunner struct {
state *state.State
stateLock sync.RWMutex

// lastAcknowledgedState is the alloc runner state that was last
// acknowledged by the server (may lag behind ar.state)
lastAcknowledgedState *state.State

stateDB cstate.StateDB

// allocDir is used to build the allocations directory structure.
@@ -738,8 +743,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
return states
}

// clientAlloc takes in the task states and returns an Allocation populated
// with Client specific fields
// clientAlloc takes in the task states and returns an Allocation populated with
// Client specific fields. Note: this mutates the allocRunner's state to store
// the taskStates!
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
@@ -1394,3 +1400,50 @@ func (ar *allocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capa

return tr.DriverCapabilities()
}

// AcknowledgeState is called by the client's alloc sync when a given client
// state has been acknowledged by the server
func (ar *allocRunner) AcknowledgeState(a *state.State) {
	ar.stateLock.Lock()
	defer ar.stateLock.Unlock()
	ar.lastAcknowledgedState = a
	ar.persistLastAcknowledgedState(a)
}

// persistLastAcknowledgedState stores the last client state acknowledged by the server
func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) {
	if err := ar.stateDB.PutAcknowledgedState(ar.id, a); err != nil {
		// While any persistence errors are very bad, the worst case scenario
		// for failing to persist last acknowledged state is that if the agent
		// is restarted it will send the update again.
		ar.logger.Error("error storing acknowledged allocation status", "error", err)
	}
}

// LastAcknowledgedStateIsCurrent returns true if the current state matches the
// state that was last acknowledged from a server update. This is called from
// the client in the same goroutine that called AcknowledgeState so that we
// can't get a TOCTOU error.
func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool {
	ar.stateLock.RLock()
	defer ar.stateLock.RUnlock()

	last := ar.lastAcknowledgedState
	if last == nil {
		return false
	}

	switch {
	case last.ClientStatus != a.ClientStatus:
		return false
	case last.ClientDescription != a.ClientDescription:
		return false
	case !last.DeploymentStatus.Equal(a.DeploymentStatus):
		return false
	case !last.NetworkStatus.Equal(a.NetworkStatus):
		return false
	}
	return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool {
		return st.Equal(o)
	})
}
64 changes: 61 additions & 3 deletions client/allocrunner/alloc_runner_test.go
@@ -14,8 +14,13 @@ import (

"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allochealth"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocrunner/tasklifecycle"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/client/allocwatcher"
@@ -26,9 +31,6 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
)

// destroy does a blocking destroy on an alloc runner
@@ -2443,3 +2445,59 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) {
wait.Gap(500*time.Millisecond),
))
}

func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) {
	ci.Parallel(t)

	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{"run_for": "2ms"}
	alloc.DesiredStatus = "stop"

	conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
	t.Cleanup(cleanup)

	ar, err := NewAllocRunner(conf)
	must.NoError(t, err)

	ar.SetNetworkStatus(&structs.AllocNetworkStatus{
		InterfaceName: "eth0",
		Address:       "192.168.1.1",
		DNS:           &structs.DNSConfig{},
	})

	calloc := ar.clientAlloc(map[string]*structs.TaskState{})
	ar.AcknowledgeState(&arstate.State{
		ClientStatus:      calloc.ClientStatus,
		ClientDescription: calloc.ClientDescription,
		DeploymentStatus:  calloc.DeploymentStatus,
		TaskStates:        calloc.TaskStates,
		NetworkStatus:     calloc.NetworkStatus,
	})

	must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))

	// clientAlloc mutates the state, so verify this doesn't break the check
	// without state having been updated
	calloc = ar.clientAlloc(map[string]*structs.TaskState{})
	must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))

	// make a no-op state update
	ar.SetNetworkStatus(&structs.AllocNetworkStatus{
		InterfaceName: "eth0",
		Address:       "192.168.1.1",
		DNS:           &structs.DNSConfig{},
	})
	calloc = ar.clientAlloc(map[string]*structs.TaskState{})
	must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc))

	// make a state update that should be detected as a change
	ar.SetNetworkStatus(&structs.AllocNetworkStatus{
		InterfaceName: "eth0",
		Address:       "192.168.2.1",
		DNS:           &structs.DNSConfig{},
	})
	calloc = ar.clientAlloc(map[string]*structs.TaskState{})
	must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc))
}
66 changes: 58 additions & 8 deletions client/client.go
@@ -160,6 +160,8 @@ type AllocRunner interface {
Signal(taskName, signal string) error
GetTaskEventHandler(taskName string) drivermanager.EventHandler
PersistState() error
AcknowledgeState(*arstate.State)
LastAcknowledgedStateIsCurrent(*structs.Allocation) bool

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartRunning(taskEvent *structs.TaskEvent) error
@@ -512,7 +514,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService)

// Batching of initial fingerprints is done to reduce the number of node
// updates sent to the server on startup. This is the first RPC to the servers
// updates sent to the server on startup.
go c.batchFirstFingerprints()

// create heartbeatStop. We go after the first attempt to connect to the server, so
@@ -1270,6 +1272,14 @@ func (c *Client) restoreState() error {
			continue
		}

		allocState, err := c.stateDB.GetAcknowledgedState(alloc.ID)
		if err != nil {
			c.logger.Error("error restoring last acknowledged alloc state, will update again",
				"error", err, "alloc_id", alloc.ID)
		} else {
			ar.AcknowledgeState(allocState)
		}

		// Maybe mark the alloc for halt on missing server heartbeats
		if c.heartbeatStop.shouldStop(alloc) {
			err = c.heartbeatStop.stopAlloc(alloc.ID)
@@ -2144,10 +2154,20 @@ func (c *Client) allocSync() {
if len(updates) == 0 {
continue
}
// Ensure we never send an update before we've had at least one sync
// from the server
select {
case <-c.serversContactedCh:
default:
continue
}

sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
sync := c.filterAcknowledgedUpdates(updates)
if len(sync) == 0 {
// No updates to send
updates = make(map[string]*structs.Allocation, len(updates))
syncTicker.Reset(allocSyncIntv)
continue
}

// Send to server.
@@ -2162,21 +2182,51 @@ func (c *Client) allocSync() {
// Error updating allocations, do *not* clear
// updates and retry after backoff
c.logger.Error("error updating allocations", "error", err)
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
syncTicker.Reset(c.retryIntv(allocSyncRetryIntv))
continue
}

c.allocLock.RLock()
for _, update := range sync {
if ar, ok := c.allocs[update.ID]; ok {
ar.AcknowledgeState(&arstate.State{
ClientStatus: update.ClientStatus,
ClientDescription: update.ClientDescription,
DeploymentStatus: update.DeploymentStatus,
TaskStates: update.TaskStates,
NetworkStatus: update.NetworkStatus,
})
}
}
c.allocLock.RUnlock()

// Successfully updated allocs, reset map and ticker.
// Always reset ticker to give loop time to receive
// alloc updates. If the RPC took the ticker interval
// we may call it in a tight loop before draining
// buffered updates.
updates = make(map[string]*structs.Allocation, len(updates))
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
syncTicker.Reset(allocSyncIntv)
}
}
}

func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation {
	sync := make([]*structs.Allocation, 0, len(updates))
	c.allocLock.RLock()
	defer c.allocLock.RUnlock()
	for allocID, update := range updates {
		if ar, ok := c.allocs[allocID]; ok {
			if !ar.LastAcknowledgedStateIsCurrent(update) {
				sync = append(sync, update)
			}
		} else {
			// no allocrunner (typically a failed placement), so we need
			// to send update
			sync = append(sync, update)
		}
	}
	return sync
}

// allocUpdates holds the results of receiving updated allocations from the