core: enforce strict steps for clients reconnect #15808

Merged

merged 13 commits on Jan 25, 2023
3 changes: 3 additions & 0 deletions .changelog/15808.txt
@@ -0,0 +1,3 @@
```release-note:bug
core: enforce strict steps for clients reconnect
```
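The fix hinges on a stricter status state machine for reconnecting clients: a `disconnected` node that heartbeats again is first demoted to `initializing`, and it may only report `ready` once it has pushed an alloc update newer than its last missed heartbeat. A minimal standalone sketch of that transition logic, distilled from the `Node.UpdateStatus` change in this PR (the `node` struct and constants here are illustrative stand-ins for `structs.Node` and the `structs.NodeStatus*` constants):

```go
package main

import "fmt"

// Stand-ins for the structs.NodeStatus* constants.
const (
	statusInit         = "initializing"
	statusReady        = "ready"
	statusDisconnected = "disconnected"
)

// node carries the two Raft indexes this PR adds to structs.Node.
type node struct {
	status                   string
	lastAllocUpdateIndex     uint64
	lastMissedHeartbeatIndex uint64
	hasAllocs                bool
}

// nextStatus mirrors the "compute next status" switch in Node.UpdateStatus:
// a disconnected node is forced through initializing, and an initializing
// node with allocations stays there until its allocs were updated after the
// last missed heartbeat.
func nextStatus(n node, requested string) string {
	switch n.status {
	case statusDisconnected:
		if requested == statusReady {
			return statusInit
		}
	case statusInit:
		if requested == statusReady {
			allocsUpdated := n.lastAllocUpdateIndex > n.lastMissedHeartbeatIndex
			if n.hasAllocs && !allocsUpdated {
				return statusInit
			}
		}
	}
	return requested
}

func main() {
	n := node{status: statusDisconnected, hasAllocs: true,
		lastAllocUpdateIndex: 10, lastMissedHeartbeatIndex: 20}
	fmt.Println(nextStatus(n, statusReady)) // "initializing": must step through init

	n.status = statusInit
	fmt.Println(nextStatus(n, statusReady)) // "initializing": allocs not resynced yet

	n.lastAllocUpdateIndex = 25
	fmt.Println(nextStatus(n, statusReady)) // "ready"
}
```

Forcing the intermediate `initializing` step gives the scheduler a window in which the node holds no trusted alloc state, so the pending replacement eval does not race against a stale view of the client.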
43 changes: 40 additions & 3 deletions nomad/node_endpoint.go
@@ -157,11 +157,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return err
}

// Check if the SecretID has been tampered with
if originalNode != nil {
// Check if the SecretID has been tampered with
if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
return fmt.Errorf("node secret ID does not match. Not registering node.")
}

// Don't allow the Register method to update the node status. Only the
// UpdateStatus method should be able to do this.
if originalNode.Status != "" {
args.Node.Status = originalNode.Status
}
}

// We have a valid node connection, so add the mapping to cache the
@@ -486,6 +492,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// Compute next status.
switch node.Status {
case structs.NodeStatusInit:
if args.Status == structs.NodeStatusReady {
allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
if err != nil {
return fmt.Errorf("failed to query node allocs: %v", err)
}

allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
if len(allocs) > 0 && !allocsUpdated {
args.Status = structs.NodeStatusInit
}
}
case structs.NodeStatusDisconnected:
if args.Status == structs.NodeStatusReady {
args.Status = structs.NodeStatusInit
}
}

// Commit this update via Raft
var index uint64
if node.Status != args.Status {
Expand Down Expand Up @@ -1179,8 +1205,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
if node == nil {
return fmt.Errorf("node %s not found", nodeID)
}
if node.Status != structs.NodeStatusReady {
return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady)
if node.UnresponsiveStatus() {
return fmt.Errorf("node %s is not allow to update allocs while in status %s", nodeID, node.Status)
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
}

// Ensure that evals aren't set from client RPCs
@@ -1313,6 +1339,17 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
return err
}

// Update node alloc update index.
copyNode := node.Copy()
copyNode.LastAllocUpdateIndex = future.Index()

_, _, err = n.srv.raftApply(structs.NodeRegisterRequestType, &structs.NodeRegisterRequest{
Node: copyNode,
})
if err != nil {
return fmt.Errorf("node update failed: %v", err)
}

// Setup the response
reply.Index = future.Index()
return nil
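The guard above relies on `node.UnresponsiveStatus()`, a helper this PR adds to `structs.Node` but which does not appear in this diff. A plausible sketch, assuming it reports the statuses in which a client cannot be trusted to update allocs (`down` and `disconnected`):

```go
// Sketch of the helper referenced by the UpdateAlloc guard. The constant
// values match Nomad's structs package; the method body is an assumption
// inferred from how the guard uses it, since the real definition is not
// part of this diff.
package structs

const (
	NodeStatusDown         = "down"
	NodeStatusDisconnected = "disconnected"
)

type Node struct {
	Status string
}

// UnresponsiveStatus returns true if the node is in a status where it
// cannot respond to requests or report alloc state.
func (n *Node) UnresponsiveStatus() bool {
	switch n.Status {
	case NodeStatusDown, NodeStatusDisconnected:
		return true
	default:
		return false
	}
}
```

Note the guard is stricter than the old `Status != ready` check in one direction and looser in another: an `initializing` node may now push alloc updates, which is exactly what the reconnect flow needs.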
206 changes: 189 additions & 17 deletions nomad/node_endpoint_test.go
@@ -23,6 +23,7 @@ import (
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -524,6 +525,181 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
ci.Parallel(t)

// Setup server with tighter heartbeat so we don't have to wait so long
// for nodes to go down.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
s, cleanupS := TestServer(t, func(c *Config) {
c.MinHeartbeatTTL = heartbeatTTL
c.HeartbeatGrace = 2 * heartbeatTTL
})
codec := rpcClient(t, s)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)

// Register node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeUpdateResp structs.NodeUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp)
must.NoError(t, err)

// Start heartbeat.
stopHeartbeat := make(chan interface{})
heartbeat := func() {
ticker := time.NewTicker(heartbeatTTL / 2)
for {
select {
case <-stopHeartbeat:
ticker.Stop()
return
case <-ticker.C:
if t.Failed() {
return
}

req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeUpdateResponse
// Ignore errors since an unexpected failed heartbeat will cause
// the test conditions to fail.
msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)
}
}
}
go heartbeat()

// Wait for node to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)

// Register job with max_client_disconnect.
job := mock.Job()
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = 1
job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
job.TaskGroups[0].Constraints = []*structs.Constraint{}
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10m",
}

jobReq := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var jobResp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)

// Wait for the alloc to be pending in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
})

// Get allocs that node should run.
allocsReq := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var allocsResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)

// Tell server the alloc is running.
// Save the alloc so we can reuse the request later.
alloc := allocsResp.Allocs[0].Copy()
alloc.ClientStatus = structs.AllocClientStatusRunning

allocUpdateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)

// Wait for the alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})

// Stop heartbeat and wait for the client to be disconnected and the alloc
// to be unknown.
close(stopHeartbeat)
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusUnknown: 1,
})

// There should be a pending eval for the alloc replacement.
state := s.fsm.State()
ws := memdb.NewWatchSet()
evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
must.NoError(t, err)
found := false
for _, eval := range evals {
if eval.Status == structs.EvalStatusPending {
found = true
break
}
}
must.True(t, found)

// Restart heartbeat to reconnect node.
stopHeartbeat = make(chan interface{})
go heartbeat()

// Wait for node to be initializing.
// It must remain initializing until it updates its allocs with the server
// so the scheduler has the necessary information to avoid unnecessary
// placements by the pending eval.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)

// Get allocs that node should run.
// The node should only have one alloc assigned until it updates its allocs
// status with the server.
allocsReq = &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)

// Tell server the alloc is running.
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)

// Wait for the alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})

// Wait for the client to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)

// Cleanup heartbeat goroutine before exiting.
close(stopHeartbeat)
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
@@ -639,29 +815,25 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
}

// Transition it to down and then ready
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

node.Status = structs.NodeStatusReady
reg = &structs.NodeRegisterRequest{
Node: node,
req = &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}

@@ -1369,12 +1541,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))

// Mark the node as down
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))

// Ensure that the allocation has transitioned to lost
testutil.WaitForResult(func() (bool, error) {
@@ -2581,7 +2753,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
}
var allocUpdateResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not ready")
require.ErrorContains(t, err, "not allowed to update allocs")

// Send request without an explicit node ID.
updatedAlloc.NodeID = ""
20 changes: 20 additions & 0 deletions nomad/state/state_store.go
@@ -909,6 +909,11 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
node.CreateIndex = exist.CreateIndex
node.ModifyIndex = index

// Update last missed heartbeat if the node became unresponsive.
if !exist.UnresponsiveStatus() && node.UnresponsiveStatus() {
node.LastMissedHeartbeatIndex = index
}

// Retain node events that have already been set on the node
node.Events = exist.Events

@@ -923,6 +928,16 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility
node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy
node.LastDrain = exist.LastDrain // Retain the drain metadata

// Retain the last index the node missed a heartbeat.
if node.LastMissedHeartbeatIndex < exist.LastMissedHeartbeatIndex {
node.LastMissedHeartbeatIndex = exist.LastMissedHeartbeatIndex
}

// Retain the last index the node updated its allocs.
if node.LastAllocUpdateIndex < exist.LastAllocUpdateIndex {
node.LastAllocUpdateIndex = exist.LastAllocUpdateIndex
}
} else {
// Because this is the first time the node is being registered, we should
// also create a node registration event
@@ -1029,6 +1044,11 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
copyNode.Status = status
copyNode.ModifyIndex = txn.Index

// Update last missed heartbeat if the node became unresponsive.
if !existingNode.UnresponsiveStatus() && copyNode.UnresponsiveStatus() {
copyNode.LastMissedHeartbeatIndex = txn.Index
}

// Insert the node
if err := txn.Insert("nodes", copyNode); err != nil {
return fmt.Errorf("node update failed: %v", err)
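A subtlety in `upsertNodeTxn` above: `Node.UpdateAlloc` re-registers the node from a `node.Copy()` snapshot, so the incoming record can carry stale index values. The retention checks make both indexes monotonic. A minimal sketch of that rule in isolation (standalone struct; field names match the diff):

```go
package main

import "fmt"

// nodeIndexes holds the two bookkeeping indexes this PR introduces.
type nodeIndexes struct {
	LastMissedHeartbeatIndex uint64
	LastAllocUpdateIndex     uint64
}

// retain mirrors the retention logic in upsertNodeTxn: an incoming node
// record may raise either index but never lower it, so a stale or zeroed
// re-register cannot erase the server's bookkeeping.
func retain(incoming, existing nodeIndexes) nodeIndexes {
	if incoming.LastMissedHeartbeatIndex < existing.LastMissedHeartbeatIndex {
		incoming.LastMissedHeartbeatIndex = existing.LastMissedHeartbeatIndex
	}
	if incoming.LastAllocUpdateIndex < existing.LastAllocUpdateIndex {
		incoming.LastAllocUpdateIndex = existing.LastAllocUpdateIndex
	}
	return incoming
}

func main() {
	existing := nodeIndexes{LastMissedHeartbeatIndex: 20, LastAllocUpdateIndex: 25}
	stale := nodeIndexes{} // e.g. a register request built before either index was set
	fmt.Printf("%+v\n", retain(stale, existing)) // both indexes kept from existing
}
```

Without this, a register request carrying an older snapshot could move an index backwards and let a reconnecting node skip the alloc-resync step.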