Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: don't use Status RPC for Consul discovery (#16490) #16490

Merged
merged 7 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16490.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where clients using Consul discovery to join the cluster would get permission denied errors
```
1 change: 1 addition & 0 deletions .semgrep/rpc_endpoint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ rules:
- pattern-not: '"CSIPlugin.List"'
- pattern-not: '"Status.Leader"'
- pattern-not: '"Status.Peers"'
- pattern-not: '"Status.RPCServers"'
- pattern-not: '"Status.Version"'
message: "RPC method $METHOD appears to be unauthenticated"
languages:
Expand Down
33 changes: 16 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2906,7 +2906,8 @@ func (c *Client) consulDiscoveryImpl() error {
dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)]
}

// Query for servers in this client's region only
// Query for servers in this client's region only. Note this has to be an
// unauthenticated request because we haven't registered yet.
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand Down Expand Up @@ -2944,26 +2945,24 @@ DISCOLOOP:
continue
}

// Query the members from the region that Consul gave us, and
// extract the client-advertise RPC address from each member
var membersResp structs.ServerMembersResponse
if err := c.connPool.RPC(region, addr, "Status.Members", rpcargs, &membersResp); err != nil {
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)

// Query the client-advertise RPC addresses from the region that
// Consul gave us
var members []string
if err := c.connPool.RPC(region, addr, "Status.RPCServers", rpcargs, &members); err != nil {
tgross marked this conversation as resolved.
Show resolved Hide resolved
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, member := range membersResp.Members {
if addrTag, ok := member.Tags["rpc_addr"]; ok {
if portTag, ok := member.Tags["port"]; ok {
addr, err := net.ResolveTCPAddr("tcp",
fmt.Sprintf("%s:%s", addrTag, portTag))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
for _, member := range members {
addr, err := net.ResolveTCPAddr("tcp", member)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}

if len(nomadServers) > 0 {
Expand Down
12 changes: 10 additions & 2 deletions nomad/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,14 @@ func (s *Server) remoteIPFromRPCContext(ctx *RPCContext) (net.IP, error) {
// for the identity they intend the operation to be performed with.
func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error) {
identity := args.GetIdentity()
if !s.config.ACLEnabled || identity == nil {
if !s.config.ACLEnabled {
return nil, nil
tgross marked this conversation as resolved.
Show resolved Hide resolved
}
if identity == nil {
// Server.Authenticate should never return a nil identity unless there's
// an authentication error, but enforce that invariant here
return nil, structs.ErrPermissionDenied
}
aclToken := identity.GetACLToken()
if aclToken != nil {
return s.ResolveACLForToken(aclToken)
Expand All @@ -172,7 +177,10 @@ func (s *Server) ResolveACL(args structs.RequestWithIdentity) (*acl.ACL, error)
if claims != nil {
return s.ResolveClaims(claims)
}
return nil, nil

// return an error here so that we enforce the invariant that we check for
// Identity.ClientID before trying to resolve ACLs
return nil, structs.ErrPermissionDenied
}

// ResolveACLForToken resolves an ACL from a token only. It should be used only
Expand Down
25 changes: 25 additions & 0 deletions nomad/status_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,31 @@ func (s *Status) Peers(args *structs.GenericRequest, reply *[]string) error {
return nil
}

// RPCServers is used to get all the RPC server addresses in a region
func (s *Status) RPCServers(args *structs.GenericRequest, reply *[]string) error {
tgross marked this conversation as resolved.
Show resolved Hide resolved
tgross marked this conversation as resolved.
Show resolved Hide resolved
// note: we're intentionally throwing away any auth error here and only
// authenticate so that we can measure rate metrics
s.srv.Authenticate(s.ctx, args)
s.srv.MeasureRPCRate("status", structs.RateMetricList, args)

if args.Region == "" {
args.Region = s.srv.config.Region
}
if done, err := s.srv.forward("Status.RPCServers", args, args, reply); done {
return err
}

future := s.srv.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}

for _, server := range future.Configuration().Servers {
*reply = append(*reply, s.srv.localPeers[server.Address].RPCAddr.String())
}
return nil
}

// Members return the list of servers in a cluster that a particular server is
// aware of
func (s *Status) Members(args *structs.GenericRequest, reply *structs.ServerMembersResponse) error {
Expand Down
54 changes: 52 additions & 2 deletions nomad/status_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package nomad

import (
"fmt"
"net"
"testing"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStatusPing(t *testing.T) {
Expand Down Expand Up @@ -73,6 +77,52 @@ func TestStatusPeers(t *testing.T) {
}
}

func TestStatus_RPCServers(t *testing.T) {
ci.Parallel(t)

advAddr1 := "127.0.1.1:1234"
adv1, err := net.ResolveTCPAddr("tcp", advAddr1)
must.NoError(t, err)

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.Region = "region1"
c.ClientRPCAdvertise = adv1
})
defer cleanupS1()

s2, cleanupS2 := TestServer(t, func(c *Config) {
c.Region = "region2"
})
defer cleanupS2()

// Join them together
s2Addr := fmt.Sprintf("127.0.0.1:%d", s2.config.SerfConfig.MemberlistConfig.BindPort)
n, err := s1.Join([]string{s2Addr})
must.NoError(t, err, must.Sprintf("Failed joining: %v (%d joined)", err, n))

codec := rpcClient(t, s1)

t.Run("own region", func(t *testing.T) {
arg := &structs.GenericRequest{
QueryOptions: structs.QueryOptions{Region: "region1"},
}
var members []string
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.RPCServers", arg, &members))
must.Len(t, 1, members)
must.Eq(t, advAddr1, members[0])
})

t.Run("other region", func(t *testing.T) {
arg := &structs.GenericRequest{
QueryOptions: structs.QueryOptions{Region: "region2"},
}
var members []string
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Status.RPCServers", arg, &members))
must.Len(t, 1, members)
must.Eq(t, s2.clientRpcAdvertise.String(), members[0])
})
}

func TestStatusMembers(t *testing.T) {
ci.Parallel(t)

Expand Down