Skip to content

Commit

Permalink
client/metadata: fix crasher caused by AllowStale = false (#16549)
Browse files Browse the repository at this point in the history
Fixes #16517

Given a 3 Server cluster with at least 1 Client connected to Follower 1:

If a NodeMeta.{Apply,Read} for the Client request is received by
Follower 1 with `AllowStale = false` the Follower will forward the
request to the Leader.

The Leader, not being connected to the target Client, will forward the
RPC to Follower 1.

Follower 1, seeing AllowStale=false, will forward the request to the
Leader.

The Leader, not being connected to... well hoppefully you get the
picture: an infinite loop occurs.
  • Loading branch information
schmichael authored Mar 20, 2023
1 parent 695df42 commit fb08518
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .changelog/16549.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where Dynamic Node Metadata requests could crash servers
```
6 changes: 0 additions & 6 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
//
// BREAKING: This method will have the following signature in 1.6.0
// func (a *Allocations) Restart(allocID string, taskName string, allTasks bool, w *WriteOptions) (*WriteMeta, error) {
func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error {
req := AllocationRestartRequest{
TaskName: taskName,
Expand Down Expand Up @@ -223,9 +220,6 @@ type AllocStopResponse struct {
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
//
// BREAKING: This method will have the following signature in 1.6.0
// func (a *Allocations) Signal(allocID string, task string, signal string, w *WriteOptions) (*WriteMeta, error) {
func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error {
req := AllocSignalRequest{
Signal: signal,
Expand Down
30 changes: 27 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,8 @@ func (c *Client) query(endpoint string, out any, q *QueryOptions) (*QueryMeta, e
return qm, nil
}

// putQuery is used to do a PUT request when doing a read against an endpoint
// and deserialize the response into an interface using standard Nomad
// conventions.
// putQuery is used to do a PUT request when doing a "write" to a Client RPC.
// Client RPCs must use QueryOptions to allow setting AllowStale=true.
func (c *Client) putQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
r, err := c.newRequest("PUT", endpoint)
if err != nil {
Expand Down Expand Up @@ -969,6 +968,31 @@ func (c *Client) put(endpoint string, in, out any, q *WriteOptions) (*WriteMeta,
return c.write(http.MethodPut, endpoint, in, out, q)
}

// postQuery is used to do a POST request when doing a "write" to a Client RPC.
// Client RPCs must use QueryOptions to allow setting AllowStale=true.
func (c *Client) postQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
r, err := c.newRequest("POST", endpoint)
if err != nil {
return nil, err
}
r.setQueryOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()

qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt

if err := decodeBody(resp, out); err != nil {
return nil, err
}
return qm, nil
}

// post is used to do a POST request against an endpoint and
// serialize/deserialized using the standard Nomad conventions.
func (c *Client) post(endpoint string, in, out any, q *WriteOptions) (*WriteMeta, error) {
Expand Down
4 changes: 2 additions & 2 deletions api/node_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func (n *Nodes) Meta() *NodeMeta {

// Apply dynamic Node metadata updates to a Node. If NodeID is unset then Node
// receiving the request is modified.
func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *WriteOptions) (*NodeMetaResponse, error) {
func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *QueryOptions) (*NodeMetaResponse, error) {
var out NodeMetaResponse
_, err := n.client.post("/v1/client/metadata", meta, &out, qo)
_, err := n.client.postQuery("/v1/client/metadata", meta, &out, qo)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/meta_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *HTTPServer) nodeMetaApply(resp http.ResponseWriter, req *http.Request)
return nil, CodedError(http.StatusBadRequest, err.Error())
}

s.parseWriteRequest(req, &args.WriteRequest)
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
parseNode(req, &args.NodeID)

// Determine the handler to use
Expand Down
8 changes: 8 additions & 0 deletions contributing/checklist-rpc-endpoint.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ Prefer adding a new message to changing any existing RPC messages.
upgraded, so use this to guard sending the new RPC, else send the old RPC
* Version must match the actual release version!

* [ ] If implementing a Client RPC...
* Use `QueryOptions` instead of `WriteRequest` in the Request struct as
`WriteRequest` is only for *Raft* writes.
* Set `QueryOptions.AllowStale = true` in the *Server* RPC forwarder to avoid
an infinite loop between leaders and followers when a Client RPC is
forwarded through a follower. See
https://github.com/hashicorp/nomad/issues/16517

## Docs

* [ ] Changelog
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
github.com/mitchellh/go-glint v0.0.0-20210722152315-6515ceb4a127
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/mitchellh/go-testing-interface v1.14.1
github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770
github.com/mitchellh/hashstructure v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mitchellh/reflectwalk v1.0.2
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,9 @@ github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b h1:9+ke9YJ9KGWw5AN
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk=
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770 h1:drhDO54gdT/a15GBcMRmunZiNcLgPiFIJa23KzmcvcU=
github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770/go.mod h1:SO/iHr6q2EzbqRApt+8/E9wqebTwQn5y+UlB04bxzo0=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
Expand Down
8 changes: 8 additions & 0 deletions nomad/client_meta_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func newNodeMetaEndpoint(srv *Server) *NodeMeta {
func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.NodeMetaResponse) error {
const method = "NodeMeta.Apply"

// Prevent infinite loop between leader and
// follower-with-the-target-node-connection.
args.QueryOptions.AllowStale = true

authErr := n.srv.Authenticate(nil, args)
if done, err := n.srv.forward(method, args, args, reply); done {
return err
Expand All @@ -48,6 +52,10 @@ func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.Node
func (n *NodeMeta) Read(args *structs.NodeSpecificRequest, reply *structs.NodeMetaResponse) error {
const method = "NodeMeta.Read"

// Prevent infinite loop between leader and
// follower-with-the-target-node-connection.
args.QueryOptions.AllowStale = true

authErr := n.srv.Authenticate(nil, args)
if done, err := n.srv.forward(method, args, args, reply); done {
return err
Expand Down
129 changes: 129 additions & 0 deletions nomad/client_meta_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package nomad

import (
"testing"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
)

// TestNodeMeta_Forward asserts that Client RPCs do not result in infinite
// loops. For example in a cluster with 1 Leader, 2 Followers, and a Node
// connected to Follower 1:
//
// If a NodeMeta.Apply RPC with AllowStale=false is received by Follower 1, it
// will honor AllowStale=false and forward the request to the Leader.
//
// The Leader will accept the RPC, notice that Follower 1 has a connection to
// the Node, and the Leader will send the request back to Follower 1.
//
// Follower 1, ever respectful of AllowStale=false, will forward it back to the
// Leader.
//
// The Leader, being unable to forward to the Node, will send it back to
// Follower 1.
//
// This argument will continue until one of the Servers runs out of memory or
// patience and stomps away in anger (crashes). Like any good argument the
// ending is never pretty as the Servers will suffer CPU starvation and
// potentially Raft flapping before anyone actually OOMs.
//
// See https://github.com/hashicorp/nomad/issues/16517 for details.
//
// If test fails it will do so spectacularly by consuming all available CPU and
// potentially all available memory. Running it in a VM or container is
// suggested.
func TestNodeMeta_Forward(t *testing.T) {
ci.Parallel(t)

servers := []*Server{}
for i := 0; i < 3; i++ {
s, cleanup := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.NumSchedulers = 0
})
t.Cleanup(cleanup)
servers = append(servers, s)
}

TestJoin(t, servers...)
leader := testutil.WaitForLeaders(t, servers[0].RPC, servers[1].RPC, servers[2].RPC)

followers := []string{}
for _, s := range servers {
if addr := s.config.RPCAddr.String(); addr != leader {
followers = append(followers, addr)
}
}
t.Logf("leader=%s followers=%q", leader, followers)

clients := []*client.Client{}
for i := 0; i < 4; i++ {
c, cleanup := client.TestClient(t, func(c *config.Config) {
// Clients will rebalance across all servers, but try to get them to use
// followers to ensure we don't hit the loop in #16517
c.Servers = followers
})
defer cleanup()
clients = append(clients, c)
}
for _, c := range clients {
testutil.WaitForClient(t, servers[0].RPC, c.NodeID(), c.Region())
}

agentRPCs := []func(string, any, any) error{}
nodeIDs := make([]string, 0, len(clients))

// Build list of agents and node IDs
for _, s := range servers {
agentRPCs = append(agentRPCs, s.RPC)
}

for _, c := range clients {
agentRPCs = append(agentRPCs, c.RPC)
nodeIDs = append(nodeIDs, c.NodeID())
}

region := clients[0].Region()

// Apply metadata to every client through every agent to ensure forwarding
// always works regardless of path taken.
for _, rpc := range agentRPCs {
for _, nodeID := range nodeIDs {
args := &structs.NodeMetaApplyRequest{
// Intentionally don't set QueryOptions.AllowStale to exercise #16517
QueryOptions: structs.QueryOptions{
Region: region,
},
NodeID: nodeID,
Meta: map[string]*string{"testing": pointer.Of("123")},
}
reply := &structs.NodeMetaResponse{}
must.NoError(t, rpc("NodeMeta.Apply", args, reply))
must.MapNotEmpty(t, reply.Meta)
}
}

for _, rpc := range agentRPCs {
for _, nodeID := range nodeIDs {
args := &structs.NodeSpecificRequest{
// Intentionally don't set QueryOptions.AllowStale to exercise #16517
QueryOptions: structs.QueryOptions{
Region: region,
},
NodeID: nodeID,
}
reply := &structs.NodeMetaResponse{}
must.NoError(t, rpc("NodeMeta.Read", args, reply))
must.MapNotEmpty(t, reply.Meta)
must.Eq(t, reply.Meta["testing"], "123")
must.MapNotEmpty(t, reply.Dynamic)
must.Eq(t, *reply.Dynamic["testing"], "123")
}
}
}
2 changes: 1 addition & 1 deletion nomad/structs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (di *DriverInfo) HealthCheckEquals(other *DriverInfo) bool {

// NodeMetaApplyRequest is used to update Node metadata on Client agents.
type NodeMetaApplyRequest struct {
WriteRequest
QueryOptions // Client RPCs must use QueryOptions to set AllowStale=true

// NodeID is the node being targeted by this request (or the node
// receiving this request if NodeID is empty).
Expand Down
18 changes: 7 additions & 11 deletions nomad/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"math/rand"
"net"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
Expand All @@ -15,14 +14,15 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/version"
testing "github.com/mitchellh/go-testing-interface"
"github.com/shoenig/test/must"
)

var (
nodeNumber int32 = 0
)

func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, func(c *Config) {
c.ACLEnabled = true
if cb != nil {
Expand All @@ -37,13 +37,13 @@ func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken,
return server, token, cleanup
}

func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) {
func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
s, c, err := TestServerErr(t, cb)
must.NoError(t, err, must.Sprint("failed to start test server"))
return s, c
}

func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
func TestServerErr(t testing.T, cb func(*Config)) (*Server, func(), error) {
// Setup the default settings
config := DefaultConfig()

Expand Down Expand Up @@ -150,19 +150,15 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
return nil, nil, errors.New("unable to acquire ports for test server")
}

func TestJoin(t *testing.T, servers ...*Server) {
func TestJoin(t testing.T, servers ...*Server) {
for i := 0; i < len(servers)-1; i++ {
addr := fmt.Sprintf("127.0.0.1:%d",
servers[i].config.SerfConfig.MemberlistConfig.BindPort)

for j := i + 1; j < len(servers); j++ {
num, err := servers[j].Join([]string{addr})
if err != nil {
t.Fatalf("err: %v", err)
}
if num != 1 {
t.Fatalf("bad: %d", num)
}
must.NoError(t, err)
must.Eq(t, 1, num)
}
}
}
13 changes: 8 additions & 5 deletions testutil/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,16 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
})
}

// WaitForLeaders blocks until each serverRPC knows the leader.
func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
// WaitForLeaders blocks until each rpcs knows the leader.
func WaitForLeaders(t testing.TB, rpcs ...rpcFn) string {
t.Helper()

for i := 0; i < len(serverRPCs); i++ {
var leader string
for i := 0; i < len(rpcs); i++ {
ok := func() (bool, error) {
leader = ""
args := &structs.GenericRequest{}
var leader string
err := serverRPCs[i]("Status.Leader", args, &leader)
err := rpcs[i]("Status.Leader", args, &leader)
return leader != "", err
}
must.Wait(t, wait.InitialSuccess(
Expand All @@ -160,6 +161,8 @@ func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
wait.Gap(1*time.Second),
))
}

return leader
}

// WaitForClient blocks until the client can be found
Expand Down

0 comments on commit fb08518

Please sign in to comment.