core: enforce strict steps for clients reconnect #15808

Merged
13 commits merged on Jan 25, 2023
43 changes: 40 additions & 3 deletions nomad/node_endpoint.go
@@ -157,11 +157,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return err
}

// Check if the SecretID has been tampered with
if originalNode != nil {
// Check if the SecretID has been tampered with
if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
return fmt.Errorf("node secret ID does not match. Not registering node.")
}

// Don't allow the Register method to update the node status. Only the
// UpdateStatus method should be able to do this.
if originalNode.Status != "" {
args.Node.Status = originalNode.Status
}
}

// We have a valid node connection, so add the mapping to cache the
@@ -486,6 +492,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// Compute next status.
switch node.Status {
case structs.NodeStatusInit:
if args.Status == structs.NodeStatusReady {
allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
if err != nil {
return fmt.Errorf("failed to query node allocs: %v", err)
}

allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
if len(allocs) > 0 && !allocsUpdated {
args.Status = structs.NodeStatusInit
}
}
case structs.NodeStatusDisconnected:
if args.Status == structs.NodeStatusReady {
args.Status = structs.NodeStatusInit
}
}

// Commit this update via Raft
var index uint64
if node.Status != args.Status {
@@ -1179,8 +1205,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
if node == nil {
return fmt.Errorf("node %s not found", nodeID)
}
if node.Status != structs.NodeStatusReady {
return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady)
if node.UnresponsiveStatus() {
return fmt.Errorf("node %s is not allow to update allocs while in status %s", nodeID, node.Status)
}

// Ensure that evals aren't set from client RPCs
@@ -1313,6 +1339,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
return err
}

// Update node alloc update index.
copyNode := node.Copy()
copyNode.LastAllocUpdateIndex = future.Index()

_, _, err = n.srv.raftApply(structs.NodeRegisterRequestType, &structs.NodeRegisterRequest{
Node: copyNode,
})
if err != nil {
return fmt.Errorf("node update failed: %v", err)
}

// Setup the response
reply.Index = future.Index()
return nil
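Taken together, the UpdateStatus hunk above adds a small gate on the status a reconnecting client reports. Below is a minimal, self-contained sketch of that rule for readers who want to trace it in isolation; computeNextStatus, nodeView, and HasAllocs are illustrative stand-ins (the real code queries the state snapshot for non-terminal allocs and operates on the server's node record), not identifiers added by this PR.

// Sketch of the reconnect gating introduced in Node.UpdateStatus.
package main

import "fmt"

const (
	NodeStatusInit         = "initializing"
	NodeStatusReady        = "ready"
	NodeStatusDisconnected = "disconnected"
)

// nodeView is a simplified stand-in for the server-side node record.
type nodeView struct {
	Status                   string
	LastAllocUpdateIndex     uint64
	LastMissedHeartbeatIndex uint64
	HasAllocs                bool // stand-in for "has non-terminal allocs"
}

// computeNextStatus mirrors the switch in Node.UpdateStatus:
//   - disconnected + "ready" heartbeat: forced back to "initializing"
//   - initializing + "ready" heartbeat: stays "initializing" until the node
//     has pushed an alloc update newer than its last missed heartbeat
func computeNextStatus(n nodeView, requested string) string {
	switch n.Status {
	case NodeStatusInit:
		if requested == NodeStatusReady {
			allocsUpdated := n.LastAllocUpdateIndex > n.LastMissedHeartbeatIndex
			if n.HasAllocs && !allocsUpdated {
				return NodeStatusInit
			}
		}
	case NodeStatusDisconnected:
		if requested == NodeStatusReady {
			return NodeStatusInit
		}
	}
	return requested
}

func main() {
	n := nodeView{Status: NodeStatusDisconnected, HasAllocs: true,
		LastAllocUpdateIndex: 10, LastMissedHeartbeatIndex: 20}
	fmt.Println(computeNextStatus(n, NodeStatusReady)) // "initializing"

	n.Status = NodeStatusInit
	n.LastAllocUpdateIndex = 25 // client re-sent alloc statuses after reconnecting
	fmt.Println(computeNextStatus(n, NodeStatusReady)) // "ready"
}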
196 changes: 179 additions & 17 deletions nomad/node_endpoint_test.go
@@ -23,6 +23,7 @@ import (
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -524,6 +525,171 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
ci.Parallel(t)

// Setup server with a tighter heartbeat so we don't have to wait so long
// for nodes to go down.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
s, cleanupS := TestServer(t, func(c *Config) {
c.MinHeartbeatTTL = heartbeatTTL
c.HeartbeatGrace = 2 * heartbeatTTL
})
codec := rpcClient(t, s)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)

// Register node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeUpdateResp structs.NodeUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp)
must.NoError(t, err)

// Start heartbeat.
stopHeartbeat := make(chan interface{})
heartbeat := func() {
ticker := time.NewTicker(heartbeatTTL / 2)
for {
select {
case <-stopHeartbeat:
ticker.Stop()
return
case <-ticker.C:
hb := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}
err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", hb, &nodeUpdateResp)
must.NoError(t, err)
}
}
}
go heartbeat()

// Wait for node to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)

// Register job with max_client_disconnect.
job := mock.Job()
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = 1
job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
job.TaskGroups[0].Constraints = []*structs.Constraint{}
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10m",
}

jobReq := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var jobResp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)

// Wait for the alloc to be pending in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
})

// Get allocs that node should run.
allocsReq := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var allocsResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)

// Tell server the alloc is running.
// Save the alloc so we can reuse the request later.
alloc := allocsResp.Allocs[0].Copy()
alloc.ClientStatus = structs.AllocClientStatusRunning

allocUpdateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)

// Wait for the alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})

// Stop heartbeat and wait for the client to be disconnected and the alloc
// to be unknown.
close(stopHeartbeat)
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusUnknown: 1,
})

// There should be a pending eval for the alloc replacement.
state := s.fsm.State()
ws := memdb.NewWatchSet()
evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
must.NoError(t, err)
found := false
for _, eval := range evals {
if eval.Status == structs.EvalStatusPending {
found = true
break
}
}
must.True(t, found)

// Restart heartbeat to reconnect node.
stopHeartbeat = make(chan interface{})
go heartbeat()

// Wait for node to be initializing.
// It must remain initializing until it updates its allocs with the server
// so the scheduler has the necessary information to avoid unnecessary
// placements by the pending eval.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)

// Get allocs that node should run.
// The node should only have one alloc assigned until it updates its allocs
// status with the server.
allocsReq = &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)

// Tell server the alloc is running.
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)

// Wait for the alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})

// Wait for the client to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
@@ -639,29 +805,25 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
}

// Transition it to down and then ready
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

node.Status = structs.NodeStatusReady
reg = &structs.NodeRegisterRequest{
Node: node,
req = &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}

@@ -1369,12 +1531,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))

// Mark the node as down
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))

// Ensure that the allocation has transitioned to lost
testutil.WaitForResult(func() (bool, error) {
@@ -2581,7 +2743,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
}
var allocUpdateResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not ready")
require.ErrorContains(t, err, "not allow to update allocs")

// Send request without an explicit node ID.
updatedAlloc.NodeID = ""
20 changes: 20 additions & 0 deletions nomad/state/state_store.go
@@ -909,6 +909,11 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
node.CreateIndex = exist.CreateIndex
node.ModifyIndex = index

// Update last missed heartbeat if the node became unresponsive.
if !exist.UnresponsiveStatus() && node.UnresponsiveStatus() {
node.LastMissedHeartbeatIndex = index
}

// Retain node events that have already been set on the node
node.Events = exist.Events

@@ -923,6 +928,16 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
node.LastDrain = exist.LastDrain // Retain the drain metadata

// Retain the last index the node missed a heartbeat.
if node.LastMissedHeartbeatIndex < exist.LastMissedHeartbeatIndex {
node.LastMissedHeartbeatIndex = exist.LastMissedHeartbeatIndex
}

// Retain the last index the node updated its allocs.
if node.LastAllocUpdateIndex < exist.LastAllocUpdateIndex {
node.LastAllocUpdateIndex = exist.LastAllocUpdateIndex
}
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
@@ -1029,6 +1044,11 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
copyNode.Status = status
copyNode.ModifyIndex = txn.Index

// Update last missed heartbeat if the node became unresponsive.
if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() {
copyNode.LastMissedHeartbeatIndex = txn.Index
}

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
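The two state-store hunks above apply the same pair of bookkeeping rules whenever a node record is rewritten: stamp LastMissedHeartbeatIndex only on the transition into an unresponsive status (edge-triggered), and never let a re-registration roll either new index backwards. A minimal sketch of those rules with simplified, illustrative types; node, mergeOnUpsert, and the literal status strings are stand-ins, not code from the PR.

// Sketch of the index-retention rules applied by upsertNodeTxn and
// updateNodeStatusTxn, reduced to a pure function.
package main

import "fmt"

type node struct {
	Status                   string
	LastMissedHeartbeatIndex uint64
	LastAllocUpdateIndex     uint64
}

func unresponsive(status string) bool {
	return status == "down" || status == "disconnected"
}

func mergeOnUpsert(existing, incoming node, raftIndex uint64) node {
	// Rule 1: edge-triggered stamp of the missed-heartbeat index.
	if !unresponsive(existing.Status) && unresponsive(incoming.Status) {
		incoming.LastMissedHeartbeatIndex = raftIndex
	}
	// Rule 2: retain the larger of the stored and incoming indexes.
	if incoming.LastMissedHeartbeatIndex < existing.LastMissedHeartbeatIndex {
		incoming.LastMissedHeartbeatIndex = existing.LastMissedHeartbeatIndex
	}
	if incoming.LastAllocUpdateIndex < existing.LastAllocUpdateIndex {
		incoming.LastAllocUpdateIndex = existing.LastAllocUpdateIndex
	}
	return incoming
}

func main() {
	existing := node{Status: "ready", LastAllocUpdateIndex: 40}
	incoming := node{Status: "disconnected"} // re-register arrives with zeroed indexes
	merged := mergeOnUpsert(existing, incoming, 50)
	fmt.Printf("%+v\n", merged) // missed-heartbeat index stamped at 50, alloc index kept at 40
}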
19 changes: 19 additions & 0 deletions nomad/structs/structs.go
@@ -2090,6 +2090,14 @@ type Node struct {
// LastDrain contains metadata about the most recent drain operation
LastDrain *DrainMetadata

// LastMissedHeartbeatIndex stores the Raft index when the node
// last missed a heartbeat.
LastMissedHeartbeatIndex uint64

lgfa29 (Contributor, Author), Jan 19, 2023: I'm not sure how accurate this name is. It's more like the last time the node transitioned to an unresponsive status, but I couldn't think of a good name for this 😅

tgross (Member): Yeah, this name really implies to me that it's going to keep getting updated, whereas it's the index at which the node became unresponsive. It might be nice if we could clear the value once we're certain the node is live again, and that'd make things a little less confusing at the cost of a little extra logic to check for 0 in the UpdateStatus RPC.

lgfa29 (Contributor, Author): Ah, good point. Resetting it to zero may help to indicate that this field is sort of edge-triggered. I pushed a commit to do just that. Thanks!

// LastAllocUpdateIndex stores the Raft index of the last time the node
// updated its allocations status.
LastAllocUpdateIndex uint64

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
@@ -2184,6 +2192,17 @@ func (n *Node) Copy() *Node {
return &nn
}

// UnresponsiveStatus returns true if the node is in a status where it is not
// communicating with the server.
func (n *Node) UnresponsiveStatus() bool {
switch n.Status {
case NodeStatusDown, NodeStatusDisconnected:
return true
default:
return false
}
}

// TerminalStatus returns if the current status is terminal and
// will no longer transition.
func (n *Node) TerminalStatus() bool {
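The review thread on LastMissedHeartbeatIndex in the struct definition above ends with a follow-up commit that clears the field once the node is confirmed live again; that commit is not part of the hunks shown here, so the sketch below only illustrates, under that assumption, how such a reset reads: a non-zero value marks the index at which the node went unresponsive, and zero means it has been healthy since it last became ready.

// Illustrative helper, not code from the PR as shown above.
package main

import "fmt"

// maybeClearMissedHeartbeat clears the edge-triggered missed-heartbeat
// marker once the node's status settles back to "ready".
func maybeClearMissedHeartbeat(status string, lastMissedHeartbeatIndex uint64) uint64 {
	if status == "ready" {
		return 0
	}
	return lastMissedHeartbeatIndex
}

func main() {
	fmt.Println(maybeClearMissedHeartbeat("ready", 123))        // 0: node is healthy again
	fmt.Println(maybeClearMissedHeartbeat("disconnected", 123)) // 123: marker retained
}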