Skip to content

Commit

Permalink
rpc: use tenant client/server certs where appropriate
Browse files Browse the repository at this point in the history
With this PR, tenant SQL servers use tenant client certificates to
connect to the tenant server endpoint at the KV layer. They were
previously using the KV-internal node certs.

No validation is performed as of this PR, but this is the obvious
next step.

Follow-up work will assertions that make sure that we don't see
tenants "accidentally" use the node certs for some operations
when they are available (as is typically the case during testing).

Finally, there will be some work on the heartbeats exchanged by
the RPC context. We don't want a SQL tenant's time signal to
ever trigger KV nodes to crash, for example.

Touches cockroachdb#47898.

Release note: None
  • Loading branch information
tbg committed Aug 4, 2020
1 parent 98866a4 commit b0c4384
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 65 deletions.
6 changes: 4 additions & 2 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ type TestServerArgs struct {
Addr string
// SQLAddr (if nonempty) is the SQL address to use for the test server.
SQLAddr string
// TenantAddr (if nonempty) is the tenant KV address to use for the test server.
TenantAddr string
// TenantAddr is the tenant KV address to use for the test server. If this
// is nil, the tenant server will be set up using a random port. If this
// is the empty string, no tenant server will be set up.
TenantAddr *string
// HTTPAddr (if nonempty) is the HTTP address to use for the test server.
HTTPAddr string
// DisableTLSForHTTP if set, disables TLS for the HTTP interface.
Expand Down
5 changes: 5 additions & 0 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func createTestCerts(certsDir string) (cleanup func() error) {
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedNodeKey),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedRootCert),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedRootKey),

filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantServerCACert),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantServerCert),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantServerKey),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantClientCACert),
}

for _, a := range assets {
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/demo_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ func testServerArgsForTransientCluster(
SQLMemoryPoolSize: demoCtx.sqlPoolMemorySize,
CacheSize: demoCtx.cacheSize,
NoAutoInitializeCluster: true,
// This disables the tenant server. We could enable it but would have to
// generate the suitable certs at the caller who wishes to do so.
TenantAddr: new(string),
}

if demoCtx.localities != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/demo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
SQLMemoryPoolSize: 2 << 10,
CacheSize: 1 << 10,
NoAutoInitializeCluster: true,
TenantAddr: new(string),
},
},
{
Expand All @@ -59,6 +60,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
SQLMemoryPoolSize: 4 << 10,
CacheSize: 4 << 10,
NoAutoInitializeCluster: true,
TenantAddr: new(string),
},
},
}
Expand Down
96 changes: 69 additions & 27 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,46 @@ func requireSuperUser(ctx context.Context) error {
return nil
}

// NewServer is a thin wrapper around grpc.NewServer that registers a heartbeat
// service.
func NewServer(ctx *Context) *grpc.Server {
return NewServerWithInterceptor(ctx, nil)
}

// NewServerWithInterceptor is like NewServer, but accepts an additional
// interceptor which is called before streaming and unary RPCs and may inject an
// error.
func NewServerWithInterceptor(
ctx *Context, interceptor func(fullMethod string) error,
) *grpc.Server {
opts := []grpc.ServerOption{
type serverOpts struct {
interceptor func(fullMethod string) error
tenant bool
}

// ServerOption is a configuration option passed to NewServer.
type ServerOption func(*serverOpts)

// ForTenant is an option to NewServer that results in the server being set
// up to validate incoming tenants. Without this option, the created server will
// use the KV-internal node certificates. With it, it uses tenant server
// certificates.
func ForTenant(opts *serverOpts) {
opts.tenant = true
}

var _ ServerOption = ForTenant

// WithInterceptor adds an additional interceptor. The interceptor is called before
// streaming and unary RPCs and may inject an error.
//
// This option can only be used once (i.e. interceptors can not be chained).
func WithInterceptor(f func(fullMethod string) error) ServerOption {
return func(opts *serverOpts) {
if opts.interceptor != nil {
panic("interceptor can only be set once")
}
opts.interceptor = f
}
}

// NewServer sets up an RPC server. Depending on the ServerOptions, the Server
// either expects incoming connections from KV nodes, or from tenant SQL
// servers.
func NewServer(ctx *Context, opts ...ServerOption) *grpc.Server {
var o serverOpts
for _, f := range opts {
f(&o)
}
grpcOpts := []grpc.ServerOption{
// The limiting factor for lowering the max message size is the fact
// that a single large kv can be sent over the network in one message.
// Our maximum kv size is unlimited, so we need this to be very large.
Expand All @@ -178,11 +205,17 @@ func NewServerWithInterceptor(
grpc.StatsHandler(&ctx.stats),
}
if !ctx.Config.Insecure {
tlsConfig, err := ctx.GetServerTLSConfig()
var tlsConfig *tls.Config
var err error
if !o.tenant {
tlsConfig, err = ctx.GetServerTLSConfig()
} else {
tlsConfig, err = ctx.GetTenantServerTLSConfig()
}
if err != nil {
panic(err)
}
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig)))
grpcOpts = append(grpcOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}

// These interceptors will be called in the order in which they appear, i.e.
Expand All @@ -191,29 +224,41 @@ func NewServerWithInterceptor(
var streamInterceptor []grpc.StreamServerInterceptor

if !ctx.Config.Insecure {
var authHook func(ctx context.Context) error
if !o.tenant {
authHook = requireSuperUser
} else {
authHook = func(ctx context.Context) error {
// TODO(tbg): pull the tenant ID from the incoming
// certificate and validate that the request is
// admissible.
return nil
}
}

unaryInterceptor = append(unaryInterceptor, func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
if err := requireSuperUser(ctx); err != nil {
if err := authHook(ctx); err != nil {
return nil, err
}
return handler(ctx, req)
})
streamInterceptor = append(streamInterceptor, func(
srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler,
) error {
if err := requireSuperUser(stream.Context()); err != nil {
if err := authHook(stream.Context()); err != nil {
return err
}
return handler(srv, stream)
})
}

if interceptor != nil {
if o.interceptor != nil {
unaryInterceptor = append(unaryInterceptor, func(
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
if err := interceptor(info.FullMethod); err != nil {
if err := o.interceptor(info.FullMethod); err != nil {
return nil, err
}
return handler(ctx, req)
Expand All @@ -222,7 +267,7 @@ func NewServerWithInterceptor(
streamInterceptor = append(streamInterceptor, func(
srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler,
) error {
if err := interceptor(info.FullMethod); err != nil {
if err := o.interceptor(info.FullMethod); err != nil {
return err
}
return handler(srv, stream)
Expand Down Expand Up @@ -251,10 +296,10 @@ func NewServerWithInterceptor(
// https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/tracing/opentracing
}

opts = append(opts, grpc.ChainUnaryInterceptor(unaryInterceptor...))
opts = append(opts, grpc.ChainStreamInterceptor(streamInterceptor...))
grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(unaryInterceptor...))
grpcOpts = append(grpcOpts, grpc.ChainStreamInterceptor(streamInterceptor...))

s := grpc.NewServer(opts...)
s := grpc.NewServer(grpcOpts...)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: ctx.Clock,
remoteClockMonitor: ctx.RemoteClocks,
Expand Down Expand Up @@ -719,10 +764,7 @@ func (ctx *Context) grpcDialOptions(
} else {
var err error
var tlsConfig *tls.Config
// TODO(tbg): remove this override when the KV layer can authenticate tenant
// client certs.
const override = true
if override || ctx.tenID == roachpb.SystemTenantID {
if ctx.tenID == roachpb.SystemTenantID {
tlsConfig, err = ctx.GetClientTLSConfig()
} else {
tlsConfig, err = ctx.GetTenantClientTLSConfig()
Expand Down
21 changes: 21 additions & 0 deletions pkg/rpc/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ func (ctx *SecurityContext) GetServerTLSConfig() (*tls.Config, error) {
return tlsCfg, nil
}

// GetTenantServerTLSConfig returns the tenant server TLS config, initializing
// it if needed. If Insecure is true, return a nil config, otherwise asks the
// certificate manager for the tenant server TLS config.
func (ctx *SecurityContext) GetTenantServerTLSConfig() (*tls.Config, error) {
// Early out.
if ctx.config.Insecure {
return nil, nil
}

cm, err := ctx.GetCertificateManager()
if err != nil {
return nil, wrapError(err)
}

tlsCfg, err := cm.GetTenantServerTLSConfig()
if err != nil {
return nil, wrapError(err)
}
return tlsCfg, nil
}

// GetClientTLSConfig returns the client TLS config, initializing it if needed.
// If Insecure is true, return a nil config, otherwise ask the certificate
// manager for a TLS config using certs for the config.User.
Expand Down
2 changes: 1 addition & 1 deletion pkg/security/certificate_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestLoadEmbeddedCerts(t *testing.T) {

// Check that all non-CA pairs include a key.
for _, c := range certs {
if c.FileUsage == security.CAPem {
if c.FileUsage == security.CAPem || c.FileUsage == security.TenantServerCAPem || c.FileUsage == security.TenantClientCAPem {
if len(c.KeyFilename) != 0 {
t.Errorf("CA key was loaded for CertInfo %+v", c)
}
Expand Down
83 changes: 57 additions & 26 deletions pkg/security/certs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,64 @@ func TestGenerateNodeCerts(t *testing.T) {
// node.crt: dual-purpose node certificate
// client.root.crt: client certificate for the root user.
func generateBaseCerts(certsDir string) error {
if err := security.CreateCAPair(
certsDir, filepath.Join(certsDir, security.EmbeddedCAKey),
testKeySize, time.Hour*96, true, true,
); err != nil {
return errors.Errorf("could not generate CA pair: %v", err)
{
caKey := filepath.Join(certsDir, security.EmbeddedCAKey)

if err := security.CreateCAPair(
certsDir, caKey,
testKeySize, time.Hour*96, true, true,
); err != nil {
return err
}

if err := security.CreateNodePair(
certsDir, caKey,
testKeySize, time.Hour*48, true, []string{"127.0.0.1"},
); err != nil {
return err
}

if err := security.CreateClientPair(
certsDir, caKey,
testKeySize, time.Hour*48, true, security.RootUser, false,
); err != nil {
return err
}
}

if err := security.CreateNodePair(
certsDir, filepath.Join(certsDir, security.EmbeddedCAKey),
testKeySize, time.Hour*48, true, []string{"127.0.0.1"},
); err != nil {
return errors.Errorf("could not generate Node pair: %v", err)
{
caKey := filepath.Join(certsDir, security.EmbeddedTenantServerCAKey)
if err := security.CreateTenantServerCAPair(
certsDir, caKey,
testKeySize, time.Hour*96, true, true,
); err != nil {
return err
}

if err := security.CreateTenantServerPair(certsDir, caKey,
testKeySize, time.Hour*48, true, []string{"127.0.0.1"},
); err != nil {
return err
}
}

if err := security.CreateClientPair(
certsDir, filepath.Join(certsDir, security.EmbeddedCAKey),
testKeySize, time.Hour*48, true, security.RootUser, false,
); err != nil {
return errors.Errorf("could not generate Client pair: %v", err)
{
caKey := filepath.Join(certsDir, security.EmbeddedTenantClientCAKey)
if err := security.CreateTenantClientCAPair(
certsDir, caKey,
testKeySize, time.Hour*96, true, true,
); err != nil {
return err
}

tcp, err := security.CreateTenantClientPair(certsDir, caKey,
testKeySize, time.Hour*48, "10")
if err != nil {
return err
}
if err := security.WriteTenantClientPair(certsDir, tcp, true); err != nil {
return err
}
}

return nil
Expand All @@ -278,19 +317,11 @@ func generateBaseCerts(certsDir string) error {
// client.node.crt: node client cert: signed by ca-client.crt
// client.root.crt: root client cert: signed by ca-client.crt
func generateSplitCACerts(certsDir string) error {
if err := security.CreateCAPair(
certsDir, filepath.Join(certsDir, security.EmbeddedCAKey),
testKeySize, time.Hour*96, true, true,
); err != nil {
return errors.Errorf("could not generate CA pair: %v", err)
if err := generateBaseCerts(certsDir); err != nil {
return err
}

if err := security.CreateNodePair(
certsDir, filepath.Join(certsDir, security.EmbeddedCAKey),
testKeySize, time.Hour*48, true, []string{"127.0.0.1"},
); err != nil {
return errors.Errorf("could not generate Node pair: %v", err)
}
// Overwrite those certs that we want to split.

if err := security.CreateClientCAPair(
certsDir, filepath.Join(certsDir, security.EmbeddedClientCAKey),
Expand Down
3 changes: 3 additions & 0 deletions pkg/security/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ const (
// See 'securitytest/test_certs/regenerate.sh'.
var EmbeddedTenantIDs = func() []uint64 { return []uint64{10, 11, 20} }

// Embedded certificates specific to multi-tenancy testing.
const (
EmbeddedTenantServerCACert = "ca-server-tenant.crt" // CA for tenant server (KV server)
EmbeddedTenantServerCAKey = "ca-server-tenant.key" // CA for tenant server (KV server)
EmbeddedTenantServerCert = "server-tenant.crt" // tenant server (KV server) cert
EmbeddedTenantServerKey = "server-tenant.key" // tenant server (KV server) key
EmbeddedTenantClientCACert = "ca-client-tenant.crt" // CA for client connections (auth broker)
EmbeddedTenantClientCAKey = "ca-client-tenant.key" // CA for client connections (auth broker)
EmbeddedTenantClientCert = "client-tenant.123456789.crt" // tenant client cert (SQL server)
EmbeddedTenantClientKey = "client-tenant.123456789.key" // tenant client key (SQL server)
)
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ type grpcServer struct {
mode serveMode
}

func newGRPCServer(rpcCtx *rpc.Context) *grpcServer {
func newGRPCServer(rpcCtx *rpc.Context, opts ...rpc.ServerOption) *grpcServer {
s := &grpcServer{}
s.mode.set(modeInitializing)
s.Server = rpc.NewServerWithInterceptor(rpcCtx, func(path string) error {
opts = append(opts, rpc.WithInterceptor(func(path string) error {
return s.intercept(path)
})
}))
s.Server = rpc.NewServer(rpcCtx, opts...)
return s
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

grpc := newGRPCServer(rpcContext)

// TODO(tbg): pass a different rpcContext here.
var tenantGRPC *grpcServer
if cfg.SplitListenTenant {
tenantGRPC = newGRPCServer(rpcContext)
tenantGRPC = newGRPCServer(rpcContext, rpc.ForTenant)
}

g := gossip.New(
Expand Down
Loading

0 comments on commit b0c4384

Please sign in to comment.