diff --git a/.changelog/17074.txt b/.changelog/17074.txt new file mode 100644 index 00000000000..2973ca75cd2 --- /dev/null +++ b/.changelog/17074.txt @@ -0,0 +1,3 @@ +```release-note:improvement +client: de-duplicate allocation client status updates and prevent allocation client status updates from being sent until clients have first synchronized with the server +``` diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7545d46fe66..3aaf13dbdda 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + "golang.org/x/exp/maps" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" @@ -123,6 +124,10 @@ type allocRunner struct { state *state.State stateLock sync.RWMutex + // lastAcknowledgedState is the alloc runner state that was last + // acknowledged by the server (may lag behind ar.state) + lastAcknowledgedState *state.State + stateDB cstate.StateDB // allocDir is used to build the allocations directory structure. @@ -738,8 +743,9 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { return states } -// clientAlloc takes in the task states and returns an Allocation populated -// with Client specific fields +// clientAlloc takes in the task states and returns an Allocation populated with +// Client specific fields. Note: this mutates the allocRunner's state to store +// the taskStates! 
func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation { ar.stateLock.Lock() defer ar.stateLock.Unlock() @@ -1394,3 +1400,50 @@ func (ar *allocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capa return tr.DriverCapabilities() } + +// AcknowledgeState is called by the client's alloc sync when a given client +// state has been acknowledged by the server +func (ar *allocRunner) AcknowledgeState(a *state.State) { + ar.stateLock.Lock() + defer ar.stateLock.Unlock() + ar.lastAcknowledgedState = a + ar.persistLastAcknowledgedState(a) +} + +// persistLastAcknowledgedState stores the last client state acknowledged by the server +func (ar *allocRunner) persistLastAcknowledgedState(a *state.State) { + if err := ar.stateDB.PutAcknowledgedState(ar.id, a); err != nil { + // While any persistence errors are very bad, the worst case scenario + // for failing to persist last acknowledged state is that if the agent + // is restarted it will send the update again. + ar.logger.Error("error storing acknowledged allocation status", "error", err) + } +} + +// LastAcknowledgedStateIsCurrent returns true if the current state matches the +// state that was last acknowledged from a server update. This is called from +// the client in the same goroutine that called AcknowledgeState so that we +// can't get a TOCTOU error. 
+func (ar *allocRunner) LastAcknowledgedStateIsCurrent(a *structs.Allocation) bool { + ar.stateLock.RLock() + defer ar.stateLock.RUnlock() + + last := ar.lastAcknowledgedState + if last == nil { + return false + } + + switch { + case last.ClientStatus != a.ClientStatus: + return false + case last.ClientDescription != a.ClientDescription: + return false + case !last.DeploymentStatus.Equal(a.DeploymentStatus): + return false + case !last.NetworkStatus.Equal(a.NetworkStatus): + return false + } + return maps.EqualFunc(last.TaskStates, a.TaskStates, func(st, o *structs.TaskState) bool { + return st.Equal(o) + }) +} diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index ddfec1606a8..beb2705e0ee 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -14,8 +14,13 @@ import ( "github.com/hashicorp/consul/api" multierror "github.com/hashicorp/go-multierror" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allochealth" + arstate "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/tasklifecycle" "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/client/allocwatcher" @@ -26,9 +31,6 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/shoenig/test/must" - "github.com/shoenig/test/wait" - "github.com/stretchr/testify/require" ) // destroy does a blocking destroy on an alloc runner @@ -2443,3 +2445,59 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) { wait.Gap(500*time.Millisecond), )) } + +func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { + ci.Parallel(t) + + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + 
task.Config = map[string]interface{}{"run_for": "2ms"} + alloc.DesiredStatus = "stop" + + conf, cleanup := testAllocRunnerConfig(t, alloc.Copy()) + t.Cleanup(cleanup) + + ar, err := NewAllocRunner(conf) + must.NoError(t, err) + + ar.SetNetworkStatus(&structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "192.168.1.1", + DNS: &structs.DNSConfig{}, + }) + + calloc := ar.clientAlloc(map[string]*structs.TaskState{}) + ar.AcknowledgeState(&arstate.State{ + ClientStatus: calloc.ClientStatus, + ClientDescription: calloc.ClientDescription, + DeploymentStatus: calloc.DeploymentStatus, + TaskStates: calloc.TaskStates, + NetworkStatus: calloc.NetworkStatus, + }) + + must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + + // clientAlloc mutates the state, so verify this doesn't break the check + // without state having been updated + calloc = ar.clientAlloc(map[string]*structs.TaskState{}) + must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + + // make a no-op state update + ar.SetNetworkStatus(&structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "192.168.1.1", + DNS: &structs.DNSConfig{}, + }) + calloc = ar.clientAlloc(map[string]*structs.TaskState{}) + must.True(t, ar.LastAcknowledgedStateIsCurrent(calloc)) + + // make a state update that should be detected as a change + ar.SetNetworkStatus(&structs.AllocNetworkStatus{ + InterfaceName: "eth0", + Address: "192.168.2.1", + DNS: &structs.DNSConfig{}, + }) + calloc = ar.clientAlloc(map[string]*structs.TaskState{}) + must.False(t, ar.LastAcknowledgedStateIsCurrent(calloc)) +} diff --git a/client/client.go b/client/client.go index c2864d75cc1..3f8d237a7b6 100644 --- a/client/client.go +++ b/client/client.go @@ -160,6 +160,8 @@ type AllocRunner interface { Signal(taskName, signal string) error GetTaskEventHandler(taskName string) drivermanager.EventHandler PersistState() error + AcknowledgeState(*arstate.State) + LastAcknowledgedStateIsCurrent(*structs.Allocation) bool RestartTask(taskName string, 
taskEvent *structs.TaskEvent) error RestartRunning(taskEvent *structs.TaskEvent) error @@ -512,7 +514,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService) // Batching of initial fingerprints is done to reduce the number of node - // updates sent to the server on startup. This is the first RPC to the servers + // updates sent to the server on startup. go c.batchFirstFingerprints() // create heartbeatStop. We go after the first attempt to connect to the server, so @@ -1270,6 +1272,14 @@ func (c *Client) restoreState() error { continue } + allocState, err := c.stateDB.GetAcknowledgedState(alloc.ID) + if err != nil { + c.logger.Error("error restoring last acknowledged alloc state, will update again", + "error", err, "alloc_id", alloc.ID) + } else { + ar.AcknowledgeState(allocState) + } + // Maybe mark the alloc for halt on missing server heartbeats if c.heartbeatStop.shouldStop(alloc) { err = c.heartbeatStop.stopAlloc(alloc.ID) @@ -2144,10 +2154,20 @@ func (c *Client) allocSync() { if len(updates) == 0 { continue } + // Ensure we never send an update before we've had at least one sync + // from the server + select { + case <-c.serversContactedCh: + default: + continue + } - sync := make([]*structs.Allocation, 0, len(updates)) - for _, alloc := range updates { - sync = append(sync, alloc) + sync := c.filterAcknowledgedUpdates(updates) + if len(sync) == 0 { + // No updates to send + updates = make(map[string]*structs.Allocation, len(updates)) + syncTicker.Reset(allocSyncIntv) + continue } // Send to server. 
@@ -2162,21 +2182,51 @@ func (c *Client) allocSync() { // Error updating allocations, do *not* clear // updates and retry after backoff c.logger.Error("error updating allocations", "error", err) - syncTicker.Stop() - syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv)) + syncTicker.Reset(c.retryIntv(allocSyncRetryIntv)) continue } + c.allocLock.RLock() + for _, update := range sync { + if ar, ok := c.allocs[update.ID]; ok { + ar.AcknowledgeState(&arstate.State{ + ClientStatus: update.ClientStatus, + ClientDescription: update.ClientDescription, + DeploymentStatus: update.DeploymentStatus, + TaskStates: update.TaskStates, + NetworkStatus: update.NetworkStatus, + }) + } + } + c.allocLock.RUnlock() + // Successfully updated allocs, reset map and ticker. // Always reset ticker to give loop time to receive // alloc updates. If the RPC took the ticker interval // we may call it in a tight loop before draining // buffered updates. updates = make(map[string]*structs.Allocation, len(updates)) - syncTicker.Stop() - syncTicker = time.NewTicker(allocSyncIntv) + syncTicker.Reset(allocSyncIntv) + } + } +} + +func (c *Client) filterAcknowledgedUpdates(updates map[string]*structs.Allocation) []*structs.Allocation { + sync := make([]*structs.Allocation, 0, len(updates)) + c.allocLock.RLock() + defer c.allocLock.RUnlock() + for allocID, update := range updates { + if ar, ok := c.allocs[allocID]; ok { + if !ar.LastAcknowledgedStateIsCurrent(update) { + sync = append(sync, update) + } + } else { + // no allocrunner (typically a failed placement), so we need + // to send update + sync = append(sync, update) } } + return sync } // allocUpdates holds the results of receiving updated allocations from the diff --git a/client/client_test.go b/client/client_test.go index fb45586fbd2..10215da8617 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -16,6 +16,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/shoenig/test" "github.com/shoenig/test/must" + 
"github.com/shoenig/test/wait" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -554,67 +555,175 @@ func waitTilNodeReady(client *Client, t *testing.T) { }) } +// TestClient_SaveRestoreState exercises the allocrunner restore code paths +// after a client restart. It runs several jobs in different states and asserts +// the expected final state and server updates. func TestClient_SaveRestoreState(t *testing.T) { ci.Parallel(t) s1, _, cleanupS1 := testServer(t, nil) - defer cleanupS1() + t.Cleanup(cleanupS1) testutil.WaitForLeader(t, s1.RPC) c1, cleanupC1 := TestClient(t, func(c *config.Config) { c.DevMode = false c.RPCHandler = s1 }) - defer cleanupC1() + t.Cleanup(func() { + for _, ar := range c1.getAllocRunners() { + ar.Destroy() + } + for _, ar := range c1.getAllocRunners() { + <-ar.DestroyCh() + } + cleanupC1() + }) // Wait until the node is ready waitTilNodeReady(c1, t) - // Create mock allocations - job := mock.Job() - alloc1 := mock.Alloc() - alloc1.NodeID = c1.Node().ID - alloc1.Job = job - alloc1.JobID = job.ID - alloc1.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" - alloc1.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{ - "run_for": "10s", - } - alloc1.ClientStatus = structs.AllocClientStatusRunning + migrateStrategy := structs.DefaultMigrateStrategy() + migrateStrategy.MinHealthyTime = time.Millisecond + migrateStrategy.HealthCheck = structs.MigrateStrategyHealthStates + + // Create mock jobs and allocations that will start up fast + + setup := func(id string) *structs.Job { + job := mock.MinJob() + job.ID = id + job.TaskGroups[0].Migrate = migrateStrategy + must.NoError(t, s1.RPC("Job.Register", &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global", Namespace: job.Namespace}, + }, &structs.JobRegisterResponse{})) + return job + } + + // job1: will be left running + // job2: will be stopped before shutdown + // job3: will be stopped after shutdown + // job4: will be 
stopped and GC'd after shutdown + job1, job2, job3, job4 := setup("job1"), setup("job2"), setup("job3"), setup("job4") + + // Allocations should be placed + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + c1.allocLock.RLock() + defer c1.allocLock.RUnlock() + if len(c1.allocs) != 4 { + return fmt.Errorf("expected 4 alloc runners") + } + for _, ar := range c1.allocs { + if ar.AllocState().ClientStatus != structs.AllocClientStatusRunning { + return fmt.Errorf("expected running client status, got %v", + ar.AllocState().ClientStatus) + } + } + return nil + }), + wait.Timeout(time.Second*10), + wait.Gap(time.Millisecond*30), + )) - state := s1.State() - if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job); err != nil { - t.Fatal(err) - } - if err := state.UpsertJobSummary(101, mock.JobSummary(alloc1.JobID)); err != nil { - t.Fatal(err) - } - if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1}); err != nil { - t.Fatalf("err: %v", err) - } + store := s1.State() - // Allocations should get registered - testutil.WaitForResult(func() (bool, error) { - c1.allocLock.RLock() - ar := c1.allocs[alloc1.ID] - c1.allocLock.RUnlock() - if ar == nil { - return false, fmt.Errorf("nil alloc runner") - } - if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning { - return false, fmt.Errorf("client status: got %v; want %v", ar.Alloc().ClientStatus, structs.AllocClientStatusRunning) - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) + allocIDforJob := func(job *structs.Job) string { + allocs, err := store.AllocsByJob(nil, job.Namespace, job.ID, false) + must.NoError(t, err) + must.Len(t, 1, allocs) // we should only ever get 1 in this test + return allocs[0].ID + } + alloc1 := allocIDforJob(job1) + alloc2 := allocIDforJob(job2) + alloc3 := allocIDforJob(job3) + alloc4 := allocIDforJob(job4) + t.Logf("alloc1=%s alloc2=%s alloc3=%s alloc4=%s", alloc1, alloc2, alloc3, alloc4) + + // Stop 
the 2nd job before we shut down + must.NoError(t, s1.RPC("Job.Deregister", &structs.JobDeregisterRequest{ + JobID: job2.ID, + WriteRequest: structs.WriteRequest{Region: "global", Namespace: job2.Namespace}, + }, &structs.JobDeregisterResponse{})) + + var alloc2ModifyIndex uint64 + var alloc2AllocModifyIndex uint64 + + // Wait till we're sure the client has received the stop and updated the server + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + alloc, err := store.AllocByID(nil, alloc2) + must.NotNil(t, alloc) + must.NoError(t, err) + if alloc.ClientStatus != structs.AllocClientStatusComplete { + // note that the allocrunner is non-nil until it's been + // client-GC'd, so we're just looking to make sure the client + // has updated the server + return fmt.Errorf("alloc2 should have been marked completed") + } + alloc2ModifyIndex = alloc.ModifyIndex + alloc2AllocModifyIndex = alloc.AllocModifyIndex + return nil + }), + wait.Timeout(time.Second*20), + wait.Gap(time.Millisecond*30), + )) + + t.Log("shutting down client") + must.NoError(t, c1.Shutdown()) // note: this saves the client state DB + + // Stop the 3rd job while we're down + must.NoError(t, s1.RPC("Job.Deregister", &structs.JobDeregisterRequest{ + JobID: job3.ID, + WriteRequest: structs.WriteRequest{Region: "global", Namespace: job3.Namespace}, + }, &structs.JobDeregisterResponse{})) + + // Stop and purge the 4th job while we're down + must.NoError(t, s1.RPC("Job.Deregister", &structs.JobDeregisterRequest{ + JobID: job4.ID, + Purge: true, + WriteRequest: structs.WriteRequest{Region: "global", Namespace: job4.Namespace}, + }, &structs.JobDeregisterResponse{})) + + // Ensure the allocation has been deleted as well + must.NoError(t, s1.RPC("Eval.Reap", &structs.EvalReapRequest{ + Allocs: []string{alloc4}, + WriteRequest: structs.WriteRequest{Region: "global"}, + }, &structs.GenericResponse{})) + + var alloc3AllocModifyIndex uint64 + var alloc3ModifyIndex uint64 + + // Wait till we're sure 
the scheduler has marked alloc3 for stop and deleted alloc4 + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + alloc, err := store.AllocByID(nil, alloc3) + must.NotNil(t, alloc) + must.NoError(t, err) + if alloc.DesiredStatus != structs.AllocDesiredStatusStop { + return fmt.Errorf("alloc3 should have been marked for stop") + } + alloc3ModifyIndex = alloc.ModifyIndex + alloc3AllocModifyIndex = alloc.AllocModifyIndex - // Shutdown the client, saves state - if err := c1.Shutdown(); err != nil { - t.Fatalf("err: %v", err) - } + alloc, err = store.AllocByID(nil, alloc4) + must.NoError(t, err) + if alloc != nil { + return fmt.Errorf("alloc4 should have been deleted") + } + return nil + }), + wait.Timeout(time.Second*5), + wait.Gap(time.Millisecond*30), + )) + + a1, err := store.AllocByID(nil, alloc1) + var alloc1AllocModifyIndex uint64 + var alloc1ModifyIndex uint64 + alloc1ModifyIndex = a1.ModifyIndex + alloc1AllocModifyIndex = a1.AllocModifyIndex + + t.Log("starting new client") - // Create a new client logger := testlog.HCLogger(t) c1.config.Logger = logger consulCatalog := consul.NewMockCatalog(logger) @@ -625,34 +734,77 @@ func TestClient_SaveRestoreState(t *testing.T) { c1.config.PluginSingletonLoader = singleton.NewSingletonLoader(logger, c1.config.PluginLoader) c2, err := NewClient(c1.config, consulCatalog, nil, mockService, nil) - if err != nil { - t.Fatalf("err: %v", err) - } - defer c2.Shutdown() + must.NoError(t, err) - // Ensure the allocation is running - testutil.WaitForResult(func() (bool, error) { - c2.allocLock.RLock() - ar := c2.allocs[alloc1.ID] - c2.allocLock.RUnlock() - status := ar.Alloc().ClientStatus - alive := status == structs.AllocClientStatusRunning || status == structs.AllocClientStatusPending - if !alive { - return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc()) + t.Cleanup(func() { + for _, ar := range c2.getAllocRunners() { + ar.Destroy() } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", 
err) + for _, ar := range c2.getAllocRunners() { + <-ar.DestroyCh() + } + c2.Shutdown() }) - // Destroy all the allocations - for _, ar := range c2.getAllocRunners() { - ar.Destroy() - } + // Ensure only the expected allocation is running + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + c2.allocLock.RLock() + defer c2.allocLock.RUnlock() + if len(c2.allocs) != 3 { + // the GC'd alloc will not have restored AR + return fmt.Errorf("expected 3 alloc runners") + } + for allocID, ar := range c2.allocs { + if ar == nil { + return fmt.Errorf("nil alloc runner") + } + switch allocID { + case alloc1: + if ar.AllocState().ClientStatus != structs.AllocClientStatusRunning { + return fmt.Errorf("expected running client status, got %v", + ar.AllocState().ClientStatus) + } + default: + if ar.AllocState().ClientStatus != structs.AllocClientStatusComplete { + return fmt.Errorf("expected complete client status, got %v", + ar.AllocState().ClientStatus) + } + } + } + return nil + }), + wait.Timeout(time.Second*10), + wait.Gap(time.Millisecond*30), + )) - for _, ar := range c2.getAllocRunners() { - <-ar.DestroyCh() - } + a1, err = store.AllocByID(nil, alloc1) + must.NoError(t, err) + must.NotNil(t, a1) + test.Eq(t, alloc1AllocModifyIndex, a1.AllocModifyIndex) + test.Eq(t, alloc1ModifyIndex, a1.ModifyIndex, + test.Sprint("alloc still running should not have updated")) + + a2, err := store.AllocByID(nil, alloc2) + must.NoError(t, err) + must.NotNil(t, a2) + test.Eq(t, alloc2AllocModifyIndex, a2.AllocModifyIndex) + test.Eq(t, alloc2ModifyIndex, a2.ModifyIndex, + test.Sprintf("alloc %s stopped before shutdown should not have updated", a2.ID[:8])) + + a3, err := store.AllocByID(nil, alloc3) + must.NoError(t, err) + must.NotNil(t, a3) + test.Eq(t, alloc3AllocModifyIndex, a3.AllocModifyIndex) + test.Greater(t, alloc3ModifyIndex, a3.ModifyIndex, + test.Sprintf("alloc %s stopped during shutdown should have updated", a3.ID[:8])) + + // TODO: the alloc has been GC'd so the 
server will reject any update. It'd + // be nice if we could instrument the server here to ensure we didn't send + // one either. + a4, err := store.AllocByID(nil, alloc4) + must.NoError(t, err) + test.Nil(t, a4, test.Sprint("garbage collected alloc should not exist")) } func TestClient_AddAllocError(t *testing.T) { diff --git a/client/state/db_bolt.go b/client/state/db_bolt.go index c0722d65d47..53caf5b2448 100644 --- a/client/state/db_bolt.go +++ b/client/state/db_bolt.go @@ -11,6 +11,7 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" + arstate "github.com/hashicorp/nomad/client/allocrunner/state" trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -32,6 +33,7 @@ allocations/ |--> alloc -> allocEntry{*structs.Allocation} |--> deploy_status -> deployStatusEntry{*structs.AllocDeploymentStatus} |--> network_status -> networkStatusEntry{*structs.AllocNetworkStatus} + |--> acknowledged_state -> acknowledgedStateEntry{*arstate.State} |--> task-/ |--> local_state -> *trstate.LocalState # Local-only state |--> task_state -> *structs.TaskState # Syncs to servers @@ -83,6 +85,9 @@ var ( // stored under allocNetworkStatusKey = []byte("network_status") + // acknowledgedStateKey is the key *arstate.State is stored under + acknowledgedStateKey = []byte("acknowledged_state") + // checkResultsBucket is the bucket name in which check query results are stored checkResultsBucket = []byte("check_results") @@ -392,6 +397,54 @@ func (s *BoltStateDB) GetNetworkStatus(allocID string) (*structs.AllocNetworkSta return entry.NetworkStatus, nil } +// PutAcknowledgedState stores an allocation's last acknowledged state or +// returns an error if it could not be stored. 
+func (s *BoltStateDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error { + return s.updateWithOptions(opts, func(tx *boltdd.Tx) error { + allocBkt, err := getAllocationBucket(tx, allocID) + if err != nil { + return err + } + + entry := acknowledgedStateEntry{ + State: state, + } + return allocBkt.Put(acknowledgedStateKey, &entry) + }) +} + +// GetAcknowledgedState retrieves an allocation's last acknowledged state +func (s *BoltStateDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { + var entry acknowledgedStateEntry + + err := s.db.View(func(tx *boltdd.Tx) error { + allAllocsBkt := tx.Bucket(allocationsBucketName) + if allAllocsBkt == nil { + // No state, return + return nil + } + + allocBkt := allAllocsBkt.Bucket([]byte(allocID)) + if allocBkt == nil { + // No state for alloc, return + return nil + } + + return allocBkt.Get(acknowledgedStateKey, &entry) + }) + + // It's valid for this field to be nil/missing + if boltdd.IsErrNotFound(err) { + return nil, nil + } + + if err != nil { + return nil, err + } + + return entry.State, nil +} + // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. LocalState or TaskState will be nil if they do not exist. // @@ -851,6 +904,12 @@ func (s *BoltStateDB) init() error { }) } +// acknowledgedStateEntry wraps values in the acknowledged_state bucket, so we +// can expand it in the future if need be +type acknowledgedStateEntry struct { + State *arstate.State +} + // updateWithOptions enables adjustments to db.Update operation, including Batch mode. 
func (s *BoltStateDB) updateWithOptions(opts []WriteOption, updateFn func(tx *boltdd.Tx) error) error { writeOpts := mergeWriteOptions(opts) diff --git a/client/state/db_error.go b/client/state/db_error.go index 2d9c2cdb2a8..9c2499dc872 100644 --- a/client/state/db_error.go +++ b/client/state/db_error.go @@ -6,6 +6,7 @@ package state import ( "fmt" + arstate "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -52,6 +53,14 @@ func (m *ErrDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, return fmt.Errorf("Error!") } +func (m *ErrDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error { + return fmt.Errorf("Error!") +} + +func (m *ErrDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { + return nil, fmt.Errorf("Error!") +} + func (m *ErrDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { return nil, nil, fmt.Errorf("Error!") } diff --git a/client/state/db_mem.go b/client/state/db_mem.go index 329fc690986..33347db67a9 100644 --- a/client/state/db_mem.go +++ b/client/state/db_mem.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/hashicorp/go-hclog" + arstate "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -28,6 +29,9 @@ type MemDB struct { // alloc_id -> value networkStatus map[string]*structs.AllocNetworkStatus + // alloc_id -> value + acknowledgedState map[string]*arstate.State + // alloc_id -> task_name -> value localTaskState map[string]map[string]*state.LocalState taskState map[string]map[string]*structs.TaskState @@ -55,13 +59,14 @@ type MemDB struct { func NewMemDB(logger 
hclog.Logger) *MemDB { logger = logger.Named("memdb") return &MemDB{ - allocs: make(map[string]*structs.Allocation), - deployStatus: make(map[string]*structs.AllocDeploymentStatus), - networkStatus: make(map[string]*structs.AllocNetworkStatus), - localTaskState: make(map[string]map[string]*state.LocalState), - taskState: make(map[string]map[string]*structs.TaskState), - checks: make(checks.ClientResults), - logger: logger, + allocs: make(map[string]*structs.Allocation), + deployStatus: make(map[string]*structs.AllocDeploymentStatus), + networkStatus: make(map[string]*structs.AllocNetworkStatus), + acknowledgedState: make(map[string]*arstate.State), + localTaskState: make(map[string]map[string]*state.LocalState), + taskState: make(map[string]map[string]*structs.TaskState), + checks: make(checks.ClientResults), + logger: logger, } } @@ -118,6 +123,19 @@ func (m *MemDB) PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, return nil } +func (m *MemDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error { + m.mu.Lock() + defer m.mu.Unlock() + m.acknowledgedState[allocID] = state + return nil +} + +func (m *MemDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.acknowledgedState[allocID], nil +} + func (m *MemDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { m.mu.RLock() defer m.mu.RUnlock() diff --git a/client/state/db_noop.go b/client/state/db_noop.go index d0db5b48ae6..eb8003abf63 100644 --- a/client/state/db_noop.go +++ b/client/state/db_noop.go @@ -4,6 +4,7 @@ package state import ( + arstate "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -47,6 +48,12 @@ func (n NoopDB) PutNetworkStatus(allocID string, ds 
*structs.AllocNetworkStatus, return nil } +func (n NoopDB) PutAcknowledgedState(allocID string, state *arstate.State, opts ...WriteOption) error { + return nil +} + +func (n NoopDB) GetAcknowledgedState(allocID string) (*arstate.State, error) { return nil, nil } + func (n NoopDB) GetTaskRunnerState(allocID string, taskName string) (*state.LocalState, *structs.TaskState, error) { return nil, nil, nil } diff --git a/client/state/interface.go b/client/state/interface.go index 44253d94905..84a3cfad175 100644 --- a/client/state/interface.go +++ b/client/state/interface.go @@ -4,6 +4,7 @@ package state import ( + arstate "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" "github.com/hashicorp/nomad/client/dynamicplugins" @@ -44,6 +45,14 @@ type StateDB interface { // PutNetworkStatus puts the allocation's network status. It may be nil. PutNetworkStatus(allocID string, ns *structs.AllocNetworkStatus, opts ...WriteOption) error + // PutAcknowledgedState stores an allocation's last acknowledged state or + // returns an error if it could not be stored. + PutAcknowledgedState(string, *arstate.State, ...WriteOption) error + + // GetAcknowledgedState retrieves an allocation's last acknowledged + // state. It may be nil even if there's no error + GetAcknowledgedState(string) (*arstate.State, error) + // GetTaskRunnerState returns the LocalState and TaskState for a // TaskRunner. Either state may be nil if it is not found, but if an // error is encountered only the error will be non-nil. 
diff --git a/client/util.go b/client/util.go index 746f08f83d6..d1902b10444 100644 --- a/client/util.go +++ b/client/util.go @@ -41,7 +41,8 @@ func diffAllocs(existing map[string]uint64, allocs *allocUpdates) *diffResult { continue } - // Check for an update + // Check for an update (note: AllocModifyIndex is only updated for + // server updates) if pulled && alloc.AllocModifyIndex > existIndex { result.updated = append(result.updated, alloc) continue diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b42c3f4ac2a..5203fb2a0f7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2823,6 +2823,13 @@ func (d *DNSConfig) Copy() *DNSConfig { } } +func (d *DNSConfig) IsZero() bool { + if d == nil { + return true + } + return len(d.Options) == 0 && len(d.Searches) == 0 && len(d.Servers) == 0 +} + // NetworkResource is used to represent available network // resources type NetworkResource struct { @@ -8358,6 +8365,16 @@ func (h *TaskHandle) Copy() *TaskHandle { return &newTH } +func (h *TaskHandle) Equal(o *TaskHandle) bool { + if h == nil || o == nil { + return h == o + } + if h.Version != o.Version { + return false + } + return bytes.Equal(h.DriverState, o.DriverState) +} + // Set of possible states for a task. const ( TaskStatePending = "pending" // The task is waiting to be run. 
@@ -8437,6 +8454,37 @@ func (ts *TaskState) Successful() bool { return ts.State == TaskStateDead && !ts.Failed } +func (ts *TaskState) Equal(o *TaskState) bool { + if ts.State != o.State { + return false + } + if ts.Failed != o.Failed { + return false + } + if ts.Restarts != o.Restarts { + return false + } + if ts.LastRestart != o.LastRestart { + return false + } + if ts.StartedAt != o.StartedAt { + return false + } + if ts.FinishedAt != o.FinishedAt { + return false + } + if !slices.EqualFunc(ts.Events, o.Events, func(ts, o *TaskEvent) bool { + return ts.Equal(o) + }) { + return false + } + if !ts.TaskHandle.Equal(o.TaskHandle) { + return false + } + + return true +} + const ( // TaskSetupFailure indicates that the task could not be started due to a // a setup failure. @@ -8769,6 +8817,31 @@ func (e *TaskEvent) GoString() string { return fmt.Sprintf("%v - %v", e.Time, e.Type) } +// Equal on TaskEvent ignores the deprecated fields +func (e *TaskEvent) Equal(o *TaskEvent) bool { + if e == nil || o == nil { + return e == o + } + + if e.Type != o.Type { + return false + } + if e.Time != o.Time { + return false + } + if e.Message != o.Message { + return false + } + if e.DisplayMessage != o.DisplayMessage { + return false + } + if !maps.Equal(e.Details, o.Details) { + return false + } + + return true +} + // SetDisplayMessage sets the display message of TaskEvent func (e *TaskEvent) SetDisplayMessage(msg string) *TaskEvent { e.DisplayMessage = msg @@ -11257,6 +11330,41 @@ func (a *AllocNetworkStatus) Copy() *AllocNetworkStatus { } } +func (a *AllocNetworkStatus) Equal(o *AllocNetworkStatus) bool { + // note: this accounts for when DNSConfig is non-nil but empty + switch { + case a == nil && o.IsZero(): + return true + case o == nil && a.IsZero(): + return true + case a == nil || o == nil: + return a == o + } + + switch { + case a.InterfaceName != o.InterfaceName: + return false + case a.Address != o.Address: + return false + case !a.DNS.Equal(o.DNS): + return false + } 
+ return true +} + +func (a *AllocNetworkStatus) IsZero() bool { + if a == nil { + return true + } + if a.InterfaceName != "" || a.Address != "" { + return false + } + if !a.DNS.IsZero() { + return false + } + return true +} + // NetworkStatus is an interface satisfied by alloc runner, for acquiring the // network status of an allocation. type NetworkStatus interface { @@ -11333,6 +11441,24 @@ func (a *AllocDeploymentStatus) Copy() *AllocDeploymentStatus { return c } +func (a *AllocDeploymentStatus) Equal(o *AllocDeploymentStatus) bool { + if a == nil || o == nil { + return a == o + } + + switch { + case !pointer.Eq(a.Healthy, o.Healthy): + return false + case a.Timestamp != o.Timestamp: + return false + case a.Canary != o.Canary: + return false + case a.ModifyIndex != o.ModifyIndex: + return false + } + return true +} + const ( EvalStatusBlocked = "blocked" EvalStatusPending = "pending"