diff --git a/.changelog/16612.txt b/.changelog/16612.txt new file mode 100644 index 00000000000..595845c9d12 --- /dev/null +++ b/.changelog/16612.txt @@ -0,0 +1,3 @@ +```release-note:bug +drainer: Fixed a bug where draining nodes that become lost or disconnected were still tracked by the drainer +``` diff --git a/nomad/drainer/drain_testing.go b/nomad/drainer/drain_testing.go index 61de31d813d..bb8aa3ec5b6 100644 --- a/nomad/drainer/drain_testing.go +++ b/nomad/drainer/drain_testing.go @@ -1,52 +1,142 @@ package drainer import ( + "context" "sync" + "testing" + "time" + "golang.org/x/time/rate" + + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) -type MockNodeTrackerEvent struct { - NodeUpdate *structs.Node - NodeRemove string -} +// This file contains helpers for testing. The raft shims make it hard to test +// the whole package behavior of the drainer. See also nomad/drainer_int_test.go +// for integration tests. -type MockNodeTracker struct { - Nodes map[string]*structs.Node - Events []*MockNodeTrackerEvent +type MockJobWatcher struct { + drainCh chan *DrainRequest + migratedCh chan []*structs.Allocation + jobs map[structs.NamespacedID]struct{} sync.Mutex } -func NewMockNodeTracker() *MockNodeTracker { - return &MockNodeTracker{ - Nodes: make(map[string]*structs.Node), - Events: make([]*MockNodeTrackerEvent, 0, 16), +// RegisterJobs marks the job as being watched +func (m *MockJobWatcher) RegisterJobs(jobs []structs.NamespacedID) { + m.Lock() + defer m.Unlock() + for _, job := range jobs { + m.jobs[job] = struct{}{} } } -func (m *MockNodeTracker) TrackedNodes() map[string]*structs.Node { - m.Lock() - defer m.Unlock() - return m.Nodes +// Drain returns the DrainRequest channel. Tests can send on this channel to +// simulate steps through the NodeDrainer watch loop. (Sending on this channel +// will block anywhere else.) 
+func (m *MockJobWatcher) Drain() <-chan *DrainRequest { + return m.drainCh } -func (m *MockNodeTracker) Remove(nodeID string) { - m.Lock() - defer m.Unlock() - delete(m.Nodes, nodeID) - m.Events = append(m.Events, &MockNodeTrackerEvent{NodeRemove: nodeID}) +// Migrated returns the channel of migrated allocations. Tests can send on this +// channel to simulate steps through the NodeDrainer watch loop. (Sending on +// this channel will block anywhere else.) +func (m *MockJobWatcher) Migrated() <-chan []*structs.Allocation { + return m.migratedCh +} + +type MockDeadlineNotifier struct { + expiredCh <-chan []string + nodes map[string]struct{} + sync.Mutex } -func (m *MockNodeTracker) Update(node *structs.Node) { +// NextBatch returns the channel of expired nodes. The field is receive-only, +// so a test that wants to simulate timer events in the NodeDrainer watch loop +// must keep its own sendable reference to the channel. +func (m *MockDeadlineNotifier) NextBatch() <-chan []string { + return m.expiredCh +} + +// Remove removes the given node from being tracked for a deadline. +func (m *MockDeadlineNotifier) Remove(nodeID string) { m.Lock() defer m.Unlock() - m.Nodes[node.ID] = node - m.Events = append(m.Events, &MockNodeTrackerEvent{NodeUpdate: node}) + delete(m.nodes, nodeID) } -func (m *MockNodeTracker) events() []*MockNodeTrackerEvent { +// Watch marks the node as being watched; this mock throws out the timer in +// favor of manually sending on the channel to avoid racy tests. 
+func (m *MockDeadlineNotifier) Watch(nodeID string, _ time.Time) { m.Lock() defer m.Unlock() + m.nodes[nodeID] = struct{}{} +} + +type MockRaftApplierShim struct { + state *state.StateStore +} + +// AllocUpdateDesiredTransition mocks a write to raft as a state store update +func (m *MockRaftApplierShim) AllocUpdateDesiredTransition( + allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) { + index, _ := m.state.LatestIndex() + index++ + err := m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, index, allocs, evals) + return index, err +} + +// NodesDrainComplete mocks a write to raft as a state store update +func (m *MockRaftApplierShim) NodesDrainComplete( + nodes []string, event *structs.NodeEvent) (uint64, error) { + index, _ := m.state.LatestIndex() + index++ + + updates := make(map[string]*structs.DrainUpdate, len(nodes)) + nodeEvents := make(map[string]*structs.NodeEvent, len(nodes)) + update := &structs.DrainUpdate{} + for _, node := range nodes { + updates[node] = update + if event != nil { + nodeEvents[node] = event + } + } + now := time.Now().Unix() + + err := m.state.BatchUpdateNodeDrain(structs.MsgTypeTestSetup, index, now, + updates, nodeEvents) + + return index, err +} + +func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *NodeDrainer) { + t.Helper() + store := state.TestStateStore(t) + limiter := rate.NewLimiter(100.0, 100) + logger := testlog.HCLogger(t) + + drainer := &NodeDrainer{ + enabled: false, + logger: logger, + nodes: map[string]*drainingNode{}, + jobWatcher: &MockJobWatcher{ + drainCh: make(chan *DrainRequest), + migratedCh: make(chan []*structs.Allocation), + jobs: map[structs.NamespacedID]struct{}{}, + }, + deadlineNotifier: &MockDeadlineNotifier{ + expiredCh: make(<-chan []string), + nodes: map[string]struct{}{}, + }, + state: store, + queryLimiter: limiter, + raft: &MockRaftApplierShim{state: store}, + batcher: allocMigrateBatcher{}, + } - return m.Events + 
w := NewNodeDrainWatcher(context.Background(), limiter, store, logger, drainer) + drainer.nodeWatcher = w + return w, store, drainer } diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index 36d301cc3b5..1ed4402213f 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest { // DrainingJobWatcher is the interface for watching a job drain type DrainingJobWatcher interface { // RegisterJob is used to start watching a draining job - RegisterJobs(job []structs.NamespacedID) + RegisterJobs(jobs []structs.NamespacedID) // Drain is used to emit allocations that should be drained. Drain() <-chan *DrainRequest diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go index 31ce4357e76..32d40745526 100644 --- a/nomad/drainer/watch_nodes.go +++ b/nomad/drainer/watch_nodes.go @@ -33,7 +33,6 @@ func (n *NodeDrainer) Remove(nodeID string) { n.l.Lock() defer n.l.Unlock() - // TODO test the notifier is updated // Remove it from being tracked and remove it from the dealiner delete(n.nodes, nodeID) n.deadlineNotifier.Remove(nodeID) @@ -58,7 +57,6 @@ func (n *NodeDrainer) Update(node *structs.Node) { draining.Update(node) } - // TODO test the notifier is updated if inf, deadline := node.DrainStrategy.DeadlineTime(); !inf { n.deadlineNotifier.Watch(node.ID, deadline) } else { @@ -67,7 +65,6 @@ func (n *NodeDrainer) Update(node *structs.Node) { n.deadlineNotifier.Remove(node.ID) } - // TODO Test this // Register interest in the draining jobs. 
jobs, err := draining.DrainingJobs() if err != nil { @@ -77,8 +74,6 @@ func (n *NodeDrainer) Update(node *structs.Node) { n.logger.Trace("node has draining jobs on it", "node_id", node.ID, "num_jobs", len(jobs)) n.jobWatcher.RegisterJobs(jobs) - // TODO Test at this layer as well that a node drain on a node without - // allocs immediately gets unmarked as draining // Check if the node is done such that if an operator drains a node with // nothing on it we unset drain done, err := draining.IsDone() @@ -88,8 +83,8 @@ func (n *NodeDrainer) Update(node *structs.Node) { } if done { - // Node is done draining. Stop remaining system allocs before - // marking node as complete. + // Node is done draining. Stop remaining system allocs before marking + // node as complete. remaining, err := draining.RemainingAllocs() if err != nil { n.logger.Error("error getting remaining allocs on drained node", "node_id", node.ID, "error", err) @@ -179,21 +174,31 @@ func (w *nodeDrainWatcher) watch() { currentNode, tracked := tracked[nodeID] switch { - // If the node is tracked but not draining, untrack + case tracked && !newDraining: + // If the node is tracked but not draining, untrack w.tracker.Remove(nodeID) - // If the node is not being tracked but is draining, track + case !tracked && node.UnresponsiveStatus(): + // Down or Disconnected nodes can't be migrated, so skip + // tracking them + case !tracked && newDraining: + // If the node is not being tracked but is draining, track w.tracker.Update(node) - // If the node is being tracked but has changed, update: + case tracked && node.UnresponsiveStatus(): + // Down or Disconnected nodes can't be migrated, so stop + // tracking them. 
The node update event that changed the status + // will trigger all the evaluations we need + w.tracker.Remove(nodeID) + case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy): + // If the node is being tracked but has changed, update w.tracker.Update(node) + default: } - - // TODO(schmichael) handle the case of a lost node } for nodeID := range tracked { @@ -219,7 +224,7 @@ func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node, } // getNodesImpl is used to get nodes from the state store, returning the set of -// nodes and the given index. +// nodes and the current node table index. func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { iter, err := state.Nodes(ws) if err != nil { @@ -231,7 +236,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto return nil, 0, err } - var maxIndex uint64 = 0 resp := make(map[string]*structs.Node, 64) for { raw := iter.Next() @@ -241,15 +245,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto node := raw.(*structs.Node) resp[node.ID] = node - if maxIndex < node.ModifyIndex { - maxIndex = node.ModifyIndex - } - } - - // Prefer using the actual max index of affected nodes since it means less - // unblocking - if maxIndex != 0 { - index = maxIndex } return resp, index, nil diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go index f1154c46068..6d858515e62 100644 --- a/nomad/drainer/watch_nodes_test.go +++ b/nomad/drainer/watch_nodes_test.go @@ -1,41 +1,26 @@ package drainer import ( - "context" + "fmt" "testing" "time" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" + "github.com/hashicorp/nomad/ci" - "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" 
"github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/require" - "golang.org/x/time/rate" ) -func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *MockNodeTracker) { - t.Helper() - state := state.TestStateStore(t) - limiter := rate.NewLimiter(100.0, 100) - logger := testlog.HCLogger(t) - m := NewMockNodeTracker() - w := NewNodeDrainWatcher(context.Background(), limiter, state, logger, m) - return w, state, m -} - -func TestNodeDrainWatcher_Interface(t *testing.T) { - ci.Parallel(t) - require := require.New(t) - w, _, _ := testNodeDrainWatcher(t) - require.Implements((*DrainingNodeWatcher)(nil), w) -} - -func TestNodeDrainWatcher_AddDraining(t *testing.T) { +// TestNodeDrainWatcher_AddNodes tests that new nodes are added to the node +// watcher and deadline notifier, but only if they have a drain spec. +func TestNodeDrainWatcher_AddNodes(t *testing.T) { ci.Parallel(t) - require := require.New(t) - _, state, m := testNodeDrainWatcher(t) + _, store, tracker := testNodeDrainWatcher(t) // Create two nodes, one draining and one not draining n1, n2 := mock.Node(), mock.Node() @@ -46,137 +31,245 @@ func TestNodeDrainWatcher_AddDraining(t *testing.T) { ForceDeadline: time.Now().Add(time.Hour), } - require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n1)) - require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 101, n2)) + // Create a job with a running alloc on each node + job := mock.Job() + jobID := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID} + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, job)) - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 1, nil - }, func(err error) { - t.Fatal("No node drain events") - }) + alloc1 := mock.Alloc() + alloc1.JobID = job.ID + alloc1.Job = job + alloc1.TaskGroup = job.TaskGroups[0].Name + alloc1.NodeID = n1.ID + alloc1.DeploymentStatus = 
&structs.AllocDeploymentStatus{Healthy: pointer.Of(true)} + alloc2 := alloc1.Copy() + alloc2.ID = uuid.Generate() + alloc2.NodeID = n2.ID + + must.NoError(t, store.UpsertAllocs( + structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2})) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 103, n1)) + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 104, n2)) - tracked := m.TrackedNodes() - require.NotContains(tracked, n1.ID) - require.Contains(tracked, n2.ID) - require.Equal(n2, tracked[n2.ID]) + // Only 1 node is draining, and the other should not be tracked + assertTrackerSettled(t, tracker, []string{n2.ID}) + // Notifications should fire to the job watcher and deadline notifier + must.MapContainsKey(t, tracker.jobWatcher.(*MockJobWatcher).jobs, jobID) + must.MapContainsKey(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, n2.ID) } +// TestNodeDrainWatcher_Remove tests that when a node should no longer be +// tracked that we stop tracking it in the node watcher and deadline notifier. 
func TestNodeDrainWatcher_Remove(t *testing.T) { ci.Parallel(t) - require := require.New(t) - _, state, m := testNodeDrainWatcher(t) + _, store, tracker := testNodeDrainWatcher(t) - // Create a draining node - n := mock.Node() - n.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: time.Hour, - }, - ForceDeadline: time.Now().Add(time.Hour), - } + t.Run("stop drain", func(t *testing.T) { + n, _ := testNodeDrainWatcherSetup(t, store, tracker) + + index, _ := store.LatestIndex() + must.NoError(t, store.UpdateNodeDrain( + structs.MsgTypeTestSetup, index+1, n.ID, nil, false, 0, nil, nil, "")) + + // Node with stopped drain should no longer be tracked + assertTrackerSettled(t, tracker, []string{}) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) + }) + + t.Run("delete node", func(t *testing.T) { + n, _ := testNodeDrainWatcherSetup(t, store, tracker) + index, _ := store.LatestIndex() + index++ + must.NoError(t, store.DeleteNode(structs.MsgTypeTestSetup, index, []string{n.ID})) + + // Deleted node should no longer be tracked + assertTrackerSettled(t, tracker, []string{}) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) + }) + + t.Run("down node", func(t *testing.T) { + n, _ := testNodeDrainWatcherSetup(t, store, tracker) - // Wait for it to be tracked - require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n)) - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 1, nil - }, func(err error) { - t.Fatal("No node drain events") + index, _ := store.LatestIndex() + n = n.Copy() + n.Status = structs.NodeStatusDown + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, n)) + + // Down node should no longer be tracked + assertTrackerSettled(t, tracker, []string{}) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) }) - tracked := m.TrackedNodes() - require.Contains(tracked, n.ID) - require.Equal(n, tracked[n.ID]) + 
t.Run("disconnected node", func(t *testing.T) { + n, _ := testNodeDrainWatcherSetup(t, store, tracker) + index, _ := store.LatestIndex() + n = n.Copy() + n.Status = structs.NodeStatusDisconnected + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, n)) - // Change the node to be not draining and wait for it to be untracked - require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, nil, false, 0, nil, nil, "")) - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 2, nil - }, func(err error) { - t.Fatal("No new node drain events") + // Disconnected node should no longer be tracked + assertTrackerSettled(t, tracker, []string{}) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) }) +} + +// TestNodeDrainWatcher_Update_Spec tests drain spec updates emit events to the +// node watcher and deadline notifier. +func TestNodeDrainWatcher_Update_Spec(t *testing.T) { + ci.Parallel(t) + _, store, tracker := testNodeDrainWatcher(t) + n, _ := testNodeDrainWatcherSetup(t, store, tracker) + + // Update the spec to extend the deadline + strategy := n.DrainStrategy.Copy() + strategy.DrainSpec.Deadline += time.Hour + index, _ := store.LatestIndex() + must.NoError(t, store.UpdateNodeDrain( + structs.MsgTypeTestSetup, index+1, n.ID, strategy, false, time.Now().Unix(), + &structs.NodeEvent{}, map[string]string{}, "", + )) - tracked = m.TrackedNodes() - require.NotContains(tracked, n.ID) + // We should see a new event + assertTrackerSettled(t, tracker, []string{n.ID}) + + // Update the spec to have an infinite deadline + strategy = strategy.Copy() + strategy.DrainSpec.Deadline = 0 + + index, _ = store.LatestIndex() + must.NoError(t, store.UpdateNodeDrain( + structs.MsgTypeTestSetup, index+1, n.ID, strategy, false, time.Now().Unix(), + &structs.NodeEvent{}, map[string]string{}, "", + )) + + // We should see a new event and the node should still be tracked but no + // longer in the deadline notifier + 
assertTrackerSettled(t, tracker, []string{n.ID}) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) } -func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) { +// TestNodeDrainWatcher_Update_IsDone tests that a node drain without allocs +// immediately gets unmarked as draining, and that we unset drain if an operator +// drains a node with nothing on it. +func TestNodeDrainWatcher_Update_IsDone(t *testing.T) { ci.Parallel(t) - require := require.New(t) - _, state, m := testNodeDrainWatcher(t) + _, store, tracker := testNodeDrainWatcher(t) // Create a draining node n := mock.Node() - n.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: time.Hour, - }, + strategy := &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{Deadline: time.Hour}, ForceDeadline: time.Now().Add(time.Hour), } + n.DrainStrategy = strategy + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, n)) - // Wait for it to be tracked - require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n)) - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 1, nil - }, func(err error) { - t.Fatal("No node drain events") - }) + // There are no jobs on this node so the drain should immediately + // complete. We should no longer be tracking the node and its drain strategy + // should be cleared + assertTrackerSettled(t, tracker, []string{}) + must.MapEmpty(t, tracker.jobWatcher.(*MockJobWatcher).jobs) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) + n, _ = store.NodeByID(nil, n.ID) + must.Nil(t, n.DrainStrategy) +} + +// TestNodeDrainWatcher_Update_DrainComplete tests that allocation updates that +// complete the drain emit events to the node watcher and deadline notifier. 
+func TestNodeDrainWatcher_Update_DrainComplete(t *testing.T) { + ci.Parallel(t) + _, store, tracker := testNodeDrainWatcher(t) + n, _ := testNodeDrainWatcherSetup(t, store, tracker) - tracked := m.TrackedNodes() - require.Contains(tracked, n.ID) - require.Equal(n, tracked[n.ID]) + // Simulate event: an alloc is terminal so DrainingJobWatcher.Migrated + // channel updates NodeDrainer, which updates Raft + _, err := tracker.raft.NodesDrainComplete([]string{n.ID}, + structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemDrain). + SetMessage(NodeDrainEventComplete)) + must.NoError(t, err) - // Delete the node - require.Nil(state.DeleteNode(structs.MsgTypeTestSetup, 101, []string{n.ID})) - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 2, nil - }, func(err error) { - t.Fatal("No new node drain events") - }) + assertTrackerSettled(t, tracker, []string{}) - tracked = m.TrackedNodes() - require.NotContains(tracked, n.ID) + n, _ = store.NodeByID(nil, n.ID) + must.Nil(t, n.DrainStrategy) + must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes) } -func TestNodeDrainWatcher_Update(t *testing.T) { - ci.Parallel(t) - require := require.New(t) - _, state, m := testNodeDrainWatcher(t) +func testNodeDrainWatcherSetup( + t *testing.T, store *state.StateStore, tracker *NodeDrainer) ( + *structs.Node, structs.NamespacedID) { - // Create a draining node - n := mock.Node() - n.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: time.Hour, - }, + t.Helper() + index, _ := store.LatestIndex() + + // Create a job that will have an alloc on our node + job := mock.Job() + jobID := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID} + index++ + must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, job)) + + // Create a draining node with an alloc for the job running on that node + node := mock.Node() + node.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: 
structs.DrainSpec{Deadline: time.Hour}, ForceDeadline: time.Now().Add(time.Hour), } - // Wait for it to be tracked - require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n)) - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 1, nil - }, func(err error) { - t.Fatal("No node drain events") - }) + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.Job = job + alloc.TaskGroup = job.TaskGroups[0].Name + alloc.NodeID = node.ID + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)} + index++ + must.NoError(t, store.UpsertAllocs( + structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc})) - tracked := m.TrackedNodes() - require.Contains(tracked, n.ID) - require.Equal(n, tracked[n.ID]) + index++ + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node)) - // Change the node to have a new spec - s2 := n.DrainStrategy.Copy() - s2.Deadline += time.Hour - require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, s2, false, 0, nil, nil, "")) + // Node should be tracked and notifications should fire to the job watcher + // and deadline notifier + assertTrackerSettled(t, tracker, []string{node.ID}) + must.MapContainsKey(t, tracker.jobWatcher.(*MockJobWatcher).jobs, jobID) + must.MapContainsKeys(t, + tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, []string{node.ID}) - // Wait for it to be updated - testutil.WaitForResult(func() (bool, error) { - return len(m.events()) == 2, nil - }, func(err error) { - t.Fatal("No new node drain events") - }) + return node, jobID +} + +func assertTrackerSettled(t *testing.T, tracker *NodeDrainer, nodeIDs []string) { + t.Helper() - tracked = m.TrackedNodes() - require.Contains(tracked, n.ID) - require.Equal(s2, tracked[n.ID].DrainStrategy) + must.Wait(t, wait.InitialSuccess( + wait.Timeout(100*time.Millisecond), + wait.Gap(time.Millisecond), + wait.TestFunc(func() (bool, error) { + if len(tracker.TrackedNodes()) != len(nodeIDs) { + return 
false, fmt.Errorf( + "expected nodes %v to become marked draining, got %d", + nodeIDs, len(tracker.TrackedNodes())) + } + return true, nil + }), + )) + + must.Wait(t, wait.ContinualSuccess( + wait.Timeout(100*time.Millisecond), + wait.Gap(10*time.Millisecond), + wait.TestFunc(func() (bool, error) { + if len(tracker.TrackedNodes()) != len(nodeIDs) { + return false, fmt.Errorf( + "expected nodes %v to stay marked draining, got %d", + nodeIDs, len(tracker.TrackedNodes())) + } + return true, nil + }), + )) + + for _, nodeID := range nodeIDs { + must.MapContainsKey(t, tracker.TrackedNodes(), nodeID) + } }