From cab3bd2ad90664b63e99e09ab2f3fc8d87d0d1bc Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Tue, 21 Mar 2023 16:19:18 -0400
Subject: [PATCH] drainer: stop watching deleted, down, or disconnected nodes

When a node is down or disconnected, we can no longer gracefully migrate its
allocations. Any evaluations we need to replace the allocations will have
already been created by the heartbeater, so there's no more work for the
drainer to do. Stop watching nodes in this state.

Also, the blocking query for nodes set the maximum index to the highest index
of a node it found, rather than the index of the nodes table. This misses
updates to the index that come from deleting nodes. This was done as a
performance optimization to avoid excessive unblocking, but because the query
is over all nodes anyway, there's no optimization to be had here. Remove the
optimization so we can detect deleted nodes without having to wait for an
update to an unrelated node.

This changeset also refactors the tests of the draining node watcher so that
the node watcher's own tests no longer mock its `Remove` and `Update` methods.
Instead we mock the node watcher's dependencies (the job watcher and the
deadline notifier), so the unit tests now exercise the real code. This allows
us to remove a number of testing TODOs in `watch_nodes.go`.
---
 .changelog/16612.txt              |   3 +
 nomad/drainer/drain_testing.go    | 140 ++++++++++---
 nomad/drainer/watch_jobs.go       |   2 +-
 nomad/drainer/watch_nodes.go      |  41 ++--
 nomad/drainer/watch_nodes_test.go | 337 +++++++++++++++++++-----------
 5 files changed, 352 insertions(+), 171 deletions(-)
 create mode 100644 .changelog/16612.txt

diff --git a/.changelog/16612.txt b/.changelog/16612.txt
new file mode 100644
index 00000000000..595845c9d12
--- /dev/null
+++ b/.changelog/16612.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+drainer: Fixed a bug where draining nodes that became lost or disconnected were still tracked by the drainer
+```
diff --git a/nomad/drainer/drain_testing.go b/nomad/drainer/drain_testing.go
index 61de31d813d..bb8aa3ec5b6 100644
--- a/nomad/drainer/drain_testing.go
+++ b/nomad/drainer/drain_testing.go
@@ -1,52 +1,142 @@
 package drainer
 
 import (
+    "context"
     "sync"
+    "testing"
+    "time"
 
+    "golang.org/x/time/rate"
+
+    "github.com/hashicorp/nomad/helper/testlog"
+    "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
 )
 
-type MockNodeTrackerEvent struct {
-    NodeUpdate *structs.Node
-    NodeRemove string
-}
+// This file contains helpers for testing. The raft shims make it hard to test
+// the whole package behavior of the drainer. See also nomad/drainer_int_test.go
+// for integration tests.
 
-type MockNodeTracker struct {
-    Nodes map[string]*structs.Node
-    Events []*MockNodeTrackerEvent
+type MockJobWatcher struct {
+    drainCh chan *DrainRequest
+    migratedCh chan []*structs.Allocation
+    jobs map[structs.NamespacedID]struct{}
     sync.Mutex
 }
 
-func NewMockNodeTracker() *MockNodeTracker {
-    return &MockNodeTracker{
-        Nodes: make(map[string]*structs.Node),
-        Events: make([]*MockNodeTrackerEvent, 0, 16),
+// RegisterJobs marks the given jobs as being watched
+func (m *MockJobWatcher) RegisterJobs(jobs []structs.NamespacedID) {
+    m.Lock()
+    defer m.Unlock()
+    for _, job := range jobs {
+        m.jobs[job] = struct{}{}
     }
 }
 
-func (m *MockNodeTracker) TrackedNodes() map[string]*structs.Node {
-    m.Lock()
-    defer m.Unlock()
-    return m.Nodes
+// Drain returns the DrainRequest channel. Tests can send on this channel to
+// simulate steps through the NodeDrainer watch loop. (Sending on this channel
+// will block anywhere else.)
+func (m *MockJobWatcher) Drain() <-chan *DrainRequest {
+    return m.drainCh
 }
 
-func (m *MockNodeTracker) Remove(nodeID string) {
-    m.Lock()
-    defer m.Unlock()
-    delete(m.Nodes, nodeID)
-    m.Events = append(m.Events, &MockNodeTrackerEvent{NodeRemove: nodeID})
+// Migrated returns the channel of migrated allocations. Tests can send on this
+// channel to simulate steps through the NodeDrainer watch loop. (Sending on
+// this channel will block anywhere else.)
+func (m *MockJobWatcher) Migrated() <-chan []*structs.Allocation {
+    return m.migratedCh
+}
+
+type MockDeadlineNotifier struct {
+    expiredCh <-chan []string
+    nodes map[string]struct{}
+    sync.Mutex
 }
 
-func (m *MockNodeTracker) Update(node *structs.Node) {
+// NextBatch returns the channel of expired nodes. Tests can send on this
+// channel to simulate timer events in the NodeDrainer watch loop. (Sending on
+// this channel will block anywhere else.)
+func (m *MockDeadlineNotifier) NextBatch() <-chan []string {
+    return m.expiredCh
+}
+
+// Remove removes the given node from being tracked for a deadline.
+func (m *MockDeadlineNotifier) Remove(nodeID string) {
     m.Lock()
     defer m.Unlock()
-    m.Nodes[node.ID] = node
-    m.Events = append(m.Events, &MockNodeTrackerEvent{NodeUpdate: node})
+    delete(m.nodes, nodeID)
 }
 
-func (m *MockNodeTracker) events() []*MockNodeTrackerEvent {
+// Watch marks the node as being watched; this mock discards the timer in favor
+// of manually sending on the channel, to avoid racy tests.
+func (m *MockDeadlineNotifier) Watch(nodeID string, _ time.Time) {
     m.Lock()
     defer m.Unlock()
+    m.nodes[nodeID] = struct{}{}
+}
+
+type MockRaftApplierShim struct {
+    state *state.StateStore
+}
+
+// AllocUpdateDesiredTransition mocks a write to raft as a state store update
+func (m *MockRaftApplierShim) AllocUpdateDesiredTransition(
+    allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) {
+    index, _ := m.state.LatestIndex()
+    index++
+    err := m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, index, allocs, evals)
+    return index, err
+}
+
+// NodesDrainComplete mocks a write to raft as a state store update
+func (m *MockRaftApplierShim) NodesDrainComplete(
+    nodes []string, event *structs.NodeEvent) (uint64, error) {
+    index, _ := m.state.LatestIndex()
+    index++
+
+    updates := make(map[string]*structs.DrainUpdate, len(nodes))
+    nodeEvents := make(map[string]*structs.NodeEvent, len(nodes))
+    update := &structs.DrainUpdate{}
+    for _, node := range nodes {
+        updates[node] = update
+        if event != nil {
+            nodeEvents[node] = event
+        }
+    }
+    now := time.Now().Unix()
+
+    err := m.state.BatchUpdateNodeDrain(structs.MsgTypeTestSetup, index, now,
+        updates, nodeEvents)
+
+    return index, err
+}
+
+func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *NodeDrainer) {
+    t.Helper()
+    store := state.TestStateStore(t)
+    limiter := rate.NewLimiter(100.0, 100)
+    logger := testlog.HCLogger(t)
+
+    drainer := &NodeDrainer{
+        enabled: false,
+        logger: logger,
+        nodes: map[string]*drainingNode{},
+        jobWatcher: &MockJobWatcher{
+            drainCh: make(chan *DrainRequest),
+            migratedCh: make(chan []*structs.Allocation),
+            jobs: map[structs.NamespacedID]struct{}{},
+        },
+        deadlineNotifier: &MockDeadlineNotifier{
+            expiredCh: make(<-chan []string),
+            nodes: map[string]struct{}{},
+        },
+        state: store,
+        queryLimiter: limiter,
+        raft: &MockRaftApplierShim{state: store},
+        batcher: allocMigrateBatcher{},
+    }
 
-    return m.Events
+    w := NewNodeDrainWatcher(context.Background(), limiter, store, logger, drainer)
+    drainer.nodeWatcher = w
+    return w, store, drainer
 }
diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go
index 36d301cc3b5..1ed4402213f 100644
--- a/nomad/drainer/watch_jobs.go
+++ b/nomad/drainer/watch_jobs.go
@@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest {
 // DrainingJobWatcher is the interface for watching a job drain
 type DrainingJobWatcher interface {
     // RegisterJob is used to start watching a draining job
-    RegisterJobs(job []structs.NamespacedID)
+    RegisterJobs(jobs []structs.NamespacedID)
 
     // Drain is used to emit allocations that should be drained.
     Drain() <-chan *DrainRequest
diff --git a/nomad/drainer/watch_nodes.go b/nomad/drainer/watch_nodes.go
index 31ce4357e76..32d40745526 100644
--- a/nomad/drainer/watch_nodes.go
+++ b/nomad/drainer/watch_nodes.go
@@ -33,7 +33,6 @@ func (n *NodeDrainer) Remove(nodeID string) {
     n.l.Lock()
     defer n.l.Unlock()
 
-    // TODO test the notifier is updated
     // Remove it from being tracked and remove it from the dealiner
     delete(n.nodes, nodeID)
     n.deadlineNotifier.Remove(nodeID)
@@ -58,7 +57,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
         draining.Update(node)
     }
 
-    // TODO test the notifier is updated
     if inf, deadline := node.DrainStrategy.DeadlineTime(); !inf {
         n.deadlineNotifier.Watch(node.ID, deadline)
     } else {
@@ -67,7 +65,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
         n.deadlineNotifier.Remove(node.ID)
     }
 
-    // TODO Test this
     // Register interest in the draining jobs.
     jobs, err := draining.DrainingJobs()
     if err != nil {
@@ -77,8 +74,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
     n.logger.Trace("node has draining jobs on it", "node_id", node.ID, "num_jobs", len(jobs))
     n.jobWatcher.RegisterJobs(jobs)
 
-    // TODO Test at this layer as well that a node drain on a node without
-    // allocs immediately gets unmarked as draining
     // Check if the node is done such that if an operator drains a node with
     // nothing on it we unset drain
     done, err := draining.IsDone()
@@ -88,8 +83,8 @@ func (n *NodeDrainer) Update(node *structs.Node) {
     }
 
     if done {
-        // Node is done draining. Stop remaining system allocs before
-        // marking node as complete.
+        // Node is done draining. Stop remaining system allocs before marking
+        // node as complete.
         remaining, err := draining.RemainingAllocs()
         if err != nil {
             n.logger.Error("error getting remaining allocs on drained node", "node_id", node.ID, "error", err)
@@ -179,21 +174,31 @@ func (w *nodeDrainWatcher) watch() {
             currentNode, tracked := tracked[nodeID]
 
             switch {
-            // If the node is tracked but not draining, untrack
+            case tracked && !newDraining:
+                // If the node is tracked but not draining, untrack
                 w.tracker.Remove(nodeID)
 
-            // If the node is not being tracked but is draining, track
+            case !tracked && node.UnresponsiveStatus():
+                // Down or Disconnected nodes can't be migrated, so skip
+                // tracking them
+
+            case !tracked && newDraining:
+                // If the node is not being tracked but is draining, track
                 w.tracker.Update(node)
 
-            // If the node is being tracked but has changed, update:
+            case tracked && node.UnresponsiveStatus():
+                // Down or Disconnected nodes can't be migrated, so stop
+                // tracking them. The node update event that changed the status
+                // will trigger all the evaluations we need.
+                w.tracker.Remove(nodeID)
+            case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy):
+                // If the node is being tracked but has changed, update
                 w.tracker.Update(node)
+            default:
             }
-
-            // TODO(schmichael) handle the case of a lost node
         }
 
         for nodeID := range tracked {
@@ -219,7 +224,7 @@ func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node,
 }
 
 // getNodesImpl is used to get nodes from the state store, returning the set of
-// nodes and the given index.
+// nodes and the current node table index.
 func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
     iter, err := state.Nodes(ws)
     if err != nil {
@@ -231,7 +236,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
         return nil, 0, err
     }
 
-    var maxIndex uint64 = 0
     resp := make(map[string]*structs.Node, 64)
     for {
         raw := iter.Next()
@@ -241,15 +245,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
         node := raw.(*structs.Node)
         resp[node.ID] = node
 
-        if maxIndex < node.ModifyIndex {
-            maxIndex = node.ModifyIndex
-        }
-    }
-
-    // Prefer using the actual max index of affected nodes since it means less
-    // unblocking
-    if maxIndex != 0 {
-        index = maxIndex
     }
 
     return resp, index, nil
diff --git a/nomad/drainer/watch_nodes_test.go b/nomad/drainer/watch_nodes_test.go
index f1154c46068..6d858515e62 100644
--- a/nomad/drainer/watch_nodes_test.go
+++ b/nomad/drainer/watch_nodes_test.go
@@ -1,41 +1,26 @@
 package drainer
 
 import (
-    "context"
+    "fmt"
     "testing"
     "time"
 
+    "github.com/shoenig/test/must"
+    "github.com/shoenig/test/wait"
+
     "github.com/hashicorp/nomad/ci"
-    "github.com/hashicorp/nomad/helper/testlog"
+    "github.com/hashicorp/nomad/helper/pointer"
+    "github.com/hashicorp/nomad/helper/uuid"
     "github.com/hashicorp/nomad/nomad/mock"
     "github.com/hashicorp/nomad/nomad/state"
     "github.com/hashicorp/nomad/nomad/structs"
-    "github.com/hashicorp/nomad/testutil"
-    "github.com/stretchr/testify/require"
-    "golang.org/x/time/rate"
 )
 
-func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *MockNodeTracker) {
-    t.Helper()
-    state := state.TestStateStore(t)
-    limiter := rate.NewLimiter(100.0, 100)
-    logger := testlog.HCLogger(t)
-    m := NewMockNodeTracker()
-    w := NewNodeDrainWatcher(context.Background(), limiter, state, logger, m)
-    return w, state, m
-}
-
-func TestNodeDrainWatcher_Interface(t *testing.T) {
-    ci.Parallel(t)
-    require := require.New(t)
-    w, _, _ := testNodeDrainWatcher(t)
-    require.Implements((*DrainingNodeWatcher)(nil), w)
-}
-
-func TestNodeDrainWatcher_AddDraining(t *testing.T) {
+// TestNodeDrainWatcher_AddNodes tests that new nodes are added to the node
+// watcher and deadline notifier, but only if they have a drain spec.
+func TestNodeDrainWatcher_AddNodes(t *testing.T) {
     ci.Parallel(t)
-    require := require.New(t)
-    _, state, m := testNodeDrainWatcher(t)
+    _, store, tracker := testNodeDrainWatcher(t)
 
     // Create two nodes, one draining and one not draining
     n1, n2 := mock.Node(), mock.Node()
@@ -46,137 +31,245 @@ func TestNodeDrainWatcher_AddDraining(t *testing.T) {
         ForceDeadline: time.Now().Add(time.Hour),
     }
 
-    require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n1))
-    require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 101, n2))
+    // Create a job with a running alloc on each node
+    job := mock.Job()
+    jobID := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
+    must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, 101, job))
 
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 1, nil
-    }, func(err error) {
-        t.Fatal("No node drain events")
-    })
+    alloc1 := mock.Alloc()
+    alloc1.JobID = job.ID
+    alloc1.Job = job
+    alloc1.TaskGroup = job.TaskGroups[0].Name
+    alloc1.NodeID = n1.ID
+    alloc1.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)}
+    alloc2 := alloc1.Copy()
+    alloc2.ID = uuid.Generate()
+    alloc2.NodeID = n2.ID
+
+    must.NoError(t, store.UpsertAllocs(
+        structs.MsgTypeTestSetup, 102, []*structs.Allocation{alloc1, alloc2}))
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 103, n1))
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 104, n2))
 
-    tracked := m.TrackedNodes()
-    require.NotContains(tracked, n1.ID)
-    require.Contains(tracked, n2.ID)
-    require.Equal(n2, tracked[n2.ID])
+    // Only 1 node is draining, and the other should not be tracked
+    assertTrackerSettled(t, tracker, []string{n2.ID})
 
+    // Notifications should fire to the job watcher and deadline notifier
+    must.MapContainsKey(t, tracker.jobWatcher.(*MockJobWatcher).jobs, jobID)
+    must.MapContainsKey(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, n2.ID)
 }
 
+// TestNodeDrainWatcher_Remove tests that when a node should no longer be
+// tracked, we stop tracking it in the node watcher and deadline notifier.
 func TestNodeDrainWatcher_Remove(t *testing.T) {
     ci.Parallel(t)
-    require := require.New(t)
-    _, state, m := testNodeDrainWatcher(t)
+    _, store, tracker := testNodeDrainWatcher(t)
 
-    // Create a draining node
-    n := mock.Node()
-    n.DrainStrategy = &structs.DrainStrategy{
-        DrainSpec: structs.DrainSpec{
-            Deadline: time.Hour,
-        },
-        ForceDeadline: time.Now().Add(time.Hour),
-    }
+    t.Run("stop drain", func(t *testing.T) {
+        n, _ := testNodeDrainWatcherSetup(t, store, tracker)
+
+        index, _ := store.LatestIndex()
+        must.NoError(t, store.UpdateNodeDrain(
+            structs.MsgTypeTestSetup, index+1, n.ID, nil, false, 0, nil, nil, ""))
+
+        // Node with stopped drain should no longer be tracked
+        assertTrackerSettled(t, tracker, []string{})
+        must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
+    })
+
+    t.Run("delete node", func(t *testing.T) {
+        n, _ := testNodeDrainWatcherSetup(t, store, tracker)
+        index, _ := store.LatestIndex()
+        index++
+        must.NoError(t, store.DeleteNode(structs.MsgTypeTestSetup, index, []string{n.ID}))
+
+        // Deleted node should no longer be tracked
+        assertTrackerSettled(t, tracker, []string{})
+        must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
+    })
+
+    t.Run("down node", func(t *testing.T) {
+        n, _ := testNodeDrainWatcherSetup(t, store, tracker)
 
-    // Wait for it to be tracked
-    require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n))
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 1, nil
-    }, func(err error) {
-        t.Fatal("No node drain events")
+        index, _ := store.LatestIndex()
+        n = n.Copy()
+        n.Status = structs.NodeStatusDown
+        must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, n))
+
+        // Down node should no longer be tracked
+        assertTrackerSettled(t, tracker, []string{})
+        must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
     })
 
-    tracked := m.TrackedNodes()
-    require.Contains(tracked, n.ID)
-    require.Equal(n, tracked[n.ID])
+    t.Run("disconnected node", func(t *testing.T) {
+        n, _ := testNodeDrainWatcherSetup(t, store, tracker)
+        index, _ := store.LatestIndex()
+        n = n.Copy()
+        n.Status = structs.NodeStatusDisconnected
+        must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, n))
 
-    // Change the node to be not draining and wait for it to be untracked
-    require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, nil, false, 0, nil, nil, ""))
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 2, nil
-    }, func(err error) {
-        t.Fatal("No new node drain events")
+        // Disconnected node should no longer be tracked
+        assertTrackerSettled(t, tracker, []string{})
+        must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
     })
+}
+
+// TestNodeDrainWatcher_Update_Spec tests that drain spec updates emit events
+// to the node watcher and deadline notifier.
+func TestNodeDrainWatcher_Update_Spec(t *testing.T) {
+    ci.Parallel(t)
+    _, store, tracker := testNodeDrainWatcher(t)
+    n, _ := testNodeDrainWatcherSetup(t, store, tracker)
+
+    // Update the spec to extend the deadline
+    strategy := n.DrainStrategy.Copy()
+    strategy.DrainSpec.Deadline += time.Hour
+    index, _ := store.LatestIndex()
+    must.NoError(t, store.UpdateNodeDrain(
+        structs.MsgTypeTestSetup, index+1, n.ID, strategy, false, time.Now().Unix(),
+        &structs.NodeEvent{}, map[string]string{}, "",
+    ))
 
-    tracked = m.TrackedNodes()
-    require.NotContains(tracked, n.ID)
+    // We should see a new event
+    assertTrackerSettled(t, tracker, []string{n.ID})
+
+    // Update the spec to have an infinite deadline
+    strategy = strategy.Copy()
+    strategy.DrainSpec.Deadline = 0
+
+    index, _ = store.LatestIndex()
+    must.NoError(t, store.UpdateNodeDrain(
+        structs.MsgTypeTestSetup, index+1, n.ID, strategy, false, time.Now().Unix(),
+        &structs.NodeEvent{}, map[string]string{}, "",
+    ))
+
+    // We should see a new event and the node should still be tracked but no
+    // longer in the deadline notifier
+    assertTrackerSettled(t, tracker, []string{n.ID})
+    must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
 }
 
-func TestNodeDrainWatcher_Remove_Nonexistent(t *testing.T) {
+// TestNodeDrainWatcher_Update_IsDone tests that when an operator drains a
+// node with nothing on it, the drain completes immediately and the node is
+// unmarked as draining.
+func TestNodeDrainWatcher_Update_IsDone(t *testing.T) {
     ci.Parallel(t)
-    require := require.New(t)
-    _, state, m := testNodeDrainWatcher(t)
+    _, store, tracker := testNodeDrainWatcher(t)
 
     // Create a draining node
     n := mock.Node()
-    n.DrainStrategy = &structs.DrainStrategy{
-        DrainSpec: structs.DrainSpec{
-            Deadline: time.Hour,
-        },
+    strategy := &structs.DrainStrategy{
+        DrainSpec: structs.DrainSpec{Deadline: time.Hour},
         ForceDeadline: time.Now().Add(time.Hour),
     }
+    n.DrainStrategy = strategy
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 100, n))
 
-    // Wait for it to be tracked
-    require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n))
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 1, nil
-    }, func(err error) {
-        t.Fatal("No node drain events")
-    })
+    // There are no jobs on this node so the drain should immediately
+    // complete. We should no longer be tracking the node and its drain
+    // strategy should be cleared
+    assertTrackerSettled(t, tracker, []string{})
+    must.MapEmpty(t, tracker.jobWatcher.(*MockJobWatcher).jobs)
+    must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
+    n, _ = store.NodeByID(nil, n.ID)
+    must.Nil(t, n.DrainStrategy)
+}
+
+// TestNodeDrainWatcher_Update_DrainComplete tests that allocation updates that
+// complete the drain emit events to the node watcher and deadline notifier.
+func TestNodeDrainWatcher_Update_DrainComplete(t *testing.T) {
+    ci.Parallel(t)
+    _, store, tracker := testNodeDrainWatcher(t)
+    n, _ := testNodeDrainWatcherSetup(t, store, tracker)
 
-    tracked := m.TrackedNodes()
-    require.Contains(tracked, n.ID)
-    require.Equal(n, tracked[n.ID])
+    // Simulate event: an alloc is terminal so DrainingJobWatcher.Migrated
+    // channel updates NodeDrainer, which updates Raft
+    _, err := tracker.raft.NodesDrainComplete([]string{n.ID},
+        structs.NewNodeEvent().
+            SetSubsystem(structs.NodeEventSubsystemDrain).
+            SetMessage(NodeDrainEventComplete))
+    must.NoError(t, err)
 
-    // Delete the node
-    require.Nil(state.DeleteNode(structs.MsgTypeTestSetup, 101, []string{n.ID}))
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 2, nil
-    }, func(err error) {
-        t.Fatal("No new node drain events")
-    })
+    assertTrackerSettled(t, tracker, []string{})
 
-    tracked = m.TrackedNodes()
-    require.NotContains(tracked, n.ID)
+    n, _ = store.NodeByID(nil, n.ID)
+    must.Nil(t, n.DrainStrategy)
+    must.MapEmpty(t, tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes)
 }
 
-func TestNodeDrainWatcher_Update(t *testing.T) {
-    ci.Parallel(t)
-    require := require.New(t)
-    _, state, m := testNodeDrainWatcher(t)
+func testNodeDrainWatcherSetup(
+    t *testing.T, store *state.StateStore, tracker *NodeDrainer) (
+    *structs.Node, structs.NamespacedID) {
 
-    // Create a draining node
-    n := mock.Node()
-    n.DrainStrategy = &structs.DrainStrategy{
-        DrainSpec: structs.DrainSpec{
-            Deadline: time.Hour,
-        },
+    t.Helper()
+    index, _ := store.LatestIndex()
+
+    // Create a job that will have an alloc on our node
+    job := mock.Job()
+    jobID := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}
+    index++
+    must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, job))
+
+    // Create a draining node with an alloc for the job running on it
+    node := mock.Node()
+    node.DrainStrategy = &structs.DrainStrategy{
+        DrainSpec: structs.DrainSpec{Deadline: time.Hour},
         ForceDeadline: time.Now().Add(time.Hour),
     }
 
-    // Wait for it to be tracked
-    require.Nil(state.UpsertNode(structs.MsgTypeTestSetup, 100, n))
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 1, nil
-    }, func(err error) {
-        t.Fatal("No node drain events")
-    })
+    alloc := mock.Alloc()
+    alloc.JobID = job.ID
+    alloc.Job = job
+    alloc.TaskGroup = job.TaskGroups[0].Name
+    alloc.NodeID = node.ID
+    alloc.DeploymentStatus = &structs.AllocDeploymentStatus{Healthy: pointer.Of(true)}
+    index++
+    must.NoError(t, store.UpsertAllocs(
+        structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc}))
 
-    tracked := m.TrackedNodes()
-    require.Contains(tracked, n.ID)
-    require.Equal(n, tracked[n.ID])
+    index++
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
 
-    // Change the node to have a new spec
-    s2 := n.DrainStrategy.Copy()
-    s2.Deadline += time.Hour
-    require.Nil(state.UpdateNodeDrain(structs.MsgTypeTestSetup, 101, n.ID, s2, false, 0, nil, nil, ""))
+    // Node should be tracked and notifications should fire to the job watcher
+    // and deadline notifier
+    assertTrackerSettled(t, tracker, []string{node.ID})
+    must.MapContainsKey(t, tracker.jobWatcher.(*MockJobWatcher).jobs, jobID)
+    must.MapContainsKeys(t,
+        tracker.deadlineNotifier.(*MockDeadlineNotifier).nodes, []string{node.ID})
 
-    // Wait for it to be updated
-    testutil.WaitForResult(func() (bool, error) {
-        return len(m.events()) == 2, nil
-    }, func(err error) {
-        t.Fatal("No new node drain events")
-    })
+    return node, jobID
+}
+
+func assertTrackerSettled(t *testing.T, tracker *NodeDrainer, nodeIDs []string) {
+    t.Helper()
 
-    tracked = m.TrackedNodes()
-    require.Contains(tracked, n.ID)
-    require.Equal(s2, tracked[n.ID].DrainStrategy)
+    must.Wait(t, wait.InitialSuccess(
+        wait.Timeout(100*time.Millisecond),
+        wait.Gap(time.Millisecond),
+        wait.TestFunc(func() (bool, error) {
+            if len(tracker.TrackedNodes()) != len(nodeIDs) {
+                return false, fmt.Errorf(
+                    "expected nodes %v to become marked draining, got %d",
+                    nodeIDs, len(tracker.TrackedNodes()))
+            }
+            return true, nil
+        }),
+    ))
+
+    must.Wait(t, wait.ContinualSuccess(
+        wait.Timeout(100*time.Millisecond),
+        wait.Gap(10*time.Millisecond),
+        wait.TestFunc(func() (bool, error) {
+            if len(tracker.TrackedNodes()) != len(nodeIDs) {
+                return false, fmt.Errorf(
+                    "expected nodes %v to stay marked draining, got %d",
+                    nodeIDs, len(tracker.TrackedNodes()))
+            }
+            return true, nil
+        }),
+    ))
+
+    for _, nodeID := range nodeIDs {
+        must.MapContainsKey(t, tracker.TrackedNodes(), nodeID)
+    }
 }
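
Reviewer's note: the new switch in watch() above encodes a small decision table
over three booleans: whether the node is already tracked, whether it currently
has a drain strategy, and whether it is unresponsive. The sketch below distills
that table into a standalone function for reference only; it is not part of the
patch, and it assumes structs.Node.UnresponsiveStatus() reports true for nodes
in the down or disconnected states, which is how the new cases use it.

package main

import "fmt"

// trackingAction mirrors the case order of the watch() switch: unresponsive
// nodes are removed or skipped before the drain-spec comparison ever runs.
func trackingAction(tracked, draining, unresponsive bool) string {
	switch {
	case tracked && !draining:
		return "remove" // drain was stopped or completed
	case !tracked && unresponsive:
		return "skip" // never start tracking a down/disconnected node
	case !tracked && draining:
		return "track" // newly draining node
	case tracked && unresponsive:
		return "remove" // heartbeater already created replacement evals
	case tracked && draining:
		return "update-if-spec-changed"
	default:
		return "no-op"
	}
}

func main() {
	// A tracked, draining node that goes down is dropped from the drainer,
	// which is the behavior this patch adds.
	fmt.Println(trackingAction(true, true, true)) // remove
}

Note that the ordering is what lets the drainer drop lost nodes even though
they still carry a drain strategy: the unresponsive cases fire before the
spec-change comparison is reached.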