Skip to content

Commit

Permalink
serverutils: clean up some interfaces
Browse files Browse the repository at this point in the history
In this commit:

- `TestTenantInterface` renamed to `ApplicationLayerInterface`.

- `TestServerInterface` split into:

  - `StorageLayerInterface` containing most of the methods previously
    in `TestServerInterface`.

  - `TenantControlInterface` with just the methods relating to
    starting the service for secondary tenants.

- Relevant methods moved from `TestServerInterface` to
  `ApplicationLayerInterface`:

  - `ServingSQLAddr()`
  - `RPCAddr()`
  - `LeaseManager()`
  - `DistSenderI()`
  - `MigrationServer()`
  - `SetDistSQLSpanResolver()`
  - `CollectionFactory()`
  - `SystemTableIDResolver()`

- updated comments/docstrings for the interfaces.

Release note: None
  • Loading branch information
knz committed Jul 26, 2023
1 parent f147c2b commit 1836bf2
Show file tree
Hide file tree
Showing 102 changed files with 582 additions and 497 deletions.
18 changes: 9 additions & 9 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,7 +1812,7 @@ func TestBackupRestoreResume(t *testing.T) {
const numAccounts = 1000
tc, outerDB, dir, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, numAccounts, InitManualReplication, params)
defer cleanupFn()
srv := tc.TenantOrServer(0)
srv := tc.ApplicationLayer(0)
codec := keys.MakeSQLCodec(srv.RPCContext().TenantID)
clusterID := srv.RPCContext().LogicalClusterID.Get()
backupTableDesc := desctestutils.TestingGetPublicTableDescriptor(tc.Servers[0].DB(), codec, "data", "bank")
Expand Down Expand Up @@ -1895,7 +1895,7 @@ func TestBackupRestoreResume(t *testing.T) {
sqlDB.Exec(t, `BACKUP DATABASE DATA TO $1`, restoreDir)
sqlDB.Exec(t, `CREATE DATABASE restoredb`)
restoreDatabaseID := sqlutils.QueryDatabaseID(t, sqlDB.DB, "restoredb")
restoreTableID, err := tc.TenantOrServer(0).ExecutorConfig().(sql.ExecutorConfig).DescIDGenerator.GenerateUniqueDescID(ctx)
restoreTableID, err := tc.ApplicationLayer(0).ExecutorConfig().(sql.ExecutorConfig).DescIDGenerator.GenerateUniqueDescID(ctx)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -5440,7 +5440,7 @@ func TestBackupRestoreSequenceOwnership(t *testing.T) {
origDB.Exec(t, `BACKUP DATABASE d TO $1`, backupLoc)

getTableDescriptorFromTestCluster := func(tc *testcluster.TestCluster, database string, table string) catalog.TableDescriptor {
srv := tc.TenantOrServer(0)
srv := tc.ApplicationLayer(0)
return desctestutils.TestingGetPublicTableDescriptor(srv.DB(), srv.Codec(), database, table)
}

Expand Down Expand Up @@ -5843,7 +5843,7 @@ func TestBatchedInsertStats(t *testing.T) {
}}
server, db, _ := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())
s := server.TenantOrServer()
s := server.ApplicationLayer()
sqlDB := sqlutils.MakeSQLRunner(db)
ctx := context.Background()
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
Expand Down Expand Up @@ -6392,7 +6392,7 @@ func TestRestoreErrorPropagates(t *testing.T) {
runner := sqlutils.MakeSQLRunner(db)

jobsTableKey.Lock()
jobsTableKey.key = tc.TenantOrServer(0).Codec().TablePrefix(uint32(systemschema.JobsTable.GetID()))
jobsTableKey.key = tc.ApplicationLayer(0).Codec().TablePrefix(uint32(systemschema.JobsTable.GetID()))
jobsTableKey.Unlock()

runner.Exec(t, `SET CLUSTER SETTING jobs.metrics.interval.poll = '30s'`)
Expand Down Expand Up @@ -8578,7 +8578,7 @@ func TestRestoreJobEventLogging(t *testing.T) {

var forceFailure bool
for i := range tc.Servers {
tc.TenantOrServer(i).JobRegistry().(*jobs.Registry).TestingWrapResumerConstructor(
tc.ApplicationLayer(i).JobRegistry().(*jobs.Registry).TestingWrapResumerConstructor(
jobspb.TypeRestore,
func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
Expand Down Expand Up @@ -8691,7 +8691,7 @@ func TestBackupOnlyPublicIndexes(t *testing.T) {
)
defer cleanupFn()
kvDB := tc.Server(0).DB()
codec := tc.TenantOrServer(0).ExecutorConfig().(sql.ExecutorConfig).Codec
codec := tc.ApplicationLayer(0).ExecutorConfig().(sql.ExecutorConfig).Codec

locationToDir := func(location string) string {
return strings.Replace(location, localFoo, filepath.Join(rawDir, "foo"), 1)
Expand Down Expand Up @@ -10183,7 +10183,7 @@ func TestBackupNoOverwriteCheckpoint(t *testing.T) {

// List all the BACKUP-CHECKPOINTS in the progress directory and see
// if there are as many as we'd expect if none got overwritten.
execCfg := tc.TenantOrServer(0).ExecutorConfig().(sql.ExecutorConfig)
execCfg := tc.ApplicationLayer(0).ExecutorConfig().(sql.ExecutorConfig)
ctx := context.Background()
store, err := execCfg.DistSQLSrv.ExternalStorageFromURI(ctx, "userfile:///a", username.RootUserName())
require.NoError(t, err)
Expand Down Expand Up @@ -10412,7 +10412,7 @@ func TestBackupRestoreTelemetryEvents(t *testing.T) {

var forceFailure bool
for i := range tc.Servers {
tc.TenantOrServer(i).JobRegistry().(*jobs.Registry).TestingWrapResumerConstructor(
tc.ApplicationLayer(i).JobRegistry().(*jobs.Registry).TestingWrapResumerConstructor(
jobspb.TypeRestore,
func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*restoreResumer)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_mid_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func validateTable(
tableName string,
isSchemaOnly bool,
) {
srv := tc.TenantOrServer(0)
srv := tc.ApplicationLayer(0)
desc := desctestutils.TestingGetPublicTableDescriptor(srv.DB(), srv.Codec(), dbName, tableName)
// There should be no mutations on these table descriptors at this point.
require.Equal(t, 0, len(desc.TableDesc().Mutations))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {

s, rawSQLDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
sqlDB := sqlutils.MakeSQLRunner(rawSQLDB)
registry := s.TenantOrServer().JobRegistry().(*jobs.Registry)
registry := s.ApplicationLayer().JobRegistry().(*jobs.Registry)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4345,7 +4345,7 @@ func TestChangefeedSchemaTTL(t *testing.T) {
}

// TODO(samiskin): tenant tests skipped because of forceTableGC not working
// with a TestTenantInterface
// with a ApplicationLayerInterface
cdcTestWithSystem(t, testFn, feedTestNoTenants)
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
}

func waitForTenantPodsActive(
t testing.TB, tenantServer serverutils.TestTenantInterface, numPods int,
t testing.TB, tenantServer serverutils.ApplicationLayerInterface, numPods int,
) {
testutils.SucceedsWithin(t, func() error {
status := tenantServer.StatusServer().(serverpb.SQLStatusServer)
Expand All @@ -490,7 +490,7 @@ func waitForTenantPodsActive(

func startTestTenant(
t testing.TB, systemServer serverutils.TestServerInterface, options feedTestOptions,
) (roachpb.TenantID, serverutils.TestTenantInterface, *gosql.DB, func()) {
) (roachpb.TenantID, serverutils.ApplicationLayerInterface, *gosql.DB, func()) {
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Expand Down Expand Up @@ -666,7 +666,7 @@ func serverArgsRegion(args base.TestServerArgs) string {
// expectNotice creates a pretty crude database connection that doesn't involve
// a lot of cdc test framework, use with caution. Driver-agnostic tools don't
// have clean ways of inspecting incoming notices.
func expectNotice(t *testing.T, s serverutils.TestTenantInterface, sql string, expected string) {
func expectNotice(t *testing.T, s serverutils.ApplicationLayerInterface, sql string, expected string) {
url, cleanup := sqlutils.PGUrl(t, s.SQLAddr(), t.Name(), url.User(username.RootUser))
defer cleanup()
base, err := pq.NewConnector(url.String())
Expand Down Expand Up @@ -762,7 +762,7 @@ func closeFeedIgnoreError(t testing.TB, f cdctest.TestFeed) {
// of a test running as the system tenant or a secondary tenant
type TestServer struct {
DB *gosql.DB
Server serverutils.TestTenantInterface
Server serverutils.ApplicationLayerInterface
Codec keys.SQLCodec
TestingKnobs base.TestingKnobs
}
Expand Down Expand Up @@ -912,7 +912,7 @@ func addCloudStorageOptions(t *testing.T, options *feedTestOptions) (cleanup fun
func makeFeedFactory(
t *testing.T,
sinkType string,
s serverutils.TestTenantInterface,
s serverutils.ApplicationLayerInterface,
db *gosql.DB,
testOpts ...feedTestOption,
) (factory cdctest.TestFeedFactory, sinkCleanup func()) {
Expand All @@ -924,9 +924,9 @@ func makeFeedFactoryWithOptions(
t *testing.T, sinkType string, srvOrCluster interface{}, db *gosql.DB, options feedTestOptions,
) (factory cdctest.TestFeedFactory, sinkCleanup func()) {
t.Logf("making %s feed factory", sinkType)
s := func() serverutils.TestTenantInterface {
s := func() serverutils.ApplicationLayerInterface {
switch s := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
case serverutils.ApplicationLayerInterface:
return s
case serverutils.TestClusterInterface:
return s.Server(0)
Expand Down Expand Up @@ -1007,7 +1007,7 @@ func makeFeedFactoryWithOptions(
}

func getInitialDBForEnterpriseFactory(
t *testing.T, s serverutils.TestTenantInterface, rootDB *gosql.DB, opts feedTestOptions,
t *testing.T, s serverutils.ApplicationLayerInterface, rootDB *gosql.DB, opts feedTestOptions,
) (*gosql.DB, func()) {
// Instead of creating enterprise changefeeds on the root connection, we may
// choose to create them on a test user connection. This user should have the
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
}

mkGetProtections := func(t *testing.T, ptp protectedts.Provider,
srv serverutils.TestTenantInterface, ptsReader spanconfig.ProtectedTSReader,
srv serverutils.ApplicationLayerInterface, ptsReader spanconfig.ProtectedTSReader,
span roachpb.Span) func() []hlc.Timestamp {
return func() (r []hlc.Timestamp) {
require.NoError(t,
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
return nil
})
mkGetProtections = func(t *testing.T, ptp protectedts.Provider,
srv serverutils.TestTenantInterface, ptsReader spanconfig.ProtectedTSReader,
srv serverutils.ApplicationLayerInterface, ptsReader spanconfig.ProtectedTSReader,
span roachpb.Span) func() []hlc.Timestamp {
return func() (r []hlc.Timestamp) {
require.NoError(t,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/show_changefeed_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestShowChangefeedJobs(t *testing.T) {

s, rawSQLDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
sqlDB := sqlutils.MakeSQLRunner(rawSQLDB)
registry := s.TenantOrServer().JobRegistry().(*jobs.Registry)
registry := s.ApplicationLayer().JobRegistry().(*jobs.Registry)
ctx := context.Background()
defer s.Stopper().Stop(ctx)

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestShowChangefeedJobsStatusChange(t *testing.T) {
var params base.TestServerArgs
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
s, rawSQLDB, _ := serverutils.StartServer(t, params)
registry := s.TenantOrServer().JobRegistry().(*jobs.Registry)
registry := s.ApplicationLayer().JobRegistry().(*jobs.Registry)
sqlDB := sqlutils.MakeSQLRunner(rawSQLDB)
defer s.Stopper().Stop(context.Background())

Expand Down
26 changes: 13 additions & 13 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import (
)

type sinklessFeedFactory struct {
s serverutils.TestTenantInterface
s serverutils.ApplicationLayerInterface
// postgres url used for creating sinkless changefeeds. This may be the same as
// the rootURL.
sink url.URL
Expand All @@ -78,7 +78,7 @@ type sinklessFeedFactory struct {
// makeSinklessFeedFactory returns a TestFeedFactory implementation using the
// `experimental-sql` uri.
func makeSinklessFeedFactory(
s serverutils.TestTenantInterface, sink url.URL, rootConn url.URL, sinkForUser sinkForUser,
s serverutils.ApplicationLayerInterface, sink url.URL, rootConn url.URL, sinkForUser sinkForUser,
) cdctest.TestFeedFactory {
return &sinklessFeedFactory{s: s, sink: sink, rootURL: rootConn, sinkForUser: sinkForUser}
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (f *sinklessFeedFactory) Feed(create string, args ...interface{}) (cdctest.
}

// Server implements the TestFeedFactory interface.
func (f *sinklessFeedFactory) Server() serverutils.TestTenantInterface {
func (f *sinklessFeedFactory) Server() serverutils.ApplicationLayerInterface {
return f.s
}

Expand Down Expand Up @@ -638,7 +638,7 @@ func (s *notifyFlushSink) EncodeAndEmitRow(
var _ Sink = (*notifyFlushSink)(nil)

// feedInjectable is the subset of the
// TestServerInterface/TestTenantInterface needed for depInjector to
// TestServerInterface/ApplicationLayerInterface needed for depInjector to
// work correctly.
type feedInjectable interface {
JobRegistry() interface{}
Expand Down Expand Up @@ -739,7 +739,7 @@ func (di *depInjector) getJobFeed(jobID jobspb.JobID) *jobFeed {
}

type enterpriseFeedFactory struct {
s serverutils.TestTenantInterface
s serverutils.ApplicationLayerInterface
di *depInjector
// db is used for creating changefeeds. This may be the same as rootDB.
db *gosql.DB
Expand Down Expand Up @@ -802,9 +802,9 @@ type tableFeedFactory struct {
uri url.URL
}

func getInjectables(srvOrCluster interface{}) (serverutils.TestTenantInterface, []feedInjectable) {
func getInjectables(srvOrCluster interface{}) (serverutils.ApplicationLayerInterface, []feedInjectable) {
switch t := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
case serverutils.ApplicationLayerInterface:
t.PGServer()
return t, []feedInjectable{t}
case serverutils.TestClusterInterface:
Expand Down Expand Up @@ -885,7 +885,7 @@ func (f *tableFeedFactory) Feed(
}

// Server implements the TestFeedFactory interface.
func (f *tableFeedFactory) Server() serverutils.TestTenantInterface {
func (f *tableFeedFactory) Server() serverutils.ApplicationLayerInterface {
return f.s
}

Expand Down Expand Up @@ -1128,7 +1128,7 @@ func (f *cloudFeedFactory) Feed(
}

// Server implements the TestFeedFactory interface.
func (f *cloudFeedFactory) Server() serverutils.TestTenantInterface {
func (f *cloudFeedFactory) Server() serverutils.ApplicationLayerInterface {
return f.s
}

Expand Down Expand Up @@ -1826,7 +1826,7 @@ func (k *kafkaFeedFactory) Feed(create string, args ...interface{}) (cdctest.Tes
}

// Server implements TestFeedFactory
func (k *kafkaFeedFactory) Server() serverutils.TestTenantInterface {
func (k *kafkaFeedFactory) Server() serverutils.ApplicationLayerInterface {
return k.s
}

Expand Down Expand Up @@ -2027,7 +2027,7 @@ func (f *webhookFeedFactory) Feed(create string, args ...interface{}) (cdctest.T
return c, nil
}

func (f *webhookFeedFactory) Server() serverutils.TestTenantInterface {
func (f *webhookFeedFactory) Server() serverutils.ApplicationLayerInterface {
return f.s
}

Expand Down Expand Up @@ -2338,7 +2338,7 @@ func makePubsubFeedFactory(srvOrCluster interface{}, rootDB *gosql.DB) cdctest.T
s, injectables := getInjectables(srvOrCluster)

switch t := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
case serverutils.ApplicationLayerInterface:
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
case serverutils.TestClusterInterface:
servers := make([]feedInjectable, t.NumServers())
Expand Down Expand Up @@ -2416,7 +2416,7 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
}

// Server implements TestFeedFactory
func (p *pubsubFeedFactory) Server() serverutils.TestTenantInterface {
func (p *pubsubFeedFactory) Server() serverutils.ApplicationLayerInterface {
return p.s
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
historicalQuery := `SELECT * FROM t.test AS OF SYSTEM TIME follower_read_timestamp() WHERE k=2`
recCh := make(chan tracingpb.Recording, 1)

var tenants [numNodes]serverutils.TestTenantInterface
var tenants [numNodes]serverutils.ApplicationLayerInterface
for i := 0; i < numNodes; i++ {
knobs := base.TestingKnobs{}
if i == 3 { // n4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ func TestScheduledJobsConsumption(t *testing.T) {
env := jobstest.NewJobSchedulerTestEnv(jobstest.UseSystemTables, timeutil.Now())
var zeroDuration time.Duration
var execSchedules func() error
var tenantServer serverutils.TestTenantInterface
var tenantServer serverutils.ApplicationLayerInterface
var tenantDB *gosql.DB
tenantServer, tenantDB = serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,7 @@ func TestPrimaryKeyChangeZoneConfigs(t *testing.T) {
ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
codec, sv := s.TenantOrServer().Codec(), &s.TenantOrServer().ClusterSettings().SV
codec, sv := s.ApplicationLayer().Codec(), &s.ApplicationLayer().ClusterSettings().SV
sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, sv, true)

// Write a table with some partitions into the database,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/partitionccl/zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ func TestZoneConfigAppliesToTemporaryIndex(t *testing.T) {
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())
tdb := sqlutils.MakeSQLRunner(sqlDB)
codec := s.TenantOrServer().Codec()
sv := &s.TenantOrServer().ClusterSettings().SV
codec := s.ApplicationLayer().Codec()
sv := &s.ApplicationLayer().ClusterSettings().SV
sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), sv, true)

if _, err := sqlDB.Exec(`
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/serverccl/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import (
var adminPrefix = "/_admin/v1/"

func getAdminJSONProto(
ts serverutils.TestTenantInterface, path string, response protoutil.Message,
ts serverutils.ApplicationLayerInterface, path string, response protoutil.Message,
) error {
return getAdminJSONProtoWithAdminOption(ts, path, response, true)
}

func getAdminJSONProtoWithAdminOption(
ts serverutils.TestTenantInterface, path string, response protoutil.Message, isAdmin bool,
ts serverutils.ApplicationLayerInterface, path string, response protoutil.Message, isAdmin bool,
) error {
return serverutils.GetJSONProtoWithAdminOption(ts, adminPrefix+path, response, isAdmin)
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestTableAndDatabaseDetailsAndStats(t *testing.T) {
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

st := s.TenantOrServer()
st := s.ApplicationLayer()
_, err := db.Exec("CREATE TABLE test (id int)")
require.NoError(t, err)
_, err = db.Exec("INSERT INTO test VALUES (1)")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/serverccl/diagnosticsccl/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestUsageQuantization(t *testing.T) {
require.NoError(t, err)
}

ts := s.TenantOrServer()
ts := s.ApplicationLayer()

// Flush the SQL stat pool.
ts.SQLServer().(*sql.Server).GetSQLStatsController().ResetLocalSQLStats(ctx)
Expand Down
Loading

0 comments on commit 1836bf2

Please sign in to comment.