drainer: stop watching deleted, down, or disconnected nodes
When a node is down or disconnected, we can no longer gracefully migrate its
allocations. Any evaluations we need to replace the allocations will have
already been created by the heartbeater, so there's no more work for the drainer
to do. Stop watching nodes in this state.
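
In sketch form, the new per-node decision in the watcher's reconcile loop looks like the
following; this is condensed from the `nodeDrainWatcher.watch()` switch in the
`watch_nodes.go` diff below, so the surrounding identifiers (`tracked`,
`newDraining`, `currentNode`, `w.tracker`) come from that loop:

```go
// Condensed from the watch_nodes.go switch further down in this diff.
switch {
case tracked && !newDraining:
	// Tracked but no longer draining: untrack.
	w.tracker.Remove(nodeID)
case !tracked && node.UnresponsiveStatus():
	// Down or disconnected nodes can't be migrated: never start tracking.
case !tracked && newDraining:
	w.tracker.Update(node)
case tracked && node.UnresponsiveStatus():
	// Stop tracking; the heartbeater's node-update evaluations already
	// handle replacing the allocations.
	w.tracker.Remove(nodeID)
case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy):
	// The drain strategy changed while draining: update the tracked copy.
	w.tracker.Update(node)
}
```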

Also, the blocking query for nodes set its maximum index to the highest index of
any node it found, rather than to the index of the nodes table. This misses
index updates from node deletions. The old behavior was a performance
optimization to avoid excessive unblocking, but because the query is over all
nodes anyway there's no optimization to be had here. Remove the optimization so
we can detect deleted nodes without having to wait for an update to an unrelated
node.
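
A minimal sketch of the difference, condensed from the `getNodesImpl` hunks in
the `watch_nodes.go` diff below, where `index` is the nodes table index looked
up earlier in the function and `iter` walks every node:

```go
// Sketch only; see getNodesImpl in watch_nodes.go below.
resp := make(map[string]*structs.Node, 64)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
	node := raw.(*structs.Node)
	resp[node.ID] = node
	// Removed: maxIndex = max(maxIndex, node.ModifyIndex), returned in place
	// of the table index. A deleted node never appears in this iteration, so
	// its index bump was invisible to the blocking query.
}
// Return the nodes table index so deletions also unblock the query.
return resp, index, nil
```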

This changeset also refactors the draining node watcher's tests so that we no
longer mock the node watcher's `Remove` and `Update` methods for its own
tests. Instead we mock the node watcher's dependencies (the job watcher and
deadline notifier), so the unit tests exercise the real code. This lets us
remove a number of testing TODOs in `watch_nodes.go`.
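
The key to the refactor is that the new mocks stand in for the drainer's
dependencies rather than for the node watcher itself, so the real
`nodeDrainWatcher` and `NodeDrainer` code paths run under test. A hypothetical
illustration, not code from this commit; the interface names are recalled from
the package and should be treated as assumptions:

```go
// Hypothetical compile-time checks: the mocks added in drain_testing.go
// satisfy the drainer's dependency interfaces, so tests wire them into a real
// NodeDrainer via testNodeDrainWatcher instead of mocking the watcher itself.
// The interface names here are assumptions and may differ from the package.
var (
	_ DrainingJobWatcher    = (*MockJobWatcher)(nil)
	_ DrainDeadlineNotifier = (*MockDeadlineNotifier)(nil)
	_ RaftApplier           = (*MockRaftApplierShim)(nil)
)
```

With those in place, `testNodeDrainWatcher` (added in `drain_testing.go` below)
returns the real watcher, the state store, and the drainer, so a test can drive
behavior by writing node updates to the state store and asserting against what
the drainer tracks.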
tgross committed Mar 22, 2023
1 parent cc110f4 commit cab3bd2
Showing 5 changed files with 352 additions and 171 deletions.
3 changes: 3 additions & 0 deletions .changelog/16612.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+drainer: Fixed a bug where draining nodes that become lost or disconnected were still tracked by the drainer
+```
140 changes: 115 additions & 25 deletions nomad/drainer/drain_testing.go
@@ -1,52 +1,142 @@
 package drainer
 
 import (
+	"context"
 	"sync"
 	"testing"
+	"time"
 
+	"golang.org/x/time/rate"
+
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-type MockNodeTrackerEvent struct {
-	NodeUpdate *structs.Node
-	NodeRemove string
-}
+// This file contains helpers for testing. The raft shims make it hard to test
+// the whole package behavior of the drainer. See also nomad/drainer_int_test.go
+// for integration tests.
 
-type MockNodeTracker struct {
-	Nodes  map[string]*structs.Node
-	Events []*MockNodeTrackerEvent
+type MockJobWatcher struct {
+	drainCh    chan *DrainRequest
+	migratedCh chan []*structs.Allocation
+	jobs       map[structs.NamespacedID]struct{}
 	sync.Mutex
 }
 
-func NewMockNodeTracker() *MockNodeTracker {
-	return &MockNodeTracker{
-		Nodes:  make(map[string]*structs.Node),
-		Events: make([]*MockNodeTrackerEvent, 0, 16),
+// RegisterJobs marks the job as being watched
+func (m *MockJobWatcher) RegisterJobs(jobs []structs.NamespacedID) {
+	m.Lock()
+	defer m.Unlock()
+	for _, job := range jobs {
+		m.jobs[job] = struct{}{}
 	}
 }
 
-func (m *MockNodeTracker) TrackedNodes() map[string]*structs.Node {
-	m.Lock()
-	defer m.Unlock()
-	return m.Nodes
+// Drain returns the DrainRequest channel. Tests can send on this channel to
+// simulate steps through the NodeDrainer watch loop. (Sending on this channel
+// will block anywhere else.)
+func (m *MockJobWatcher) Drain() <-chan *DrainRequest {
+	return m.drainCh
 }
 
-func (m *MockNodeTracker) Remove(nodeID string) {
-	m.Lock()
-	defer m.Unlock()
-	delete(m.Nodes, nodeID)
-	m.Events = append(m.Events, &MockNodeTrackerEvent{NodeRemove: nodeID})
+// Migrated returns the channel of migrated allocations. Tests can send on this
+// channel to simulate steps through the NodeDrainer watch loop. (Sending on
+// this channel will block anywhere else.)
+func (m *MockJobWatcher) Migrated() <-chan []*structs.Allocation {
+	return m.migratedCh
 }
 
+type MockDeadlineNotifier struct {
+	expiredCh <-chan []string
+	nodes     map[string]struct{}
+	sync.Mutex
+}
+
-func (m *MockNodeTracker) Update(node *structs.Node) {
+// NextBatch returns the channel of expired nodes. Tests can send on this
+// channel to simulate timer events in the NodeDrainer watch loop. (Sending on
+// this channel will block anywhere else.)
+func (m *MockDeadlineNotifier) NextBatch() <-chan []string {
+	return m.expiredCh
+}
+
+// Remove removes the given node from being tracked for a deadline.
+func (m *MockDeadlineNotifier) Remove(nodeID string) {
 	m.Lock()
 	defer m.Unlock()
-	m.Nodes[node.ID] = node
-	m.Events = append(m.Events, &MockNodeTrackerEvent{NodeUpdate: node})
+	delete(m.nodes, nodeID)
 }
 
-func (m *MockNodeTracker) events() []*MockNodeTrackerEvent {
+// Watch marks the node as being watched; this mock throws out the timer in lieu
+// of manully sending on the channel to avoid racy tests.
+func (m *MockDeadlineNotifier) Watch(nodeID string, _ time.Time) {
 	m.Lock()
 	defer m.Unlock()
+	m.nodes[nodeID] = struct{}{}
+}
+
+type MockRaftApplierShim struct {
+	state *state.StateStore
+}
+
+// AllocUpdateDesiredTransition mocks a write to raft as a state store update
+func (m *MockRaftApplierShim) AllocUpdateDesiredTransition(
+	allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) {
+	index, _ := m.state.LatestIndex()
+	index++
+	err := m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, index, allocs, evals)
+	return index, err
+}
+
+// NodesDrainComplete mocks a write to raft as a state store update
+func (m *MockRaftApplierShim) NodesDrainComplete(
+	nodes []string, event *structs.NodeEvent) (uint64, error) {
+	index, _ := m.state.LatestIndex()
+	index++
+
+	updates := make(map[string]*structs.DrainUpdate, len(nodes))
+	nodeEvents := make(map[string]*structs.NodeEvent, len(nodes))
+	update := &structs.DrainUpdate{}
+	for _, node := range nodes {
+		updates[node] = update
+		if event != nil {
+			nodeEvents[node] = event
+		}
+	}
+	now := time.Now().Unix()
+
+	err := m.state.BatchUpdateNodeDrain(structs.MsgTypeTestSetup, index, now,
+		updates, nodeEvents)
+
+	return index, err
+}
+
+func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *NodeDrainer) {
+	t.Helper()
+	store := state.TestStateStore(t)
+	limiter := rate.NewLimiter(100.0, 100)
+	logger := testlog.HCLogger(t)
+
+	drainer := &NodeDrainer{
+		enabled: false,
+		logger:  logger,
+		nodes:   map[string]*drainingNode{},
+		jobWatcher: &MockJobWatcher{
+			drainCh:    make(chan *DrainRequest),
+			migratedCh: make(chan []*structs.Allocation),
+			jobs:       map[structs.NamespacedID]struct{}{},
+		},
+		deadlineNotifier: &MockDeadlineNotifier{
+			expiredCh: make(<-chan []string),
+			nodes:     map[string]struct{}{},
+		},
+		state:        store,
+		queryLimiter: limiter,
+		raft:         &MockRaftApplierShim{state: store},
+		batcher:      allocMigrateBatcher{},
+	}
 
-	return m.Events
+	w := NewNodeDrainWatcher(context.Background(), limiter, store, logger, drainer)
+	drainer.nodeWatcher = w
+	return w, store, drainer
 }
2 changes: 1 addition & 1 deletion nomad/drainer/watch_jobs.go
@@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest {
 // DrainingJobWatcher is the interface for watching a job drain
 type DrainingJobWatcher interface {
 	// RegisterJob is used to start watching a draining job
-	RegisterJobs(job []structs.NamespacedID)
+	RegisterJobs(jobs []structs.NamespacedID)
 
 	// Drain is used to emit allocations that should be drained.
 	Drain() <-chan *DrainRequest
41 changes: 18 additions & 23 deletions nomad/drainer/watch_nodes.go
@@ -33,7 +33,6 @@ func (n *NodeDrainer) Remove(nodeID string) {
 	n.l.Lock()
 	defer n.l.Unlock()
 
-	// TODO test the notifier is updated
 	// Remove it from being tracked and remove it from the dealiner
 	delete(n.nodes, nodeID)
 	n.deadlineNotifier.Remove(nodeID)
@@ -58,7 +57,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 		draining.Update(node)
 	}
 
-	// TODO test the notifier is updated
 	if inf, deadline := node.DrainStrategy.DeadlineTime(); !inf {
 		n.deadlineNotifier.Watch(node.ID, deadline)
 	} else {
@@ -67,7 +65,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 		n.deadlineNotifier.Remove(node.ID)
 	}
 
-	// TODO Test this
 	// Register interest in the draining jobs.
 	jobs, err := draining.DrainingJobs()
 	if err != nil {
@@ -77,8 +74,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 	n.logger.Trace("node has draining jobs on it", "node_id", node.ID, "num_jobs", len(jobs))
 	n.jobWatcher.RegisterJobs(jobs)
 
-	// TODO Test at this layer as well that a node drain on a node without
-	// allocs immediately gets unmarked as draining
 	// Check if the node is done such that if an operator drains a node with
 	// nothing on it we unset drain
 	done, err := draining.IsDone()
@@ -88,8 +83,8 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 	}
 
 	if done {
-		// Node is done draining. Stop remaining system allocs before
-		// marking node as complete.
+		// Node is done draining. Stop remaining system allocs before marking
+		// node as complete.
 		remaining, err := draining.RemainingAllocs()
 		if err != nil {
 			n.logger.Error("error getting remaining allocs on drained node", "node_id", node.ID, "error", err)
@@ -179,21 +174,31 @@ func (w *nodeDrainWatcher) watch() {
 			currentNode, tracked := tracked[nodeID]
 
 			switch {
-			// If the node is tracked but not draining, untrack
+
 			case tracked && !newDraining:
+				// If the node is tracked but not draining, untrack
 				w.tracker.Remove(nodeID)
 
-			// If the node is not being tracked but is draining, track
+			case !tracked && node.UnresponsiveStatus():
+				// Down or Disconnected nodes can't be migrated, so skip
+				// tracking them
+
 			case !tracked && newDraining:
+				// If the node is not being tracked but is draining, track
 				w.tracker.Update(node)
 
-			// If the node is being tracked but has changed, update:
+			case tracked && node.UnresponsiveStatus():
+				// Down or Disconnected nodes can't be migrated, so stop
+				// tracking them. The node update event that changed the status
+				// will trigger all the evaluations we need
+				w.tracker.Remove(nodeID)
+
 			case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy):
+				// If the node is being tracked but has changed, update
 				w.tracker.Update(node)
 
 			default:
 			}
-
-			// TODO(schmichael) handle the case of a lost node
 		}
 
 		for nodeID := range tracked {
@@ -219,7 +224,7 @@ func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node,
 }
 
 // getNodesImpl is used to get nodes from the state store, returning the set of
-// nodes and the given index.
+// nodes and the current node table index.
 func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
 	iter, err := state.Nodes(ws)
 	if err != nil {
@@ -231,7 +236,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
 		return nil, 0, err
 	}
 
-	var maxIndex uint64 = 0
 	resp := make(map[string]*structs.Node, 64)
 	for {
 		raw := iter.Next()
@@ -241,15 +245,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
 
 		node := raw.(*structs.Node)
 		resp[node.ID] = node
-		if maxIndex < node.ModifyIndex {
-			maxIndex = node.ModifyIndex
-		}
-	}
-
-	// Prefer using the actual max index of affected nodes since it means less
-	// unblocking
-	if maxIndex != 0 {
-		index = maxIndex
 	}
 
 	return resp, index, nil