Skip to content

Commit

Permalink
rpc,*: don't panic in rpc.NewServerEx
Browse files Browse the repository at this point in the history
Prior to this patch, the function `rpc.NewServerEx()` would panic if
it was unable to loads its TLS settings. This is incorrect - we want
to report a simple operational error in that case, since the problem
is actionable.

The reason why this problem does not seem to directly impact
production clusters is that there is another check in
`server.NewServer()` that asserts the TLS settings are valid before
constructing the rpc server.
This check was incomplete however, because there is a small
race condition in between the two where the files could
be removed after `server.NewServer()` finds them, and before
`rpc.NewServer()` needs them.

We do not generally like `panic()` calls in production code, so this
needed an improvement anyway.

Release note: None
  • Loading branch information
knz committed Jan 31, 2023
1 parent e4a4340 commit ce8961c
Show file tree
Hide file tree
Showing 20 changed files with 107 additions and 81 deletions.
1 change: 1 addition & 0 deletions pkg/blobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
26 changes: 12 additions & 14 deletions pkg/blobs/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func createTestResources(t testing.TB) (string, string, *stop.Stopper, func()) {
Expand All @@ -55,27 +56,24 @@ func setUpService(
localExternalDir string,
remoteExternalDir string,
) BlobClientFactory {
s := rpc.NewServer(rpcContext)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

remoteBlobServer, err := NewBlobService(remoteExternalDir)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

blobspb.RegisterBlobServer(s, remoteBlobServer)
ln, err := netutil.ListenAndServeGRPC(rpcContext.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

s2 := rpc.NewServer(rpcContext)
s2, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
localBlobServer, err := NewBlobService(localExternalDir)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

blobspb.RegisterBlobServer(s2, localBlobServer)
ln2, err := netutil.ListenAndServeGRPC(rpcContext.Stopper, s2, util.TestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

localDialer := nodedialer.New(rpcContext,
func(nodeID roachpb.NodeID) (net.Addr, error) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func TestConnectorGossipSubscription(t *testing.T) {
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s := rpc.NewServer(rpcContext)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

// Test setting the cluster ID by setting it to nil then ensuring it's later
// set to the original ID value.
Expand Down Expand Up @@ -359,7 +360,8 @@ func TestConnectorRangeLookup(t *testing.T) {
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s := rpc.NewServer(rpcContext)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

rangeLookupRespC := make(chan *roachpb.RangeLookupResponse, 1)
rangeLookupFn := func(_ context.Context, req *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error) {
Expand Down Expand Up @@ -444,7 +446,8 @@ func TestConnectorRetriesUnreachable(t *testing.T) {

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s := rpc.NewServer(rpcContext)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

node1 := &roachpb.NodeDescriptor{NodeID: 1, Address: util.MakeUnresolvedAddr("tcp", "1.1.1.1")}
node2 := &roachpb.NodeDescriptor{NodeID: 2, Address: util.MakeUnresolvedAddr("tcp", "2.2.2.2")}
Expand Down Expand Up @@ -539,7 +542,8 @@ func TestConnectorRetriesError(t *testing.T) {
gossipSubFn func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error,
rangeLookupFn func(_ context.Context, req *roachpb.RangeLookupRequest) (*roachpb.RangeLookupResponse, error),
) string {
internalServer := rpc.NewServer(rpcContext)
internalServer, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
roachpb.RegisterInternalServer(internalServer, &mockServer{rangeLookupFn: rangeLookupFn, gossipSubFn: gossipSubFn})
ln, err := net.Listen(util.TestAddr.Network(), util.TestAddr.String())
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/kvccl/kvtenantccl/setting_overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func TestConnectorSettingOverrides(t *testing.T) {
defer stopper.Stop(ctx)
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
s := rpc.NewServer(rpcContext)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

tenantID := roachpb.MustMakeTenantID(5)
gossipSubFn := func(req *roachpb.GossipSubscriptionRequest, stream roachpb.Internal_GossipSubscriptionServer) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/gossip/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ go_test(
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
)
Expand Down
35 changes: 15 additions & 20 deletions pkg/gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -58,20 +59,17 @@ func startGossipAtAddr(
rpcContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)
rpcContext.NodeID.Set(ctx, nodeID)

server := rpc.NewServer(rpcContext)
server, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
g := NewTest(nodeID, stopper, registry, zonepb.DefaultZoneConfigRef())
RegisterGossipServer(server, g)
ln, err := netutil.ListenAndServeGRPC(stopper, server, addr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
addr = ln.Addr()
if err := g.SetNodeDescriptor(&roachpb.NodeDescriptor{
require.NoError(t, g.SetNodeDescriptor(&roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()),
}); err != nil {
t.Fatal(err)
}
}))
g.start(addr)
time.Sleep(time.Millisecond)
return g, rpcContext
Expand Down Expand Up @@ -121,22 +119,20 @@ func startFakeServerGossips(
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
lRPCContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)

lserver := rpc.NewServer(lRPCContext)
lserver, err := rpc.NewServer(lRPCContext)
require.NoError(t, err)
local := NewTest(localNodeID, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
RegisterGossipServer(lserver, local)
lln, err := netutil.ListenAndServeGRPC(stopper, lserver, util.IsolatedTestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
local.start(lln.Addr())

rRPCContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)
rserver := rpc.NewServer(rRPCContext)
rserver, err := rpc.NewServer(rRPCContext)
require.NoError(t, err)
remote := newFakeGossipServer(rserver, stopper)
rln, err := netutil.ListenAndServeGRPC(stopper, rserver, util.IsolatedTestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
addr := rln.Addr()
remote.nodeAddr = util.MakeUnresolvedAddr(addr.Network(), addr.String())

Expand Down Expand Up @@ -480,16 +476,15 @@ func TestClientRegisterWithInitNodeID(t *testing.T) {
nodeID := roachpb.NodeID(i + 1)

rpcContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)
server := rpc.NewServer(rpcContext)
server, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
// node ID must be non-zero
gnode := NewTest(nodeID, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
RegisterGossipServer(server, gnode)
g = append(g, gnode)

ln, err := netutil.ListenAndServeGRPC(stopper, server, util.IsolatedTestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

// Connect to the first gossip node.
if gossipAddr == "" {
Expand Down
8 changes: 4 additions & 4 deletions pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestGossipInfoStore verifies operation of gossip instance infostore.
Expand Down Expand Up @@ -706,7 +707,8 @@ func TestGossipJoinTwoClusters(t *testing.T) {
clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
rpcContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)

server := rpc.NewServer(rpcContext)
server, err := rpc.NewServer(rpcContext)
require.NoError(t, err)

// node ID must be non-zero
gnode := NewTest(roachpb.NodeID(i+1), stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
Expand All @@ -717,9 +719,7 @@ func TestGossipJoinTwoClusters(t *testing.T) {
gnode.clusterID.Set(context.Background(), clusterIDs[i])

ln, err := netutil.ListenAndServeGRPC(stopper, server, util.IsolatedTestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
addrs = append(addrs, ln.Addr())

// Only the third node has addresses.
Expand Down
5 changes: 4 additions & 1 deletion pkg/gossip/simulation/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ func NewNetwork(

// CreateNode creates a simulation node and starts an RPC server for it.
func (n *Network) CreateNode(defaultZoneConfig *zonepb.ZoneConfig) (*Node, error) {
server := rpc.NewServer(n.RPCContext)
server, err := rpc.NewServer(n.RPCContext)
if err != nil {
return nil, err
}
ln, err := net.Listen(util.IsolatedTestAddr.Network(), util.IsolatedTestAddr.String())
if err != nil {
return nil, err
Expand Down
11 changes: 4 additions & 7 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,17 @@ func TestSendToOneClient(t *testing.T) {
// checks to avoid log.Fatal.
rpcContext.TestingAllowNamedRPCToAnonymousServer = true

s := rpc.NewServer(rpcContext)
s, err := rpc.NewServer(rpcContext)
require.NoError(t, err)
roachpb.RegisterInternalServer(s, Node(0))
ln, err := netutil.ListenAndServeGRPC(rpcContext.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
nodeDialer := nodedialer.New(rpcContext, func(roachpb.NodeID) (net.Addr, error) {
return ln.Addr(), nil
})

reply, err := sendBatch(ctx, t, nil, []net.Addr{ln.Addr()}, rpcContext, nodeDialer)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if reply == nil {
t.Errorf("expected reply")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/closedts/sidetransport/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,10 @@ func newMockSideTransportGRPCServerWithOpts(
}

clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */)
grpcServer := rpc.NewServer(rpc.NewInsecureTestingContext(ctx, clock, stopper))
grpcServer, err := rpc.NewServer(rpc.NewInsecureTestingContext(ctx, clock, stopper))
if err != nil {
return nil, err
}
ctpb.RegisterSideTransportServer(grpcServer, receiver)
go func() {
_ /* err */ = grpcServer.Serve(lis)
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func (rttc *raftTransportTestContext) AddNode(nodeID roachpb.NodeID) *kvserver.R
func (rttc *raftTransportTestContext) AddNodeWithoutGossip(
nodeID roachpb.NodeID, addr net.Addr, stopper *stop.Stopper,
) (*kvserver.RaftTransport, net.Addr) {
grpcServer := rpc.NewServer(rttc.nodeRPCContext)
grpcServer, err := rpc.NewServer(rttc.nodeRPCContext)
require.NoError(rttc.t, err)
ctwWithTracer := log.MakeTestingAmbientCtxWithNewTracer()
transport := kvserver.NewRaftTransport(
ctwWithTracer,
Expand All @@ -177,9 +178,7 @@ func (rttc *raftTransportTestContext) AddNodeWithoutGossip(
)
rttc.transports[nodeID] = transport
ln, err := netutil.ListenAndServeGRPC(stopper, grpcServer, addr)
if err != nil {
rttc.t.Fatal(err)
}
require.NoError(rttc.t, err)
return transport, ln.Addr()
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/raft_transport_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestRaftTransportStartNewQueue(t *testing.T) {
Expand All @@ -55,7 +56,8 @@ func TestRaftTransportStartNewQueue(t *testing.T) {

// mrs := &dummyMultiRaftServer{}

grpcServer := rpc.NewServer(rpcC)
grpcServer, err := rpc.NewServer(rpcC)
require.NoError(t, err)
// RegisterMultiRaftServer(grpcServer, mrs)

var addr net.Addr
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func createTestStoreWithoutStart(
Settings: cfg.Settings,
})
stopper.SetTracer(cfg.AmbientCtx.Tracer)
server := rpc.NewServer(rpcContext) // never started
server, err := rpc.NewServer(rpcContext) // never started
require.NoError(t, err)

// Some tests inject their own Gossip and StorePool, via
// createTestAllocatorWithKnobs, at the time of writing
Expand Down
16 changes: 9 additions & 7 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ func WithInterceptor(f func(fullMethod string) error) ServerOption {
// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
// either expects incoming connections from KV nodes, or from tenant SQL
// servers.
func NewServer(rpcCtx *Context, opts ...ServerOption) *grpc.Server {
srv, _ /* interceptors */ := NewServerEx(rpcCtx, opts...)
return srv
func NewServer(rpcCtx *Context, opts ...ServerOption) (*grpc.Server, error) {
srv, _ /* interceptors */, err := NewServerEx(rpcCtx, opts...)
return srv, err
}

// ServerInterceptorInfo contains the server-side interceptors that a server
Expand All @@ -181,7 +181,9 @@ type ClientInterceptorInfo struct {
// been registered with gRPC for the server. These interceptors can be used
// manually when bypassing gRPC to call into the server (like the
// internalClientAdapter does).
func NewServerEx(rpcCtx *Context, opts ...ServerOption) (*grpc.Server, ServerInterceptorInfo) {
func NewServerEx(
rpcCtx *Context, opts ...ServerOption,
) (s *grpc.Server, sii ServerInterceptorInfo, err error) {
var o serverOpts
for _, f := range opts {
f(&o)
Expand Down Expand Up @@ -210,7 +212,7 @@ func NewServerEx(rpcCtx *Context, opts ...ServerOption) (*grpc.Server, ServerInt
if !rpcCtx.Config.Insecure {
tlsConfig, err := rpcCtx.GetServerTLSConfig()
if err != nil {
panic(err)
return nil, sii, err
}
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
Expand Down Expand Up @@ -279,12 +281,12 @@ func NewServerEx(rpcCtx *Context, opts ...ServerOption) (*grpc.Server, ServerInt
grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(unaryInterceptor...))
grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...))

s := grpc.NewServer(grpcOpts...)
s = grpc.NewServer(grpcOpts...)
RegisterHeartbeatServer(s, rpcCtx.NewHeartbeatService())
return s, ServerInterceptorInfo{
UnaryInterceptors: unaryInterceptor,
StreamInterceptors: streamInterceptor,
}
}, nil
}

// Connection is a wrapper around grpc.ClientConn. It prevents the underlying
Expand Down
Loading

0 comments on commit ce8961c

Please sign in to comment.