core: enforce strict steps for clients reconnect (#15808)
When a Nomad client that is running an allocation with
`max_client_disconnect` set misses a heartbeat, the Nomad server will
update its status to `disconnected`.

Upon reconnecting, the client makes three main RPC calls (a sketch of
their shape follows this list):

- `Node.UpdateStatus` is used to set the client status to `ready`.
- `Node.UpdateAlloc` is used to update the client-side information about
  allocations, such as their `ClientStatus`, task states, etc.
- `Node.Register` is used to upsert the entire node object, including
  its status.
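For reference, here is a minimal sketch of the shape of these calls. It
mirrors how the tests below drive the endpoints rather than the real
client code: `clientRPCs` is a hypothetical helper, error handling is
elided, and `codec`, `node`, and `alloc` are assumed to come from the
caller.

```go
package main

import (
    "net/rpc"

    msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
    "github.com/hashicorp/nomad/nomad/structs"
)

// clientRPCs sketches the three calls a reconnecting client makes.
func clientRPCs(codec rpc.ClientCodec, node *structs.Node, alloc *structs.Allocation) {
    var up structs.NodeUpdateResponse

    // Node.Register upserts the entire node object.
    msgpackrpc.CallWithCodec(codec, "Node.Register", &structs.NodeRegisterRequest{
        Node:         node,
        WriteRequest: structs.WriteRequest{Region: "global"},
    }, &up)

    // Node.UpdateStatus heartbeats and reports the client status.
    msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", &structs.NodeUpdateStatusRequest{
        NodeID:       node.ID,
        Status:       structs.NodeStatusReady,
        WriteRequest: structs.WriteRequest{Region: "global"},
    }, &up)

    // Node.UpdateAlloc reports client-side allocation state.
    var generic structs.GenericResponse
    msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", &structs.AllocUpdateRequest{
        Alloc:        []*structs.Allocation{alloc},
        WriteRequest: structs.WriteRequest{Region: "global"},
    }, &generic)
}
```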

These calls are made concurrently and also run in parallel with the
scheduler. Depending on the order in which they run, the scheduler may
end up with incomplete data when reconciling allocations.

For example, a client disconnects and its replacement allocation cannot
be placed anywhere else, so there's a pending eval waiting for
resources.

When this client comes back, the order of events may be:

1. Client calls `Node.UpdateStatus` and is now `ready`.
2. Scheduler reconciles allocations and places the replacement alloc on
   the client. The client is now assigned two allocations: the original
   alloc that is still `unknown` and the replacement that is `pending`.
3. Client calls `Node.UpdateAlloc` and updates the original alloc to
   `running`.
4. Scheduler notices too many allocs and stops the replacement.

This creates unnecessary placements or, in a different order of events,
may leave the job without any allocations running until the whole state
is updated and reconciled.

To avoid problems like this, a client must update _all_ of its relevant
information before it can be considered `ready` and available for
scheduling.

To achieve this, the RPC endpoints mentioned above have been modified to
enforce strict steps for reconnecting nodes:

- `Node.Register` no longer sets the client status.
- `Node.UpdateStatus` holds a reconnecting client in the `initializing`
  status until it successfully calls `Node.UpdateAlloc`, as sketched
  below.
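Condensed into one place, the new `Node.UpdateStatus` behavior reads as
a small decision function. This is only a sketch: `nextStatus` and
`nonTerminalAllocs` are hypothetical names, but the statuses and index
fields come from the node_endpoint.go diff below.

```go
package main

import "github.com/hashicorp/nomad/nomad/structs"

// nextStatus sketches which status Node.UpdateStatus records for a
// heartbeating client, given the status the client asked for.
func nextStatus(node *structs.Node, requested string, nonTerminalAllocs int) string {
    switch node.Status {
    case structs.NodeStatusDisconnected:
        // A reconnecting client is first demoted to initializing...
        if requested == structs.NodeStatusReady {
            return structs.NodeStatusInit
        }
    case structs.NodeStatusInit:
        // ...and held there until its alloc updates were applied after
        // its last missed heartbeat.
        allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
        if requested == structs.NodeStatusReady && nonTerminalAllocs > 0 && !allocsUpdated {
            return structs.NodeStatusInit
        }
    }
    return requested
}
```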

These changes are made server-side to avoid the need for additional
coordination between clients and servers. Clients are kept oblivious to
these changes and keep making these calls as they normally would.

Whether allocations have been updated is verified by storing and
comparing the Raft index of the last time the client missed a heartbeat
and the last time it updated its allocations.
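A minimal sketch of that bookkeeping, assuming hypothetical helper
names (the actual state-store changes are in files not shown in this
excerpt); both events go through Raft, so the stored values are Raft
apply indexes:

```go
package main

import "github.com/hashicorp/nomad/nomad/structs"

// recordMissedHeartbeat is a hypothetical helper: on a missed heartbeat,
// the FSM stores the Raft index at which the miss was applied.
func recordMissedHeartbeat(node *structs.Node, raftIndex uint64) {
    node.LastMissedHeartbeatIndex = raftIndex
}

// recordAllocUpdate is a hypothetical helper: on applying Node.UpdateAlloc,
// the FSM stores the Raft index of the update. Once this index passes
// LastMissedHeartbeatIndex, the comparison in Node.UpdateStatus lets the
// node transition from initializing to ready.
func recordAllocUpdate(node *structs.Node, raftIndex uint64) {
    node.LastAllocUpdateIndex = raftIndex
}
```
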
lgfa29 authored Jan 25, 2023
1 parent 11af125 commit 2659757
Showing 8 changed files with 708 additions and 198 deletions.
3 changes: 3 additions & 0 deletions .changelog/15808.txt
@@ -0,0 +1,3 @@
```release-note:bug
core: enforce strict ordering that node status updates are recorded after allocation updates for reconnecting clients
```
1 change: 1 addition & 0 deletions nomad/fsm_test.go
@@ -1219,6 +1219,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
    clientAlloc.ClientStatus = structs.AllocClientStatusComplete
    update2 := &structs.Allocation{
        ID:           alloc2.ID,
        NodeID:       alloc2.NodeID,
        ClientStatus: structs.AllocClientStatusRunning,
    }

58 changes: 52 additions & 6 deletions nomad/node_endpoint.go
@@ -165,11 +165,17 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp
        return err
    }

    // Check if the SecretID has been tampered with
    if originalNode != nil {
        // Check if the SecretID has been tampered with
        if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
            return fmt.Errorf("node secret ID does not match. Not registering node.")
        }

        // Don't allow the Register method to update the node status. Only the
        // UpdateStatus method should be able to do this.
        if originalNode.Status != "" {
            args.Node.Status = originalNode.Status
        }
    }

    // We have a valid node connection, so add the mapping to cache the
@@ -449,7 +455,24 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
    return nil
}

// UpdateStatus is used to update the status of a client node
// UpdateStatus is used to update the status of a client node.
//
// Clients with non-terminal allocations must first call UpdateAlloc to be able
// to transition from the initializing status to ready.
//
// ┌────────────────────────────────────── No ───┐
// │ │
// ┌──▼───┐ ┌─────────────┐ ┌────────┴────────┐
// ── Register ─► init ├─ ready ──► Has allocs? ├─ Yes ─► Allocs updated? │
// └──▲───┘ └─────┬───────┘ └────────┬────────┘
// │ │ │
// ready └─ No ─┐ ┌─────── Yes ──┘
// │ │ │
// ┌──────┴───────┐ ┌──▼──▼─┐ ┌──────┐
// │ disconnected ◄─ disconnected ─┤ ready ├─ down ──► down │
// └──────────────┘ └───▲───┘ └──┬───┘
// │ │
// └──── ready ─────┘
func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error {
authErr := n.srv.Authenticate(n.ctx, args)

@@ -508,6 +531,26 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
    // Update the timestamp of when the node status was updated
    args.UpdatedAt = time.Now().Unix()

    // Compute next status.
    switch node.Status {
    case structs.NodeStatusInit:
        if args.Status == structs.NodeStatusReady {
            allocs, err := snap.AllocsByNodeTerminal(ws, args.NodeID, false)
            if err != nil {
                return fmt.Errorf("failed to query node allocs: %v", err)
            }

            allocsUpdated := node.LastAllocUpdateIndex > node.LastMissedHeartbeatIndex
            if len(allocs) > 0 && !allocsUpdated {
                args.Status = structs.NodeStatusInit
            }
        }
    case structs.NodeStatusDisconnected:
        if args.Status == structs.NodeStatusReady {
            args.Status = structs.NodeStatusInit
        }
    }

    // Commit this update via Raft
    var index uint64
    if node.Status != args.Status {
@@ -1177,8 +1220,11 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
// UpdateAlloc is used to update the client status of an allocation. It should
// only be called by clients.
//
// Clients must first register and heartbeat successfully before they are able
// to call this method.
// Calling this method returns an error when:
//   - The node is not registered in the server yet. Clients must first call the
//     Register method.
//   - The node status is down or disconnected. Clients must call the
//     UpdateStatus method to update its status in the server.
func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error {
    authErr := n.srv.Authenticate(n.ctx, args)

@@ -1216,8 +1262,8 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
    if node == nil {
        return fmt.Errorf("node %s not found", nodeID)
    }
    if node.Status != structs.NodeStatusReady {
        return fmt.Errorf("node %s is %s, not %s", nodeID, node.Status, structs.NodeStatusReady)
    if node.UnresponsiveStatus() {
        return fmt.Errorf("node %s is not allowed to update allocs while in status %s", nodeID, node.Status)
    }

    // Ensure that evals aren't set from client RPCs
219 changes: 202 additions & 17 deletions nomad/node_endpoint_test.go
@@ -1,6 +1,7 @@
package nomad

import (
"context"
"errors"
"fmt"
"net"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/hashicorp/nomad/testutil"
vapi "github.com/hashicorp/vault/api"
"github.com/kr/pretty"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -524,6 +526,193 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
    }
}

func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
    ci.Parallel(t)

    // Setup server with tighter heartbeat so we don't have to wait so long
    // for nodes to go down.
    heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
    s, cleanupS := TestServer(t, func(c *Config) {
        c.MinHeartbeatTTL = heartbeatTTL
        c.HeartbeatGrace = 2 * heartbeatTTL
    })
    codec := rpcClient(t, s)
    defer cleanupS()
    testutil.WaitForLeader(t, s.RPC)

    // Register node.
    node := mock.Node()
    reg := &structs.NodeRegisterRequest{
        Node:         node,
        WriteRequest: structs.WriteRequest{Region: "global"},
    }
    var nodeUpdateResp structs.NodeUpdateResponse
    err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeUpdateResp)
    must.NoError(t, err)

    // Start heartbeat.
    heartbeat := func(ctx context.Context) {
        ticker := time.NewTicker(heartbeatTTL / 2)
        defer ticker.Stop()

        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                if t.Failed() {
                    return
                }

                req := &structs.NodeUpdateStatusRequest{
                    NodeID:       node.ID,
                    Status:       structs.NodeStatusReady,
                    WriteRequest: structs.WriteRequest{Region: "global"},
                }
                var resp structs.NodeUpdateResponse
                // Ignore errors since an unexpected failed heartbeat will cause
                // the test conditions to fail.
                msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)
            }
        }
    }
    heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background())
    defer cancelHeartbeat()
    go heartbeat(heartbeatCtx)

    // Wait for node to be ready.
    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)

    // Register job with max_client_disconnect.
    job := mock.Job()
    job.Constraints = []*structs.Constraint{}
    job.TaskGroups[0].Count = 1
    job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
    job.TaskGroups[0].Constraints = []*structs.Constraint{}
    job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
    job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
        "run_for": "10m",
    }

    jobReq := &structs.JobRegisterRequest{
        Job: job,
        WriteRequest: structs.WriteRequest{
            Region:    "global",
            Namespace: job.Namespace,
        },
    }
    var jobResp structs.JobRegisterResponse
    err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
    must.NoError(t, err)

    // Wait for alloc to be pending in the server.
    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
        structs.AllocClientStatusPending: 1,
    })

    // Get allocs that node should run.
    allocsReq := &structs.NodeSpecificRequest{
        NodeID: node.ID,
        QueryOptions: structs.QueryOptions{
            Region: "global",
        },
    }
    var allocsResp structs.NodeAllocsResponse
    err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
    must.NoError(t, err)
    must.Len(t, 1, allocsResp.Allocs)

    // Tell server the alloc is running.
    // Save the alloc so we can reuse the request later.
    alloc := allocsResp.Allocs[0].Copy()
    alloc.ClientStatus = structs.AllocClientStatusRunning

    allocUpdateReq := &structs.AllocUpdateRequest{
        Alloc: []*structs.Allocation{alloc},
        WriteRequest: structs.WriteRequest{
            Region: "global",
        },
    }
    var resp structs.GenericResponse
    err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
    must.NoError(t, err)

    // Wait for alloc to be running in the server.
    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
        structs.AllocClientStatusRunning: 1,
    })

    // Stop heartbeat and wait for the client to be disconnected and the alloc
    // to be unknown.
    cancelHeartbeat()
    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
        structs.AllocClientStatusUnknown: 1,
    })

    // Restart heartbeat to reconnect node.
    heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
    defer cancelHeartbeat()
    go heartbeat(heartbeatCtx)

    // Wait a few heartbeats and check that the node is still initializing.
    //
    // The heartbeat should not update the node to ready until it updates its
    // allocs status with the server so the scheduler have the necessary
    // information to avoid unnecessary placements.
    time.Sleep(3 * heartbeatTTL)
    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)

    // Get allocs that node should run.
    // The node should only have one alloc assigned until it updates its allocs
    // status with the server.
    allocsReq = &structs.NodeSpecificRequest{
        NodeID: node.ID,
        QueryOptions: structs.QueryOptions{
            Region: "global",
        },
    }
    err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
    must.NoError(t, err)
    must.Len(t, 1, allocsResp.Allocs)

    // Tell server the alloc is still running.
    err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
    must.NoError(t, err)

    // The client must end in the same state as before it disconnected:
    // - client status is ready.
    // - only 1 alloc and the alloc is running.
    // - all evals are terminal, so cluster is in a stable state.
    testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
    testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
        structs.AllocClientStatusRunning: 1,
    })
    testutil.WaitForResult(func() (bool, error) {
        state := s.fsm.State()
        ws := memdb.NewWatchSet()
        evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
        if err != nil {
            return false, fmt.Errorf("failed to read evals: %v", err)
        }
        for _, eval := range evals {
            // TODO: remove this check once the disconnect process stops
            // leaking a max-disconnect-timeout eval.
            // https://github.com/hashicorp/nomad/issues/12809
            if eval.TriggeredBy == structs.EvalTriggerMaxDisconnectTimeout {
                continue
            }

            if !eval.TerminalStatus() {
                return false, fmt.Errorf("found %s eval", eval.Status)
            }
        }
        return true, nil
    }, func(err error) {
        must.NoError(t, err)
    })
}

func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
    ci.Parallel(t)
    require := require.New(t)
@@ -639,29 +828,25 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) {
    }

    // Transition it to down and then ready
    node.Status = structs.NodeStatusDown
    reg = &structs.NodeRegisterRequest{
        Node: node,
    req := &structs.NodeUpdateStatusRequest{
        NodeID:       node.ID,
        Status:       structs.NodeStatusDown,
        WriteRequest: structs.WriteRequest{Region: "global"},
    }

    // Fetch the response
    if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
    if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
        t.Fatalf("err: %v", err)
    }

    if len(resp.EvalIDs) != 1 {
        t.Fatalf("expected one eval; got %#v", resp.EvalIDs)
    }

    node.Status = structs.NodeStatusReady
    reg = &structs.NodeRegisterRequest{
        Node: node,
    req = &structs.NodeUpdateStatusRequest{
        NodeID:       node.ID,
        Status:       structs.NodeStatusReady,
        WriteRequest: structs.WriteRequest{Region: "global"},
    }

    // Fetch the response
    if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
    if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp); err != nil {
        t.Fatalf("err: %v", err)
    }

@@ -1369,12 +1554,12 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
    require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", dereg, &resp2))

    // Mark the node as down
    node.Status = structs.NodeStatusDown
    reg = &structs.NodeRegisterRequest{
        Node: node,
    req := &structs.NodeUpdateStatusRequest{
        NodeID:       node.ID,
        Status:       structs.NodeStatusDown,
        WriteRequest: structs.WriteRequest{Region: "global"},
    }
    require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp))
    require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp))

    // Ensure that the allocation has transitioned to lost
    testutil.WaitForResult(func() (bool, error) {
@@ -2581,7 +2766,7 @@ func TestClientEndpoint_UpdateAlloc_NodeNotReady(t *testing.T) {
    }
    var allocUpdateResp structs.NodeAllocsResponse
    err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &allocUpdateResp)
    require.ErrorContains(t, err, "not ready")
    require.ErrorContains(t, err, "not allowed to update allocs")

    // Send request without an explicit node ID.
    updatedAlloc.NodeID = ""