Skip to content

Commit

Permalink
[1.15.x] grpc: ensure grpc resolver correctly uses lan/wan addresses …
Browse files Browse the repository at this point in the history
…on servers

The grpc resolver implementation is fed from changes to the
router.Router. Within the router there is a map of various areas storing
the addressing information for servers in those areas. All map entries
are of the WAN variety except a single special entry for the LAN.

Addressing information in the LAN "area" are local addresses intended
for use when making a client-to-server or server-to-server request.

The client agent correctly updates this LAN area when receiving lan serf
events, so by extension the grpc resolver works fine in that scenario.

The server agent only initially populates a single entry in the LAN area
(for itself) on startup, and then never mutates that area map again.
For normal RPCs a different structure is used for LAN routing.

Additionally when selecting a server to contact in the local datacenter
it will randomly select addresses from either the LAN or WAN addressed
entries in the map.

Unfortunately this means that the grpc resolver stack as it exists on
server agents is either broken or only accidentally functions by having
servers dial each other over the WAN-accessible address. If the operator
disables the serf wan port completely likely this incidental functioning
would break.

This PR enforces that local requests for servers (both for stale reads
or leader forwarded requests) exclusively use the LAN "area" information
and also fixes it so that servers keep that area up to date in the
router.

A test for the grpc resolver logic was added, as well as a higher level
full-stack test to ensure the externally perceived bug does not return.
  • Loading branch information
rboyer committed May 9, 2023
1 parent 500178a commit f000010
Show file tree
Hide file tree
Showing 10 changed files with 565 additions and 33 deletions.
10 changes: 7 additions & 3 deletions agent/consul/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,15 @@ func newClient(t *testing.T, config *Config) *Client {
return client
}

func newTestResolverConfig(t *testing.T, suffix string) resolver.Config {
func newTestResolverConfig(t *testing.T, suffix string, dc, agentType string) resolver.Config {
n := t.Name()
s := strings.Replace(n, "/", "", -1)
s = strings.Replace(s, "_", "", -1)
return resolver.Config{Authority: strings.ToLower(s) + "-" + suffix}
return resolver.Config{
Datacenter: dc,
AgentType: agentType,
Authority: strings.ToLower(s) + "-" + suffix,
}
}

func newDefaultDeps(t *testing.T, c *Config) Deps {
Expand All @@ -520,7 +524,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
tls, err := tlsutil.NewConfigurator(c.TLSConfig, logger)
require.NoError(t, err, "failed to create tls configuration")

resolverBuilder := resolver.NewServerResolverBuilder(newTestResolverConfig(t, c.NodeName+"-"+c.Datacenter))
resolverBuilder := resolver.NewServerResolverBuilder(newTestResolverConfig(t, c.NodeName+"-"+c.Datacenter, c.Datacenter, "server"))
resolver.Register(resolverBuilder)

balancerBuilder := balancer.NewBuilder(resolverBuilder.Authority(), testutil.Logger(t))
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/server_serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/lib"
libserf "github.com/hashicorp/consul/lib/serf"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/types"
)

const (
Expand Down Expand Up @@ -356,6 +357,7 @@ func (s *Server) lanNodeJoin(me serf.MemberEvent) {

// Update server lookup
s.serverLookup.AddServer(serverMeta)
s.router.AddServer(types.AreaLAN, serverMeta)

// If we're still expecting to bootstrap, may need to handle this.
if s.config.BootstrapExpect != 0 {
Expand All @@ -377,6 +379,7 @@ func (s *Server) lanNodeUpdate(me serf.MemberEvent) {

// Update server lookup
s.serverLookup.AddServer(serverMeta)
s.router.AddServer(types.AreaLAN, serverMeta)
}
}

Expand Down Expand Up @@ -515,5 +518,6 @@ func (s *Server) lanNodeFailed(me serf.MemberEvent) {

// Update id to address map
s.serverLookup.RemoveServer(serverMeta)
s.router.RemoveServer(types.AreaLAN, serverMeta)
}
}
5 changes: 4 additions & 1 deletion agent/consul/subscribe_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,10 @@ func newClientWithGRPCPlumbing(t *testing.T, ops ...func(*Config)) (*Client, *re
}

resolverBuilder := resolver.NewServerResolverBuilder(newTestResolverConfig(t,
"client."+config.Datacenter+"."+string(config.NodeID)))
"client."+config.Datacenter+"."+string(config.NodeID),
config.Datacenter,
"client",
))

resolver.Register(resolverBuilder)
t.Cleanup(func() {
Expand Down
79 changes: 59 additions & 20 deletions agent/grpc-internal/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) {
require.NoError(t, err)
t.Cleanup(logError(t, lis.Close))

builder := resolver.NewServerResolverBuilder(newConfig(t))
builder.AddServer(types.AreaWAN, &metadata.Server{
builder := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
builder.AddServer(types.AreaLAN, &metadata.Server{
Name: "server-1",
ID: "ID1",
Datacenter: "dc1",
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) {
p.Wait()
}()

builder := resolver.NewServerResolverBuilder(newConfig(t))
builder := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
builder.AddServer(types.AreaWAN, &metadata.Server{
Name: "server-1",
ID: "ID1",
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestNewDialer_WithALPNWrapper(t *testing.T) {
func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
// if this test is failing because of expired certificates
// use the procedure in test/CA-GENERATION.md
res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)

tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
Expand All @@ -159,9 +159,17 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
srv := newSimpleTestServer(t, "server-1", "dc1", tlsConf)

md := srv.Metadata()
res.AddServer(types.AreaWAN, md)
res.AddServer(types.AreaLAN, md)
t.Cleanup(srv.shutdown)

{
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srv := newPanicTestServer(t, hclog.Default(), "server-1", "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}

pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
TLSWrapper: TLSWrapper(tlsConf.OutgoingRPCWrapper()),
Expand Down Expand Up @@ -190,7 +198,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)
// use the procedure in test/CA-GENERATION.md
gwAddr := ipaddr.FormatAddressPort("127.0.0.1", freeport.GetOne(t))

res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc2", "server"))
registerWithGRPC(t, res)

tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
Expand Down Expand Up @@ -266,7 +274,7 @@ func TestNewDialer_IntegrationWithTLSEnabledHandler_viaMeshGateway(t *testing.T)

func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
count := 4
res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
Expand All @@ -278,9 +286,18 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {

for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
{
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaLAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
{
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srv := newPanicTestServer(t, hclog.Default(), name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
}

conn, err := pool.ClientConn("dc1")
Expand All @@ -293,7 +310,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)

res.RemoveServer(types.AreaWAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
res.RemoveServer(types.AreaLAN, &metadata.Server{ID: first.ServerName, Datacenter: "dc1"})

resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
Expand All @@ -302,7 +319,7 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {

func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
count := 3
res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
Expand All @@ -315,10 +332,19 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
var servers []testServer
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
{
srv := newSimpleTestServer(t, name, "dc1", nil)
res.AddServer(types.AreaLAN, srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
}
{
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srv := newPanicTestServer(t, hclog.Default(), name, "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
t.Cleanup(srv.shutdown)
}
}

// Set the leader address to the first server.
Expand All @@ -345,17 +371,21 @@ func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
require.Equal(t, resp.ServerName, servers[1].name)
}

func newConfig(t *testing.T) resolver.Config {
func newConfig(t *testing.T, dc, agentType string) resolver.Config {
n := t.Name()
s := strings.Replace(n, "/", "", -1)
s = strings.Replace(s, "_", "", -1)
return resolver.Config{Authority: strings.ToLower(s)}
return resolver.Config{
Datacenter: dc,
AgentType: agentType,
Authority: strings.ToLower(s),
}
}

func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
dcs := []string{"dc1", "dc2", "dc3"}

res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)
pool := NewClientConnPool(ClientConnPoolConfig{
Servers: res,
Expand All @@ -368,7 +398,16 @@ func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
for _, dc := range dcs {
name := "server-0-" + dc
srv := newSimpleTestServer(t, name, dc, nil)
res.AddServer(types.AreaWAN, srv.Metadata())
if dc == "dc1" {
res.AddServer(types.AreaLAN, srv.Metadata())
// Put a duplicate instance of this on the WAN that will
// fail if we accidentally use it.
srvBad := newPanicTestServer(t, hclog.Default(), name, dc, nil)
res.AddServer(types.AreaWAN, srvBad.Metadata())
t.Cleanup(srvBad.shutdown)
} else {
res.AddServer(types.AreaWAN, srv.Metadata())
}
t.Cleanup(srv.shutdown)
}

Expand Down
4 changes: 2 additions & 2 deletions agent/grpc-internal/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) {
Output: &buf,
})

res := resolver.NewServerResolverBuilder(newConfig(t))
res := resolver.NewServerResolverBuilder(newConfig(t, "dc1", "server"))
registerWithGRPC(t, res)

srv := newPanicTestServer(t, logger, "server-1", "dc1", nil)
res.AddServer(types.AreaWAN, srv.Metadata())
res.AddServer(types.AreaLAN, srv.Metadata())
t.Cleanup(srv.shutdown)

pool := NewClientConnPool(ClientConnPoolConfig{
Expand Down
Loading

0 comments on commit f000010

Please sign in to comment.