Skip to content

Commit

Permalink
ccl/sqlproxyccl: directory proto and test server
Browse files Browse the repository at this point in the history
* Defines a new interface between a tenant directory client and server
* Moves the tenant directory from the CC repo over
* Tenant directory modified to use the new interface
* Tenant directory modified to use stop.Stopper
* Modified pod watcher to use streaming grpc call
* Renamed package from directory to tenant, tenantID to ID etc
* Renamed references to k8s, pods to server, endpoints etc
* Prevents test tenant servers to start for non-existing/inactive tenants
* Adds ability to shut down individual tenant servers within a test cluster
* Adds a test directory server that can start/stop tenants
* Adds tests of the directory running agianst the test server
* Allow insecure connections from test tenant to KV server
* Fixed a race in kvtenantccl

Release note: None
  • Loading branch information
darinpp committed Apr 29, 2021
1 parent c8ee98d commit 65c30a3
Show file tree
Hide file tree
Showing 18 changed files with 3,719 additions and 37 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ALL_TESTS = [
"//pkg/ccl/partitionccl:partitionccl_test",
"//pkg/ccl/serverccl:serverccl_test",
"//pkg/ccl/sqlproxyccl/cache:cache_test",
"//pkg/ccl/sqlproxyccl/tenant:tenant_test",
"//pkg/ccl/sqlproxyccl:sqlproxyccl_test",
"//pkg/ccl/storageccl/engineccl:engineccl_test",
"//pkg/ccl/storageccl:storageccl_test",
Expand Down
4 changes: 4 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,8 @@ type TestTenantArgs struct {

// TestingKnobs for the test server.
TestingKnobs TestingKnobs

// Test server starts with secure mode by default. When this is set to true
// it will switch to insecure
ForceInsecure bool
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3594,7 +3594,7 @@ func TestBackupTenantsWithRevisionHistory(t *testing.T) {
ctx, tc, sqlDB, _, cleanupFn := BackupRestoreTestSetup(t, singleNode, numAccounts, InitManualReplication)
defer cleanupFn()

_, err := tc.Servers[0].StartTenant(base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10)})
_, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{TenantID: roachpb.MakeTenantID(10)})
require.NoError(t, err)

const msg = "can not backup tenants with revision history"
Expand Down
24 changes: 14 additions & 10 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,20 @@ func (c *Connector) getClient(ctx context.Context) (roachpb.InternalClient, erro
dialCtx := c.AnnotateCtx(context.Background())
dialCtx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(dialCtx)
defer cancel()
err := c.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Connector: dial", c.dialAddrs)
var client roachpb.InternalClient
err := c.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Connector: dial",
func(ctx context.Context) error {
var err error
client, err = c.dialAddrs(ctx)
return err
})
if err != nil {
return nil, err
}
// NB: read lock not needed.
return c.mu.client, nil
c.mu.Lock()
defer c.mu.Unlock()
c.mu.client = client
return client, nil
})
c.mu.RUnlock()

Expand All @@ -387,7 +395,7 @@ func (c *Connector) getClient(ctx context.Context) (roachpb.InternalClient, erro

// dialAddrs attempts to dial each of the configured addresses in a retry loop.
// The method will only return a non-nil error on context cancellation.
func (c *Connector) dialAddrs(ctx context.Context) error {
func (c *Connector) dialAddrs(ctx context.Context) (roachpb.InternalClient, error) {
for r := retry.StartWithCtx(ctx, c.rpcRetryOptions); r.Next(); {
// Try each address on each retry iteration.
randStart := rand.Intn(len(c.addrs))
Expand All @@ -398,14 +406,10 @@ func (c *Connector) dialAddrs(ctx context.Context) error {
log.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err)
continue
}
client := roachpb.NewInternalClient(conn)
c.mu.Lock()
c.mu.client = client
c.mu.Unlock()
return nil
return roachpb.NewInternalClient(conn), nil
}
}
return ctx.Err()
return nil, ctx.Err()
}

func (c *Connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
Expand Down
43 changes: 23 additions & 20 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,16 @@ func TestTenantUnauthenticatedAccess(t *testing.T) {
tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

_, err := tc.Server(0).StartTenant(base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[0]),
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
// Configure the SQL server to access the wrong tenant keyspace.
TenantIDCodecOverride: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[1]),
_, err := tc.Server(0).StartTenant(ctx,
base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[0]),
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
// Configure the SQL server to access the wrong tenant keyspace.
TenantIDCodecOverride: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[1]),
},
},
},
})
})
require.Error(t, err)
require.Regexp(t, `Unauthenticated desc = requested key .* not fully contained in tenant keyspace /Tenant/1{0-1}`, err)
}
Expand All @@ -115,9 +116,10 @@ func TestTenantHTTP(t *testing.T) {
tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

tenant, err := tc.Server(0).StartTenant(base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[0]),
})
tenant, err := tc.Server(0).StartTenant(ctx,
base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(security.EmbeddedTenantIDs()[0]),
})
require.NoError(t, err)
t.Run("prometheus", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/vars")
Expand Down Expand Up @@ -150,16 +152,17 @@ func TestIdleExit(t *testing.T) {

warmupDuration := 500 * time.Millisecond
countdownDuration := 4000 * time.Millisecond
tenant, err := tc.Server(0).StartTenant(base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10),
IdleExitAfter: warmupDuration,
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
IdleExitCountdownDuration: countdownDuration,
tenant, err := tc.Server(0).StartTenant(ctx,
base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(10),
IdleExitAfter: warmupDuration,
TestingKnobs: base.TestingKnobs{
TenantTestingKnobs: &sql.TenantTestingKnobs{
IdleExitCountdownDuration: countdownDuration,
},
},
},
Stopper: tc.Stopper(),
})
Stopper: tc.Stopper(),
})

require.NoError(t, err)

Expand Down
75 changes: 75 additions & 0 deletions pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")

proto_library(
name = "tenant_proto",
srcs = ["directory.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
)

go_proto_library(
name = "tenant_go_proto",
compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_grpc_compiler"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant",
proto = ":tenant_proto",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto"],
)

go_library(
name = "tenant",
srcs = [
"directory.go",
"entry.go",
"mocks_generated.go",
"test_directory_svr.go",
],
embed = [":tenant_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb",
"//pkg/util/grpcutil",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_golang_mock//gomock",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
],
)

go_test(
name = "tenant_test",
srcs = [
"directory_test.go",
"main_test.go",
],
embed = [":tenant"],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/ccl/utilccl",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
)
Loading

0 comments on commit 65c30a3

Please sign in to comment.