Backport of drainer: test refactoring to clarify behavior around delete/down nodes into release/1.5.x (#16622)

This pull request was automerged via backport-assistant
hc-github-team-nomad-core authored Mar 23, 2023
1 parent b11eeb4 commit e865333
Showing 4 changed files with 346 additions and 171 deletions.
142 changes: 117 additions & 25 deletions nomad/drainer/drain_testing.go
@@ -1,52 +1,144 @@
 package drainer
 
 import (
+	"context"
 	"sync"
+	"testing"
+	"time"
 
+	"golang.org/x/time/rate"
+
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-type MockNodeTrackerEvent struct {
-	NodeUpdate *structs.Node
-	NodeRemove string
-}
-
-type MockNodeTracker struct {
-	Nodes  map[string]*structs.Node
-	Events []*MockNodeTrackerEvent
-	sync.Mutex
-}
-
-func NewMockNodeTracker() *MockNodeTracker {
-	return &MockNodeTracker{
-		Nodes:  make(map[string]*structs.Node),
-		Events: make([]*MockNodeTrackerEvent, 0, 16),
-	}
-}
-
-func (m *MockNodeTracker) TrackedNodes() map[string]*structs.Node {
-	m.Lock()
-	defer m.Unlock()
-	return m.Nodes
-}
-
-func (m *MockNodeTracker) Remove(nodeID string) {
-	m.Lock()
-	defer m.Unlock()
-	delete(m.Nodes, nodeID)
-	m.Events = append(m.Events, &MockNodeTrackerEvent{NodeRemove: nodeID})
-}
-
-func (m *MockNodeTracker) Update(node *structs.Node) {
-	m.Lock()
-	defer m.Unlock()
-	m.Nodes[node.ID] = node
-	m.Events = append(m.Events, &MockNodeTrackerEvent{NodeUpdate: node})
-}
-
-func (m *MockNodeTracker) events() []*MockNodeTrackerEvent {
-	m.Lock()
-	defer m.Unlock()
-
-	return m.Events
-}
+// This file contains helpers for testing. The raft shims make it hard to test
+// the whole package behavior of the drainer. See also nomad/drainer_int_test.go
+// for integration tests.
+
+type MockJobWatcher struct {
+	drainCh    chan *DrainRequest
+	migratedCh chan []*structs.Allocation
+	jobs       map[structs.NamespacedID]struct{}
+	sync.Mutex
+}
+
+// RegisterJobs marks the jobs as being watched.
+func (m *MockJobWatcher) RegisterJobs(jobs []structs.NamespacedID) {
+	m.Lock()
+	defer m.Unlock()
+	for _, job := range jobs {
+		m.jobs[job] = struct{}{}
+	}
+}
+
+// Drain returns the DrainRequest channel. Tests can send on this channel to
+// simulate steps through the NodeDrainer watch loop. (Sending on this channel
+// will block anywhere else.)
+func (m *MockJobWatcher) Drain() <-chan *DrainRequest {
+	return m.drainCh
+}
+
+// Migrated returns the channel of migrated allocations. Tests can send on this
+// channel to simulate steps through the NodeDrainer watch loop. (Sending on
+// this channel will block anywhere else.)
+func (m *MockJobWatcher) Migrated() <-chan []*structs.Allocation {
+	return m.migratedCh
+}
+
+type MockDeadlineNotifier struct {
+	expiredCh <-chan []string
+	nodes     map[string]struct{}
+	sync.Mutex
+}
+
+// NextBatch returns the channel of expired nodes. Tests can send on this
+// channel to simulate timer events in the NodeDrainer watch loop. (Sending on
+// this channel will block anywhere else.)
+func (m *MockDeadlineNotifier) NextBatch() <-chan []string {
+	return m.expiredCh
+}
+
+// Remove removes the given node from being tracked for a deadline.
+func (m *MockDeadlineNotifier) Remove(nodeID string) {
+	m.Lock()
+	defer m.Unlock()
+	delete(m.nodes, nodeID)
+}
+
+// Watch marks the node as being watched; this mock throws out the timer in
+// lieu of manually sending on the channel to avoid racy tests.
+func (m *MockDeadlineNotifier) Watch(nodeID string, _ time.Time) {
+	m.Lock()
+	defer m.Unlock()
+	m.nodes[nodeID] = struct{}{}
+}
+
+type MockRaftApplierShim struct {
+	lock  sync.Mutex
+	state *state.StateStore
+}
+
+// AllocUpdateDesiredTransition mocks a write to raft as a state store update.
+func (m *MockRaftApplierShim) AllocUpdateDesiredTransition(
+	allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error) {
+
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	index, _ := m.state.LatestIndex()
+	index++
+	err := m.state.UpdateAllocsDesiredTransitions(structs.MsgTypeTestSetup, index, allocs, evals)
+	return index, err
+}
+
+// NodesDrainComplete mocks a write to raft as a state store update.
+func (m *MockRaftApplierShim) NodesDrainComplete(
+	nodes []string, event *structs.NodeEvent) (uint64, error) {
+
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	index, _ := m.state.LatestIndex()
+	index++
+
+	updates := make(map[string]*structs.DrainUpdate, len(nodes))
+	nodeEvents := make(map[string]*structs.NodeEvent, len(nodes))
+	update := &structs.DrainUpdate{}
+	for _, node := range nodes {
+		updates[node] = update
+		if event != nil {
+			nodeEvents[node] = event
+		}
+	}
+	now := time.Now().Unix()
+
+	err := m.state.BatchUpdateNodeDrain(structs.MsgTypeTestSetup, index, now,
+		updates, nodeEvents)
+
+	return index, err
+}
+
+func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *NodeDrainer) {
+	t.Helper()
+	store := state.TestStateStore(t)
+	limiter := rate.NewLimiter(100.0, 100)
+	logger := testlog.HCLogger(t)
+
+	drainer := &NodeDrainer{
+		enabled:          false,
+		logger:           logger,
+		nodes:            map[string]*drainingNode{},
+		jobWatcher:       &MockJobWatcher{jobs: map[structs.NamespacedID]struct{}{}},
+		deadlineNotifier: &MockDeadlineNotifier{nodes: map[string]struct{}{}},
+		state:            store,
+		queryLimiter:     limiter,
+		raft:             &MockRaftApplierShim{state: store},
+		batcher:          allocMigrateBatcher{},
+	}
+
+	w := NewNodeDrainWatcher(context.Background(), limiter, store, logger, drainer)
+	drainer.nodeWatcher = w
+	return w, store, drainer
+}
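
These mocks let a test step the NodeDrainer watch loop by hand instead of waiting on timers or a real job watcher. A minimal in-package sketch of that pattern (illustrative only, not part of this commit; the test name and the buffered channels are assumptions):

package drainer

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
)

// Sketch only: drives MockJobWatcher's channels directly to show the
// plumbing the NodeDrainer watch loop consumes. Buffering the channels
// is a test-local choice so the sends below don't block.
func TestMockJobWatcher_channels_sketch(t *testing.T) {
	watcher := &MockJobWatcher{
		drainCh:    make(chan *DrainRequest, 1),
		migratedCh: make(chan []*structs.Allocation, 1),
		jobs:       map[structs.NamespacedID]struct{}{},
	}

	// The test, not a job watcher goroutine, decides when a migration
	// "happens" by sending a batch of allocations.
	watcher.migratedCh <- []*structs.Allocation{}

	select {
	case allocs := <-watcher.Migrated():
		if len(allocs) != 0 {
			t.Fatalf("expected an empty batch, got %d allocs", len(allocs))
		}
	default:
		t.Fatal("expected a buffered migrated batch")
	}
}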
2 changes: 1 addition & 1 deletion nomad/drainer/watch_jobs.go
@@ -29,7 +29,7 @@ func NewDrainRequest(allocs []*structs.Allocation) *DrainRequest {
 // DrainingJobWatcher is the interface for watching a job drain
 type DrainingJobWatcher interface {
 	// RegisterJob is used to start watching a draining job
-	RegisterJobs(job []structs.NamespacedID)
+	RegisterJobs(jobs []structs.NamespacedID)
 
 	// Drain is used to emit allocations that should be drained.
 	Drain() <-chan *DrainRequest
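
Since MockJobWatcher in drain_testing.go must satisfy this interface, a test file could pin the contract at compile time. A one-line sketch (an illustration; this commit does not add such an assertion):

// In package drainer: fails to compile if the mock drifts from the
// DrainingJobWatcher interface (for example, the RegisterJobs signature).
var _ DrainingJobWatcher = (*MockJobWatcher)(nil)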
36 changes: 13 additions & 23 deletions nomad/drainer/watch_nodes.go
@@ -33,7 +33,6 @@ func (n *NodeDrainer) Remove(nodeID string) {
 	n.l.Lock()
 	defer n.l.Unlock()
 
-	// TODO test the notifier is updated
 	// Remove it from being tracked and remove it from the deadliner
 	delete(n.nodes, nodeID)
 	n.deadlineNotifier.Remove(nodeID)
@@ -58,7 +57,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 		draining.Update(node)
 	}
 
-	// TODO test the notifier is updated
 	if inf, deadline := node.DrainStrategy.DeadlineTime(); !inf {
 		n.deadlineNotifier.Watch(node.ID, deadline)
 	} else {
@@ -67,7 +65,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 		n.deadlineNotifier.Remove(node.ID)
 	}
 
-	// TODO Test this
 	// Register interest in the draining jobs.
 	jobs, err := draining.DrainingJobs()
 	if err != nil {
@@ -77,8 +74,6 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 	n.logger.Trace("node has draining jobs on it", "node_id", node.ID, "num_jobs", len(jobs))
 	n.jobWatcher.RegisterJobs(jobs)
 
-	// TODO Test at this layer as well that a node drain on a node without
-	// allocs immediately gets unmarked as draining
 	// Check if the node is done such that if an operator drains a node with
 	// nothing on it we unset drain
 	done, err := draining.IsDone()
@@ -88,8 +83,8 @@ func (n *NodeDrainer) Update(node *structs.Node) {
 	}
 
 	if done {
-		// Node is done draining. Stop remaining system allocs before
-		// marking node as complete.
+		// Node is done draining. Stop remaining system allocs before marking
+		// node as complete.
 		remaining, err := draining.RemainingAllocs()
 		if err != nil {
 			n.logger.Error("error getting remaining allocs on drained node", "node_id", node.ID, "error", err)
@@ -179,21 +174,26 @@ func (w *nodeDrainWatcher) watch() {
 		currentNode, tracked := tracked[nodeID]
 
 		switch {
-		// If the node is tracked but not draining, untrack
+
 		case tracked && !newDraining:
+			// If the node is tracked but not draining, untrack
 			w.tracker.Remove(nodeID)
 
-		// If the node is not being tracked but is draining, track
 		case !tracked && newDraining:
+			// If the node is not being tracked but is draining, track
 			w.tracker.Update(node)
 
-		// If the node is being tracked but has changed, update:
 		case tracked && newDraining && !currentNode.DrainStrategy.Equal(node.DrainStrategy):
+			// If the node is being tracked but has changed, update
 			w.tracker.Update(node)
-		}
 
-		// TODO(schmichael) handle the case of a lost node
+		default:
+			// note that down/disconnected nodes are handled the same as any
+			// other node here, because we don't want to stop draining a
+			// node that might heartbeat again. The job watcher will let us
+			// know if we can stop watching the node when all the allocs are
+			// evicted
+		}
 	}
 
 	for nodeID := range tracked {
@@ -219,7 +219,7 @@ func (w *nodeDrainWatcher) getNodes(minIndex uint64) (map[string]*structs.Node, uint64, error) {
 }
 
 // getNodesImpl is used to get nodes from the state store, returning the set of
-// nodes and the given index.
+// nodes and the current node table index.
 func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
 	iter, err := state.Nodes(ws)
 	if err != nil {
@@ -231,7 +231,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
 		return nil, 0, err
 	}
 
-	var maxIndex uint64 = 0
 	resp := make(map[string]*structs.Node, 64)
 	for {
 		raw := iter.Next()
@@ -241,15 +240,6 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
 
 		node := raw.(*structs.Node)
 		resp[node.ID] = node
-		if maxIndex < node.ModifyIndex {
-			maxIndex = node.ModifyIndex
-		}
-	}
-
-	// Prefer using the actual max index of affected nodes since it means less
-	// unblocking
-	if maxIndex != 0 {
-		index = maxIndex
 	}
 
 	return resp, index, nil
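
The new default case is the behavior around down nodes that this backport's tests pin down: a draining node that goes down or disconnects stays tracked. A rough sketch of such a test (assumptions: in-package, reusing testNodeDrainWatcher from drain_testing.go above, plus hashicorp/nomad's nomad/mock and testutil packages and the shoenig/test/must assertion library; the test name is invented):

package drainer

import (
	"testing"
	"time"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/shoenig/test/must"
)

// Sketch only: a draining node that goes down must remain tracked.
func TestNodeDrainWatcher_downNode_sketch(t *testing.T) {
	_, store, drainer := testNodeDrainWatcher(t)

	// Create a draining node and write it to the state store.
	node := mock.Node()
	node.DrainStrategy = &structs.DrainStrategy{
		DrainSpec: structs.DrainSpec{Deadline: time.Hour},
	}
	index, _ := store.LatestIndex()
	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, node))

	// Wait for the watch loop to pick the node up.
	testutil.WaitForResult(func() (bool, error) {
		_, ok := drainer.TrackedNodes()[node.ID]
		return ok, nil
	}, func(err error) { t.Fatalf("node was never tracked: %v", err) })

	// Mark the node down; none of the switch cases above untrack it.
	down := node.Copy()
	down.Status = structs.NodeStatusDown
	index, _ = store.LatestIndex()
	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index+1, down))

	must.MapContainsKey(t, drainer.TrackedNodes(), node.ID)
}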