Skip to content

Commit

Permalink
rpc: use tenant client/server certs where appropriate
Browse files Browse the repository at this point in the history
With this PR, tenant SQL servers use tenant client certificates to
connect to the tenant server endpoint at the KV layer. They were
previously using the KV-internal node certs.

No validation is performed as of this PR, but this is the obvious
next step.

Follow-up work will assertions that make sure that we don't see
tenants "accidentally" use the node certs for some operations
when they are available (as is typically the case during testing).

Finally, there will be some work on the heartbeats exchanged by
the RPC context. We don't want a SQL tenant's time signal to
ever trigger KV nodes to crash, for example.

Touches cockroachdb#47898.

Release note: None
  • Loading branch information
tbg authored and nvanbenschoten committed Jul 28, 2020
1 parent 4b1a4d3 commit a543c0f
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 29 deletions.
94 changes: 70 additions & 24 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,46 @@ func requireSuperUser(ctx context.Context) error {
return nil
}

// NewServer is a thin wrapper around grpc.NewServer that registers a heartbeat
// service.
func NewServer(ctx *Context) *grpc.Server {
return NewServerWithInterceptor(ctx, nil)
}

// NewServerWithInterceptor is like NewServer, but accepts an additional
// interceptor which is called before streaming and unary RPCs and may inject an
// error.
func NewServerWithInterceptor(
ctx *Context, interceptor func(fullMethod string) error,
) *grpc.Server {
opts := []grpc.ServerOption{
type serverOpts struct {
interceptor func(fullMethod string) error
tenant bool
}

// ServerOption is a configuration option passed to NewServer.
type ServerOption func(*serverOpts)

// ForTenant is an option to NewServer that results in the server being set
// up to validate incoming tenants. Without this option, the created server will
// use the KV-internal node certificates. With it, it uses tenant server
// certificates.
func ForTenant(opts *serverOpts) {
opts.tenant = true
}

var _ ServerOption = ForTenant

// WithInterceptor adds an additional interceptor. The interceptor is called before
// streaming and unary RPCs and may inject an error.
//
// This option can only be used once (i.e. interceptors can not be chained).
func WithInterceptor(f func(fullMethod string) error) ServerOption {
return func(opts *serverOpts) {
if opts.interceptor != nil {
panic("interceptor can only be set once")
}
opts.interceptor = f
}
}

// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
// either expects incoming connections from KV nodes, or from tenant SQL
// servers.
func NewServer(ctx *Context, opts ...ServerOption) *grpc.Server {
var o serverOpts
for _, f := range opts {
f(&o)
}
grpcOpts := []grpc.ServerOption{
// The limiting factor for lowering the max message size is the fact
// that a single large kv can be sent over the network in one message.
// Our maximum kv size is unlimited, so we need this to be very large.
Expand All @@ -178,11 +205,17 @@ func NewServerWithInterceptor(
grpc.StatsHandler(&ctx.stats),
}
if !ctx.Config.Insecure {
tlsConfig, err := ctx.GetServerTLSConfig()
var tlsConfig *tls.Config
var err error
if !o.tenant {
tlsConfig, err = ctx.GetServerTLSConfig()
} else {
tlsConfig, err = ctx.GetTenantServerTLSConfig()
}
if err != nil {
panic(err)
}
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}

// These interceptors will be called in the order in which they appear, i.e.
Expand All @@ -191,29 +224,42 @@ func NewServerWithInterceptor(
var streamInterceptor []grpc.StreamServerInterceptor

if !ctx.Config.Insecure {
var authHook func(ctx context.Context) error
if !o.tenant {
authHook = requireSuperUser
} else {
authHook = func(ctx context.Context) error {
// TODO(tbg): pull the tenant ID from the incoming
// certificate and validate that the request is
// admissible.
return nil
}
}

// TODO(tbg): do something else if `o.tenant`.
unaryInterceptor = append(unaryInterceptor, func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
if err := requireSuperUser(ctx); err != nil {
if err := authHook(ctx); err != nil {
return nil, err
}
return handler(ctx, req)
})
streamInterceptor = append(streamInterceptor, func(
srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler,
) error {
if err := requireSuperUser(stream.Context()); err != nil {
if err := authHook(stream.Context()); err != nil {
return err
}
return handler(srv, stream)
})
}

if interceptor != nil {
if o.interceptor != nil {
unaryInterceptor = append(unaryInterceptor, func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
if err := interceptor(info.FullMethod); err != nil {
if err := o.interceptor(info.FullMethod); err != nil {
return nil, err
}
return handler(ctx, req)
Expand All @@ -222,7 +268,7 @@ func NewServerWithInterceptor(
streamInterceptor = append(streamInterceptor, func(
srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler,
) error {
if err := interceptor(info.FullMethod); err != nil {
if err := o.interceptor(info.FullMethod); err != nil {
return err
}
return handler(srv, stream)
Expand Down Expand Up @@ -251,10 +297,10 @@ func NewServerWithInterceptor(
// https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/tracing/opentracing
}

opts = append(opts, grpc.ChainUnaryInterceptor(unaryInterceptor...))
opts = append(opts, grpc.ChainStreamInterceptor(streamInterceptor...))
grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(unaryInterceptor...))
grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...))

s := grpc.NewServer(opts...)
s := grpc.NewServer(grpcOpts...)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: ctx.Clock,
remoteClockMonitor: ctx.RemoteClocks,
Expand Down Expand Up @@ -721,7 +767,7 @@ func (ctx *Context) grpcDialOptions(
var tlsConfig *tls.Config
// TODO(tbg): remove this override when the KV layer can authenticate tenant
// client certs.
const override = true
const override = false
if override || ctx.tenID == roachpb.SystemTenantID {
tlsConfig, err = ctx.GetClientTLSConfig()
} else {
Expand Down
21 changes: 21 additions & 0 deletions pkg/rpc/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ func (ctx *SecurityContext) GetServerTLSConfig() (*tls.Config, error) {
return tlsCfg, nil
}

// GetTenantServerTLSConfig returns the tenant server TLS config, initializing
// it if needed. If Insecure is true, return a nil config, otherwise asks the
// certificate manager for the tenant server TLS config.
func (ctx *SecurityContext) GetTenantServerTLSConfig() (*tls.Config, error) {
// Early out.
if ctx.config.Insecure {
return nil, nil
}

cm, err := ctx.GetCertificateManager()
if err != nil {
return nil, wrapError(err)
}

tlsCfg, err := cm.GetTenantServerTLSConfig()
if err != nil {
return nil, wrapError(err)
}
return tlsCfg, nil
}

// GetClientTLSConfig returns the client TLS config, initializing it if needed.
// If Insecure is true, return a nil config, otherwise ask the certificate
// manager for a TLS config using certs for the config.User.
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ type grpcServer struct {
mode serveMode
}

func newGRPCServer(rpcCtx *rpc.Context) *grpcServer {
func newGRPCServer(rpcCtx *rpc.Context, opts ...rpc.ServerOption) *grpcServer {
s := &grpcServer{}
s.mode.set(modeInitializing)
s.Server = rpc.NewServerWithInterceptor(rpcCtx, func(path string) error {
opts = append(opts, rpc.WithInterceptor(func(path string) error {
return s.intercept(path)
})
}))
s.Server = rpc.NewServer(rpcCtx, opts...)
return s
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
// TODO(tbg): pass a different rpcContext here.
var tenantGRPC *grpcServer
if cfg.SplitListenTenant {
tenantGRPC = newGRPCServer(rpcContext)
tenantGRPC = newGRPCServer(rpcContext, rpc.ForTenant)
}

g := gossip.New(
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func (ts *TestServer) StartTenant(params base.TestTenantArgs) (pgAddr string, _
ClusterSettingsUpdater: st.MakeUpdater(),
}
}
sqlCfg.TenantKVAddrs = []string{ts.RPCAddr()}
sqlCfg.TenantKVAddrs = []string{ts.TenantAddr()}
return StartTenant(
ctx,
ts.Stopper(),
Expand Down

0 comments on commit a543c0f

Please sign in to comment.