Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: enforce strict steps for clients reconnect #15808

Merged
merged 13 commits into from
Jan 25, 2023
3 changes: 3 additions & 0 deletions .changelog/15808.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: enforce strict ordering that node status updates are recorded after allocation updates for reconnecting clients
```
1 change: 1 addition & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
clientAlloc.ClientStatus = structs.AllocClientStatusComplete
update2 := &structs.Allocation{
ID: alloc2.ID,
NodeID: alloc2.NodeID,
ClientStatus: structs.AllocClientStatusRunning,
}

Expand Down
58 changes: 52 additions & 6 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
return err
}

// Check if the SecretID has been tampered with
if originalNode != nil {
// Check if the SecretID has been tampered with
if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
return fmt.Errorf("node secret ID does not match. Not registering node.")
}

// Don't allow the Register method to update the node status. Only the
// UpdateStatus method should be able to do this.
if originalNode.Status != "" {
args.Node.Status = originalNode.Status
}
}

// We have a valid node connection, so add the mapping to cache the
Expand Down Expand Up @@ -433,7 +439,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
return nil
}

// UpdateStatus is used to update the status of a client node
// UpdateStatus is used to update the status of a client node.
//
// Clients with non-terminal allocations must first call UpdateAlloc to be able
// to transition from the initializing status to ready.
//
// ┌────────────────────────────────────── No ───┐
// │ │
// ┌──▼───┐ ┌─────────────┐ ┌────────┴────────┐
// ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │
// └──▲───┘ └─────┬───────┘ └────────┬────────┘
// │ │ │
// ready └─ No ─┐ ┌─────── Yes ──┘
// │ │ │
// ┌──────┴───────┐ ┌──▼──▼─┐ ┌──────┐
// │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │
// └──────────────┘ └───▲───┘ └──┬───┘
// │ │
// └──── ready ─────┘
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error {
isForwarded := args.IsForwarded()
if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done {
Expand Down Expand Up @@ -486,6 +509,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Update the timestamp of when the node status was updated
args.UpdatedAt = time.Now().Unix()

// Compute next status.
switch node.Status {
case structs.NodeStatusInit:
if args.Status == structs.NodeStatusReady {
allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
if err != nil {
return fmt.Errorf("failed to query node allocs: %v", err)
}

allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
if len(allocs) > 0 && !allocsUpdated {
args.Status = structs.NodeStatusInit
}
}
case structs.NodeStatusDisconnected:
if args.Status == structs.NodeStatusReady {
args.Status = structs.NodeStatusInit
}
}

// Commit this update via Raft
var index uint64
if node.Status != args.Status {
Expand Down Expand Up @@ -1146,8 +1189,11 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
// UpdateAlloc is used to update the client status of an allocation. It should
// only be called by clients.
//
// Clients must first register and heartbeat successfully before they are able
// to call this method.
// Calling this method returns an error when:
// - The node is not registered in the server yet. Clients must first call the
// Register method.
// - The node status is down or disconnected. Clients must call the
// UpdateStatus method to update its status in the server.
func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error {
// Ensure the connection was initiated by another client if TLS is used.
err := validateTLSCertificateLevel(n.srv, n.ctx, tlsCertificateLevelClient)
Expand Down Expand Up @@ -1179,8 +1225,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
if node == nil {
return fmt.Errorf("node %s not found", nodeID)
}
if node.Status != structs.NodeStatusReady {
return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady)
if node.UnresponsiveStatus() {
return fmt.Errorf("node %s is not allowed to update allocs while in status %s", nodeID, node.Status)
}

// Ensure that evals aren't set from client RPCs
Expand Down
219 changes: 202 additions & 17 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"errors"
"fmt"
"net"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -524,6 +526,193 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
ci.Parallel(t)

// Setup server with tighter heartbeat so we don't have to wait so long
// for nodes to go down.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
s, cleanupS := TestServer(t, func(c *Config) {
c.MinHeartbeatTTL = heartbeatTTL
c.HeartbeatGrace = 2 * heartbeatTTL
})
codec := rpcClient(t, s)
defer cleanupS()
testutil.WaitForLeader(t, s.RPC)

// Register node.
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeUpdateResp structs.NodeUpdateResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp)
must.NoError(t, err)

// Start heartbeat.
heartbeat := func(ctx context.Context) {
ticker := time.NewTicker(heartbeatTTL / 2)
tgross marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if t.Failed() {
return
}

req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeUpdateResponse
// Ignore errors since an unexpected failed heartbeat will cause
// the test conditions to fail.
msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)
}
}
}
heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background())
defer cancelHeartbeat()
go heartbeat(heartbeatCtx)

// Wait for node to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)

// Register job with max_client_disconnect.
job := mock.Job()
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = 1
job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
job.TaskGroups[0].Constraints = []*structs.Constraint{}
job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10m",
}

jobReq := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var jobResp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)

// Wait for alloc to be pending in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
})

// Get allocs that node should run.
allocsReq := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var allocsResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)

// Tell server the alloc is running.
// Save the alloc so we can reuse the request later.
alloc := allocsResp.Allocs[0].Copy()
alloc.ClientStatus = structs.AllocClientStatusRunning

allocUpdateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{
Region: "global",
},
}
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)

// Wait for alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})

// Stop heartbeat and wait for the client to be disconnected and the alloc
// to be unknown.
cancelHeartbeat()
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusUnknown: 1,
})

// Restart heartbeat to reconnect node.
heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
defer cancelHeartbeat()
go heartbeat(heartbeatCtx)

// Wait a few heartbeats and check that the node is still initializing.
//
// The heartbeat should not update the node to ready until it updates its
// allocs status with the server so the scheduler have the necessary
// information to avoid unnecessary placements.
time.Sleep(3 * heartbeatTTL)
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)

// Get allocs that node should run.
// The node should only have one alloc assigned until it updates its allocs
// status with the server.
allocsReq = &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
must.NoError(t, err)
must.Len(t, 1, allocsResp.Allocs)

// Tell server the alloc is still running.
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
must.NoError(t, err)

// The client must end in the same state as before it disconnected:
// - client status is ready.
// - only 1 alloc and the alloc is running.
// - all evals are terminal, so cluster is in a stable state.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})
testutil.WaitForResult(func() (bool, error) {
state := s.fsm.State()
ws := memdb.NewWatchSet()
evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
if err != nil {
return false, fmt.Errorf("failed to read evals: %v", err)
}
for _, eval := range evals {
// TODO: remove this check once the disconnect process stops
// leaking a max-disconnect-timeout eval.
// https://github.com/hashicorp/nomad/issues/12809
if eval.TriggeredBy == structs.EvalTriggerMaxDisconnectTimeout {
continue
}

if !eval.TerminalStatus() {
return false, fmt.Errorf("found %s eval", eval.Status)
}
}
return true, nil
}, func(err error) {
must.NoError(t, err)
})
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down Expand Up @@ -639,29 +828,25 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
}

// Transition it to down and then ready
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
tgross marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("err: %v", err)
}

if len(resp.EvalIDs) != 1 {
t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
}

node.Status = structs.NodeStatusReady
reg = &structs.NodeRegisterRequest{
Node: node,
req = &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: "global"},
}

// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down Expand Up @@ -1369,12 +1554,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))

// Mark the node as down
node.Status = structs.NodeStatusDown
reg = &structs.NodeRegisterRequest{
Node: node,
req := &structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))

// Ensure that the allocation has transitioned to lost
testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -2581,7 +2766,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
}
var allocUpdateResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
require.ErrorContains(t, err, "not ready")
require.ErrorContains(t, err, "not allowed to update allocs")

// Send request without an explicit node ID.
updatedAlloc.NodeID = ""
Expand Down
Loading