Skip to content

Commit

Permalink
Merge #51503 #52281
Browse files Browse the repository at this point in the history
51503: rpc: use tenant client/server certs r=nvanbenschoten a=tbg


With this PR, tenant SQL servers use tenant client certificates to
connect to the tenant server endpoint at the KV layer. They were
previously using the KV-internal node certs.

No validation is performed as of this PR, but this is the obvious
next step.

Follow-up work will assertions that make sure that we don't see
tenants "accidentally" use the node certs for some operations
when they are available (as is typically the case during testing).

Finally, there will be some work on the heartbeats exchanged by
the RPC context. We don't want a SQL tenant's time signal to
ever trigger KV nodes to crash, for example.

Touches #47898.

Release note: None

- cli/flags,config: new flag for tenant KV listen addr
- sql: route tenant KV traffic to tenant KV address
- roachtest: configure --tenant-addr flag in acceptance/multitenant
- rpc: add TenantID to rpc.ContextOptions
- security: slight test improvement
- rpc: pass TenantID to SecurityContext to CertManager
- security: use a single test_certs dir
- security: embed certs for a few hard-coded tenants
- rpc: *almost* use tenant client certs (on tenants)
- rpc: use tenant client/server certs where appropriate


52281: bulkio: Correctly handle exhausting retries when reading from HTTP. r=rohany a=miretskiy

Fixes #52279

Return an error if we exhaust the retry budget when reading from HTTP.

Release Notes: None

Co-authored-by: Tobias Schottdorf <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
3 people committed Aug 4, 2020
3 parents 71e1113 + b0c4384 + eec1064 commit dd21d90
Show file tree
Hide file tree
Showing 72 changed files with 1,071 additions and 623 deletions.
1 change: 1 addition & 0 deletions pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func (c *Cluster) makeNode(ctx context.Context, nodeIdx int, cfg NodeConfig) (*N
Insecure: true,
}
rpcCtx := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Config: baseCtx,
Clock: hlc.NewClock(hlc.UnixNano, 0),
Expand Down
6 changes: 4 additions & 2 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ type TestServerArgs struct {
Addr string
// SQLAddr (if nonempty) is the SQL address to use for the test server.
SQLAddr string
// TenantAddr (if nonempty) is the tenant KV address to use for the test server.
TenantAddr string
// TenantAddr is the tenant KV address to use for the test server. If this
// is nil, the tenant server will be set up using a random port. If this
// is the empty string, no tenant server will be set up.
TenantAddr *string
// HTTPAddr (if nonempty) is the HTTP address to use for the test server.
HTTPAddr string
// DisableTLSForHTTP if set, disables TLS for the HTTP interface.
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4613,6 +4613,9 @@ func TestBackupRestoreTenant(t *testing.T) {
defer cleanupFn()
srv := tc.Server(0)

// NB: tenant certs for 10, 11, 20 are embedded. See:
_ = security.EmbeddedTenantIDs()

// Setup a few tenants, each with a different table.
conn10 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10), TenantInfo: []byte("ten")})
defer conn10.Close()
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -42,7 +43,11 @@ func TestSQLServer(t *testing.T) {
tc := serverutils.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

db := serverutils.StartTenant(t, tc.Server(0), base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10)})
db := serverutils.StartTenant(
t,
tc.Server(0),
base.TestTenantArgs{TenantID: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[0])},
)
defer db.Close()
r := sqlutils.MakeSQLRunner(db)
r.QueryStr(t, `SELECT 1`)
Expand Down
5 changes: 5 additions & 0 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func createTestCerts(certsDir string) (cleanup func() error) {
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedNodeKey),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedRootCert),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedRootKey),

filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantServerCACert),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantServerCert),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantServerKey),
filepath.Join(security.EmbeddedCertsDir, security.EmbeddedTenantClientCACert),
}

for _, a := range assets {
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/client_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/cli/cliflags"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -351,7 +352,7 @@ func (cliCtx *cliContext) makeClientConnURL() (url.URL, error) {
if userName == "" {
userName = security.RootUser
}
sCtx := rpc.MakeSecurityContext(cliCtx.Config)
sCtx := rpc.MakeSecurityContext(cliCtx.Config, roachpb.SystemTenantID)
if err := sCtx.LoadSecurityOptions(
opts, userName,
); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/demo_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ func testServerArgsForTransientCluster(
SQLMemoryPoolSize: demoCtx.sqlPoolMemorySize,
CacheSize: demoCtx.cacheSize,
NoAutoInitializeCluster: true,
// This disables the tenant server. We could enable it but would have to
// generate the suitable certs at the caller who wishes to do so.
TenantAddr: new(string),
}

if demoCtx.localities != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/demo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
SQLMemoryPoolSize: 2 << 10,
CacheSize: 1 << 10,
NoAutoInitializeCluster: true,
TenantAddr: new(string),
},
},
{
Expand All @@ -59,6 +60,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
SQLMemoryPoolSize: 4 << 10,
CacheSize: 4 << 10,
NoAutoInitializeCluster: true,
TenantAddr: new(string),
},
},
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/cli/cliflags"
"github.com/cockroachdb/cockroach/pkg/geo/geos"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down Expand Up @@ -655,7 +656,7 @@ func runStart(cmd *cobra.Command, args []string, disableReplication bool) error
// (Re-)compute the client connection URL. We cannot do this
// earlier (e.g. above, in the runStart function) because
// at this time the address and port have not been resolved yet.
sCtx := rpc.MakeSecurityContext(serverCfg.Config)
sCtx := rpc.MakeSecurityContext(serverCfg.Config, roachpb.SystemTenantID)
pgURL, err := sCtx.PGURL(url.User(security.RootUser))
if err != nil {
log.Errorf(ctx, "failed computing the URL: %v", err)
Expand Down Expand Up @@ -828,7 +829,7 @@ If problems persist, please see %s.`
// (Re-)compute the client connection URL. We cannot do this
// earlier (e.g. above, in the runStart function) because
// at this time the address and port have not been resolved yet.
sCtx := rpc.MakeSecurityContext(serverCfg.Config)
sCtx := rpc.MakeSecurityContext(serverCfg.Config, roachpb.SystemTenantID)
pgURL, err := sCtx.PGURL(url.User(security.RootUser))
if err != nil {
log.Errorf(ctx, "failed computing the URL: %v", err)
Expand Down Expand Up @@ -1366,6 +1367,7 @@ func getClientGRPCConn(
clock := hlc.NewClock(hlc.UnixNano, 0)
stopper := stop.NewStopper()
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: cfg.Settings.Tracer},
Config: cfg.Config,
Clock: clock,
Expand Down
1 change: 1 addition & 0 deletions pkg/gossip/simulation/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewNetwork(
Stopper: stopper,
}
n.RPCContext = rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Config: &base.Config{Insecure: true},
Clock: hlc.NewClock(hlc.UnixNano, time.Nanosecond),
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5611,6 +5611,7 @@ func TestAllocatorFullDisks(t *testing.T) {
// Model a set of stores in a cluster doing rebalancing, with ranges being
// randomly added occasionally.
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Config: &base.Config{Insecure: true},
Clock: clock,
Expand Down Expand Up @@ -5753,6 +5754,7 @@ func Example_rebalancing() {
// Model a set of stores in a cluster,
// adding / rebalancing ranges of random sizes.
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Config: &base.Config{Insecure: true},
Clock: clock,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func createTestStoreWithOpts(
storeCfg.AmbientCtx = ac

rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: ac,
Config: &base.Config{Insecure: true},
Clock: storeCfg.Clock,
Expand Down Expand Up @@ -373,6 +374,7 @@ func (m *multiTestContext) Start(t testing.TB, numStores int) {
st := cluster.MakeTestingClusterSettings()
if m.rpcContext == nil {
m.rpcContext = rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Config: &base.Config{Insecure: true},
Clock: m.clock(),
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func TestNeedsSystemConfig(t *testing.T) {
// Use a gossip instance that won't have the system config available in it.
// bqNeedsSysCfg will not add the replica or process it without a system config.
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: tc.store.cfg.AmbientCtx,
Config: &base.Config{Insecure: true},
Clock: tc.store.cfg.Clock,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/raft_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func newRaftTransportTestContext(t testing.TB) *raftTransportTestContext {
transports: map[roachpb.NodeID]*kvserver.RaftTransport{},
}
rttc.nodeRPCContext = rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Config: testutils.NewNodeTestBaseContext(),
Clock: hlc.NewClock(hlc.UnixNano, time.Nanosecond),
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/raft_transport_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestRaftTransportStartNewQueue(t *testing.T) {

st := cluster.MakeTestingClusterSettings()
rpcC := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
Config: &base.Config{Insecure: true},
Clock: hlc.NewClock(hlc.UnixNano, 500*time.Millisecond),
Stopper: stopper,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
config.TestingSetupZoneConfigHook(stopper)
if tc.gossip == nil {
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: cfg.AmbientCtx,
Config: &base.Config{Insecure: true},
Clock: cfg.Clock,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func createTestStorePool(
clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
st := cluster.MakeTestingClusterSettings()
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: log.AmbientContext{Tracer: st.Tracer},
Config: &base.Config{Insecure: true},
Clock: clock,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func createTestStoreWithoutStart(
config.TestingSetupZoneConfigHook(stopper)

rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: cfg.AmbientCtx,
Config: &base.Config{Insecure: true},
Clock: cfg.Clock,
Expand Down
Loading

0 comments on commit dd21d90

Please sign in to comment.