diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index 65632e620293..ebaac3cee696 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -6822,14 +6822,14 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB := sqlutils.MakeSQLRunner(restoreTC.Conns[0])
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			})
 		restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
 		restoreDB.CheckQueryResults(t,
 			`SELECT id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
-				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			},
 		)
 		restoreDB.CheckQueryResults(t,
@@ -6858,8 +6858,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
-				{`10`, `false`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "DROP"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`10`, `false`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}`},
 			},
 		)
@@ -6883,8 +6883,8 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
-				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			},
 		)
@@ -6908,14 +6908,14 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			})
 		restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
-				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			},
 		)
 	})
@@ -6937,14 +6937,14 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			})
 		restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/clusterwide'`)
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
-				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			},
 		)
@@ -6977,16 +6977,16 @@ func TestBackupRestoreTenant(t *testing.T) {
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			})
 		restoreDB.Exec(t, `RESTORE FROM 'nodelocal://1/clusterwide'`)
 		restoreDB.CheckQueryResults(t,
 			`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
 			[][]string{
-				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
-				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
-				{`11`, `true`, `NULL`, `{"droppedName": "", "id": "11", "name": "", "state": "ACTIVE"}`},
-				{`20`, `true`, `NULL`, `{"droppedName": "", "id": "20", "name": "", "state": "ACTIVE"}`},
+				{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`11`, `true`, `NULL`, `{"droppedName": "", "id": "11", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
+				{`20`, `true`, `NULL`, `{"droppedName": "", "id": "20", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
 			},
 		)
diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants b/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants
index eaadc0241524..c0f20e7ebe39 100644
--- a/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants
+++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants
@@ -21,9 +21,9 @@ SELECT crdb_internal.destroy_tenant(5);
 query-sql
 SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants;
 ----
-1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"}
-6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"}
+1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 exec-sql
 BACKUP INTO 'nodelocal://1/cluster'
@@ -49,9 +49,9 @@ RESTORE FROM LATEST IN 'nodelocal://1/cluster'
 query-sql
 SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants;
 ----
-1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"}
-6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"}
+1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 exec-sql
 RESTORE TENANT 6 FROM LATEST IN 'nodelocal://1/tenant6' WITH tenant = '7';
@@ -60,7 +60,7 @@ RESTORE TENANT 6 FROM LATEST IN 'nodelocal://1/tenant6' WITH tenant = '7';
 query-sql
 SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants;
 ----
-1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"}
-6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"}
-7 true {"droppedName": "", "id": "7", "name": "", "state": "ACTIVE"}
+1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+7 true {"droppedName": "", "id": "7", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index ef259cfa3465..b8239ec8a237 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -465,7 +466,14 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
 	p := execCtx.(sql.JobExecContext)
 	execCfg := p.ExecCfg()
 	return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-		return sql.ActivateTenant(ctx, execCfg, txn, newTenantID.ToUint64())
+		info, err := sql.GetTenantRecordByID(ctx, execCfg, txn, newTenantID)
+		if err != nil {
+			return err
+		}
+
+		info.State = descpb.TenantInfo_ACTIVE
+		info.TenantReplicationJobID = 0
+		return sql.UpdateTenantRecord(ctx, execCfg, txn, info)
 	})
 }
 
@@ -495,15 +503,26 @@ func (s *streamIngestionResumer) cancelProducerJob(
 // leftover in the keyspace if a ClearRange were to be issued here. In general
 // the tenant keyspace of a failed/canceled ingestion job should be treated as
 // corrupted, and the tenant should be dropped before resuming the ingestion.
-// TODO(adityamaru): Add ClearRange logic once we have introduced
-// synchronization between the flow tearing down and the job transitioning to a
-// failed/canceled state.
-func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error {
+func (s *streamIngestionResumer) OnFailOrCancel(
+	ctx context.Context, execCtx interface{}, _ error,
+) error {
 	// Cancel the producer job on best effort. The source job's protected timestamp is no
 	// longer needed as this ingestion job is in 'reverting' status and we won't resume
 	// ingestion anymore.
+	jobExecCtx := execCtx.(sql.JobExecContext)
 	details := s.job.Details().(jobspb.StreamIngestionDetails)
 	s.cancelProducerJob(ctx, details)
+
+	tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), details.DestinationTenantID)
+	if err != nil {
+		return errors.Wrap(err, "fetch tenant info")
+	}
+
+	tenInfo.TenantReplicationJobID = 0
+	if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), tenInfo); err != nil {
+		return errors.Wrap(err, "update tenant record")
+	}
+
 	return nil
 }
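OnFailOrCancel above reads and rewrites the tenant record through the transaction exposed by the JobExecContext. An alternative shape, closer to how activateTenant wraps its update in execCfg.DB.Txn, is to clear the replication job ID in a transaction of its own. This is a sketch only; the helper name is hypothetical, while GetTenantRecordByID and UpdateTenantRecord are the exported functions this change relies on:

package streamingest

import (
  "context"

  "github.com/cockroachdb/cockroach/pkg/kv"
  "github.com/cockroachdb/cockroach/pkg/roachpb"
  "github.com/cockroachdb/cockroach/pkg/sql"
  "github.com/cockroachdb/errors"
)

// clearReplicationJobID (hypothetical) detaches the destination tenant record
// from its replication job inside its own KV transaction, mirroring the
// pattern used by activateTenant.
func clearReplicationJobID(
  ctx context.Context, execCfg *sql.ExecutorConfig, tenID roachpb.TenantID,
) error {
  return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
    info, err := sql.GetTenantRecordByID(ctx, execCfg, txn, tenID)
    if err != nil {
      return errors.Wrap(err, "fetch tenant info")
    }
    info.TenantReplicationJobID = 0
    return sql.UpdateTenantRecord(ctx, execCfg, txn, info)
  })
}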
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
index 9a4fe923ed4c..e49b976c6987 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -79,6 +79,11 @@ func ingestionPlanHook(
 		)
 	}
 
+	if !p.ExecCfg().Codec.ForSystemTenant() {
+		return nil, nil, nil, false, pgerror.Newf(pgcode.InsufficientPrivilege,
+			"only the system tenant can create other tenants")
+	}
+
 	exprEval := p.ExprEvaluator("INGESTION")
 
 	from, err := exprEval.String(ctx, ingestionStmt.ReplicationSourceAddress)
@@ -127,12 +132,15 @@ func ingestionPlanHook(
 		if _, err := sql.GetTenantRecordByName(ctx, p.ExecCfg(), p.Txn(), roachpb.TenantName(destinationTenant)); err == nil {
 			return errors.Newf("tenant with name %q already exists", destinationTenant)
 		}
+
+		jobID := p.ExecCfg().JobRegistry.MakeJobID()
 		tenantInfo := &descpb.TenantInfoWithUsage{
 			TenantInfo: descpb.TenantInfo{
 				// We leave the ID field unset so that the tenant is assigned the next
 				// available tenant ID.
-				State: descpb.TenantInfo_ADD,
-				Name:  roachpb.TenantName(destinationTenant),
+				State:                  descpb.TenantInfo_ADD,
+				Name:                   roachpb.TenantName(destinationTenant),
+				TenantReplicationJobID: jobID,
 			},
 		}
 
@@ -182,12 +190,12 @@ func ingestionPlanHook(
 			Details: streamIngestionDetails,
 		}
 
-		jobID := p.ExecCfg().JobRegistry.MakeJobID()
 		sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobID, p.Txn())
 		if err != nil {
 			return err
 		}
+
 		resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), tree.NewDInt(tree.DInt(streamID))}
 		return nil
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
index a45afa8ee469..fb128c123ed6 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
@@ -756,6 +756,62 @@ func TestTenantStreamingCancelIngestion(t *testing.T) {
 	})
 }
 
+func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	args := defaultTenantStreamingClustersArgs
+
+	testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) {
+		c, cleanup := createTenantStreamingClusters(ctx, t, args)
+		defer cleanup()
+		producerJobID, ingestionJobID := c.startStreamReplication()
+
+		jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+		jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+
+		c.waitUntilHighWatermark(c.srcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
+		if cancelAfterPaused {
+			c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID))
+			jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+		}
+
+		// Set GC TTL low, so that the GC job completes quickly in the test.
+		c.destSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
+		c.destSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.args.destTenantName))
+		jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
+		jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
+
+		// Check if the producer job has released protected timestamp.
+		stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID)
+		require.NotNil(t, stats.ProducerStatus)
+		require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)
+
+		// Wait for the GC job to finish.
+		c.destSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")
+
+		// Check if dest tenant key range is cleaned up.
+		destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID)
+		rows, err := c.destCluster.Server(0).DB().
+			Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
+		require.NoError(t, err)
+		require.Empty(t, rows)
+
+		c.destSysSQL.CheckQueryResults(t,
+			fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
+			[][]string{{"0"}})
+	}
+
+	t.Run("drop-tenant-after-paused", func(t *testing.T) {
+		testCancelIngestion(t, true)
+	})
+
+	t.Run("drop-tenant-while-running", func(t *testing.T) {
+		testCancelIngestion(t, false)
+	})
+}
+
 func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/sql/catalog/descpb/tenant.proto b/pkg/sql/catalog/descpb/tenant.proto
index 08c9c034e785..0cc971cdc51b 100644
--- a/pkg/sql/catalog/descpb/tenant.proto
+++ b/pkg/sql/catalog/descpb/tenant.proto
@@ -37,9 +37,20 @@ message TenantInfo {
   optional string name = 3 [
     (gogoproto.nullable) = false,
     (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"];
+
+  // DroppedName is the name the tenant had before DROP TENANT was
+  // run on the tenant. It should be empty for active or adding
+  // tenants.
   optional string dropped_name = 4 [
     (gogoproto.nullable) = false,
     (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"];
+
+  // TenantReplicationJobID is set if this tenant is the target tenant
+  // of a running tenant replication job.
+  optional int64 tenant_replication_job_id = 5 [
+    (gogoproto.nullable) = false,
+    (gogoproto.customname) = "TenantReplicationJobID",
+    (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"];
 }
 
 // TenantInfoAndUsage contains the information for a tenant in a multi-tenant
diff --git a/pkg/sql/drop_tenant.go b/pkg/sql/drop_tenant.go
index 06ba81806339..41e82fe7eaaf 100644
--- a/pkg/sql/drop_tenant.go
+++ b/pkg/sql/drop_tenant.go
@@ -32,7 +32,7 @@ func (p *planner) DropTenant(_ context.Context, n *tree.DropTenant) (planNode, e
 }
 
 func (n *dropTenantNode) startExec(params runParams) error {
-	err := params.p.DestroyTenant(params.ctx, n.name, false)
+	err := params.p.DestroyTenant(params.ctx, n.name, false /* synchronousImmediateDrop */)
 	if err != nil {
 		if pgerror.GetPGCode(err) == pgcode.UndefinedObject && n.ifExists {
 			return nil
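A side effect of the new non-nullable proto field is visible in the logic tests further down: `length(info)` for the system tenant grows from 14 to 16 bytes, because a non-nullable scalar is always marshaled, even at its zero value, which costs one tag byte plus a one-byte varint. The arithmetic below is an illustration of that two-byte cost (my own worked example, not part of the patch):

package main

import "fmt"

// Field 5 with varint wire type: tag = (5 << 3) | 0 = 0x28, followed by the
// single-byte varint encoding of 0. That accounts for the 14 -> 16 change in
// length(info) asserted by the logic tests.
func main() {
  const fieldNum = 5
  const wireTypeVarint = 0
  tag := byte(fieldNum<<3 | wireTypeVarint) // 0x28
  value := byte(0)                          // varint(0)
  fmt.Printf("extra bytes: %#x %#x (2 bytes total)\n", tag, value)
}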
diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go
index 050ee78563cb..85190cc8056f 100644
--- a/pkg/sql/gcjob/refresh_statuses.go
+++ b/pkg/sql/gcjob/refresh_statuses.go
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
@@ -28,6 +29,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -457,8 +460,21 @@ func refreshTenant(
 		return true, time.Time{}, nil
 	}
 
-	// Read the tenant's GC TTL to check if the tenant's data has expired.
 	tenID := details.Tenant.ID
+	// TODO(ssd): Once
+	// https://github.com/cockroachdb/cockroach/issues/92093 is
+	// done, we should be able to simply rely on the protected
+	// timestamp for the replication job.
+	jobActive, err := tenantHasActiveReplicationJob(ctx, execCfg, tenID)
+	if err != nil {
+		return false, time.Time{}, err
+	}
+	if jobActive {
+		log.Infof(ctx, "tenant %d has active tenant replication job, waiting for it to stop before running GC", tenID)
+		return false, timeutil.Now().Add(MaxSQLGCInterval), nil
+	}
+
+	// Read the tenant's GC TTL to check if the tenant's data has expired.
 	cfg := execCfg.SystemConfig.GetSystemConfig()
 	tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds
 	zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, keys.TenantsRangesID)
@@ -491,3 +507,30 @@ func refreshTenant(
 	}
 	return false, deadlineUnix, nil
 }
+
+func tenantHasActiveReplicationJob(
+	ctx context.Context, execCfg *sql.ExecutorConfig, tenID uint64,
+) (bool, error) {
+	info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MustMakeTenantID(tenID))
+	if err != nil {
+		if pgerror.GetPGCode(err) == pgcode.UndefinedObject {
+			log.Errorf(ctx, "tenant id %d not found while attempting to GC", tenID)
+			return false, nil
+		} else {
+			return false, errors.Wrapf(err, "fetching tenant %d", tenID)
+		}
+	}
+	if jobID := info.TenantReplicationJobID; jobID != 0 {
+		j, err := execCfg.JobRegistry.LoadJob(ctx, jobID)
+		if err != nil {
+			if errors.Is(err, &jobs.JobNotFoundError{}) {
+				log.Infof(ctx, "tenant replication job %d not found", jobID)
+				return false, nil
+			} else {
+				return false, err
+			}
+		}
+		return !j.Status().Terminal(), nil
+	}
+	return false, nil
+}
diff --git a/pkg/sql/gcjob/tenant_garbage_collection.go b/pkg/sql/gcjob/tenant_garbage_collection.go
index 25112e749630..97621702b27c 100644
--- a/pkg/sql/gcjob/tenant_garbage_collection.go
+++ b/pkg/sql/gcjob/tenant_garbage_collection.go
@@ -56,7 +56,7 @@ func gcTenant(
 			}
 			return nil
 		}
-		return errors.Wrapf(err, "fetching tenant %d", info.ID)
+		return errors.Wrapf(err, "fetching tenant %d", tenID)
 	}
 
 	// This case should never happen.
diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go
index b88b39e8c2f3..6be59fe6c88d 100644
--- a/pkg/sql/gcjob_test/gc_job_test.go
+++ b/pkg/sql/gcjob_test/gc_job_test.go
@@ -463,7 +463,7 @@ func TestGCTenant(t *testing.T) {
 		require.EqualError(
 			t,
 			gcClosure(dropTenID, progress),
-			`GC state for tenant id:11 state:DROP name:"" dropped_name:"" is DELETED yet the tenant row still exists`,
+			`GC state for tenant id:11 state:DROP name:"" dropped_name:"" tenant_replication_job_id:0 is DELETED yet the tenant row still exists`,
 		)
 	})
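The GC job never blocks on the replication job; it only reports "not expired" and asks to be re-checked after MaxSQLGCInterval. The part of that decision that touches the jobs registry boils down to a load-and-terminal check. A sketch of that core check, under the assumption that a standalone helper like this (not in the patch) would be convenient, using only the LoadJob and Status().Terminal() calls the patch itself relies on:

package gcjob

import (
  "context"

  "github.com/cockroachdb/cockroach/pkg/jobs"
  "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
)

// replicationJobTerminal (hypothetical helper) reports whether the tenant's
// replication job has reached a terminal state; tenant GC proceeds only once
// this is true or the job/tenant record is gone.
func replicationJobTerminal(
  ctx context.Context, registry *jobs.Registry, jobID jobspb.JobID,
) (bool, error) {
  j, err := registry.LoadJob(ctx, jobID)
  if err != nil {
    return false, err
  }
  return j.Status().Terminal(), nil
}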
diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant
index 6b58beb802f8..a8e2e5ca961c 100644
--- a/pkg/sql/logictest/testdata/logic_test/tenant
+++ b/pkg/sql/logictest/testdata/logic_test/tenant
@@ -3,7 +3,7 @@ query IBIT colnames
 SELECT id, active, length(info), name FROM system.tenants ORDER BY id
 ----
 id  active  length  name
-1   true    14      system
+1   true    16      system
 
 # Create a few tenants.
 
@@ -22,10 +22,10 @@ FROM system.tenants ORDER BY id
 ----
 id  active  name        crdb_internal.pb_to_json
-1   true    system      {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-2   true    tenant-one  {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE"}
-3   true    two         {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE"}
-4   true    three       {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE"}
+1   true    system      {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+2   true    tenant-one  {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+3   true    two         {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+4   true    three       {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 query ITT colnames
 SHOW TENANT system
@@ -97,7 +97,7 @@ FROM system.tenants WHERE name = 'four' ORDER BY id
 ----
 id  active  name  crdb_internal.pb_to_json
-5   true    four  {"droppedName": "", "id": "5", "name": "four", "state": "ACTIVE"}
+5   true    four  {"droppedName": "", "id": "5", "name": "four", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 statement ok
 DROP TENANT four
@@ -151,11 +151,11 @@ FROM system.tenants ORDER BY id
 ----
 id  active  name             crdb_internal.pb_to_json
-1   true    system           {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-2   true    tenant-one       {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE"}
-3   true    two              {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE"}
-4   true    three            {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE"}
-5   false   NULL             {"droppedName": "four", "id": "5", "name": "", "state": "DROP"}
-6   false   NULL             {"droppedName": "five-requiring-quotes", "id": "6", "name": "", "state": "DROP"}
-7   false   NULL             {"droppedName": "to-be-reclaimed", "id": "7", "name": "", "state": "DROP"}
-8   true    to-be-reclaimed  {"droppedName": "", "id": "8", "name": "to-be-reclaimed", "state": "ACTIVE"}
+1   true    system           {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+2   true    tenant-one       {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+3   true    two              {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+4   true    three            {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+5   false   NULL             {"droppedName": "four", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+6   false   NULL             {"droppedName": "five-requiring-quotes", "id": "6", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+7   false   NULL             {"droppedName": "to-be-reclaimed", "id": "7", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+8   true    to-be-reclaimed  {"droppedName": "", "id": "8", "name": "to-be-reclaimed", "state": "ACTIVE", "tenantReplicationJobId": "0"}
diff --git a/pkg/sql/logictest/testdata/logic_test/tenant_builtins b/pkg/sql/logictest/testdata/logic_test/tenant_builtins
index d38b4156ad8d..9e79825dcef9 100644
--- a/pkg/sql/logictest/testdata/logic_test/tenant_builtins
+++ b/pkg/sql/logictest/testdata/logic_test/tenant_builtins
@@ -3,7 +3,7 @@ query IBIT colnames
 SELECT id, active, length(info), name FROM system.tenants ORDER BY id
 ----
 id  active  length  name
-1   true    14      system
+1   true    16      system
 
 # Create three tenants.
 
@@ -28,10 +28,10 @@ FROM system.tenants ORDER BY id
 ----
 id  active  name                  crdb_internal.pb_to_json
-1   true    system                {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-2   true    tenant-number-eleven  {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE"}
-5   true    NULL                  {"droppedName": "", "id": "5", "name": "", "state": "ACTIVE"}
-10  true    tenant-number-ten     {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE"}
+1   true    system                {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+2   true    tenant-number-eleven  {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+5   true    NULL                  {"droppedName": "", "id": "5", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+10  true    tenant-number-ten     {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 # Check we can add a name where none existed before.
 
 query I
@@ -89,10 +89,10 @@ FROM system.tenants ORDER BY id
 ----
 id  active  name                  crdb_internal.pb_to_json
-1   true    system                {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-2   true    tenant-number-eleven  {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE"}
-5   false   NULL                  {"droppedName": "my-new-tenant-name", "id": "5", "name": "", "state": "DROP"}
-10  true    tenant-number-ten     {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE"}
+1   true    system                {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+2   true    tenant-number-eleven  {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+5   false   NULL                  {"droppedName": "my-new-tenant-name", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
+10  true    tenant-number-ten     {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 # Try to recreate an existing tenant.
 
@@ -199,9 +199,9 @@ FROM system.tenants ORDER BY id
 ----
 id  active  crdb_internal.pb_to_json
-1   true    {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
-2   true    {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE"}
-10  true    {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE"}
+1   true    {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+2   true    {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE", "tenantReplicationJobId": "0"}
+10  true    {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE", "tenantReplicationJobId": "0"}
 
 query error tenant resource limits require a CCL binary
 SELECT crdb_internal.update_tenant_resource_limits(10, 1000, 100, 0, now(), 0)
diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go
index 940c0ad33fef..ad0fd0c44769 100644
--- a/pkg/sql/tenant.go
+++ b/pkg/sql/tenant.go
@@ -246,10 +246,16 @@ func GetTenantRecordByID(
 	return info, nil
 }
 
-// updateTenantRecord updates a tenant in system.tenants.
-func updateTenantRecord(
+// UpdateTenantRecord updates a tenant in system.tenants.
+//
+// Caller is expected to check the user's permission.
+func UpdateTenantRecord(
 	ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, info *descpb.TenantInfo,
 ) error {
+	if err := validateTenantInfo(info); err != nil {
+		return err
+	}
+
 	tenID := info.ID
 	active := info.State == descpb.TenantInfo_ACTIVE
 	infoBytes, err := protoutil.Marshal(info)
@@ -269,6 +275,16 @@ func updateTenantRecord(
 	return nil
 }
 
+func validateTenantInfo(info *descpb.TenantInfo) error {
+	if info.TenantReplicationJobID != 0 && info.State == descpb.TenantInfo_ACTIVE {
+		return errors.Newf("tenant in state %v with replication job ID %d", info.State, info.TenantReplicationJobID)
+	}
+	if info.DroppedName != "" && info.State != descpb.TenantInfo_DROP {
+		return errors.Newf("tenant in state %v with dropped name %q", info.State, info.DroppedName)
+	}
+	return nil
+}
+
 // getAvailableTenantID returns the first available ID that can be assigned to
 // the created tenant. Note, this ID could have previously belonged to another
 // tenant that has since been dropped and gc'ed.
@@ -469,7 +485,7 @@ func ActivateTenant(ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, t
 	// Mark the tenant as active.
 	info.State = descpb.TenantInfo_ACTIVE
-	if err := updateTenantRecord(ctx, execCfg, txn, info); err != nil {
+	if err := UpdateTenantRecord(ctx, execCfg, txn, info); err != nil {
 		return errors.Wrap(err, "activating tenant")
 	}
 
@@ -501,7 +517,7 @@ func clearTenant(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Tena
 // DestroyTenant implements the tree.TenantOperator interface.
 func (p *planner) DestroyTenant(
-	ctx context.Context, tenantName roachpb.TenantName, synchronous bool,
+	ctx context.Context, tenantName roachpb.TenantName, synchronousImmediateDrop bool,
 ) error {
 	if err := p.validateDestroyTenant(ctx); err != nil {
 		return err
 	}
@@ -512,11 +528,13 @@ func (p *planner) DestroyTenant(
 		return errors.Wrap(err, "destroying tenant")
 	}
 
-	return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronous)
+	return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronousImmediateDrop)
 }
 
 // DestroyTenantByID implements the tree.TenantOperator interface.
-func (p *planner) DestroyTenantByID(ctx context.Context, tenID uint64, synchronous bool) error {
+func (p *planner) DestroyTenantByID(
+	ctx context.Context, tenID uint64, synchronousImmediateDrop bool,
+) error {
 	if err := p.validateDestroyTenant(ctx); err != nil {
 		return err
 	}
@@ -525,7 +543,7 @@ func (p *planner) DestroyTenantByID(ctx context.Context, tenID uint64, synchrono
 	if err != nil {
 		return errors.Wrap(err, "destroying tenant")
 	}
-	return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronous)
+	return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronousImmediateDrop)
 }
 
 func (p *planner) validateDestroyTenant(ctx context.Context) error {
@@ -543,7 +561,7 @@ func destroyTenantInternal(
 	extendedEvalCtx *extendedEvalContext,
 	user username.SQLUsername,
 	info *descpb.TenantInfo,
-	synchronous bool,
+	synchronousImmediateDrop bool,
 ) error {
 	const op = "destroy"
 	tenID := info.ID
@@ -557,23 +575,28 @@ func destroyTenantInternal(
 	// Mark the tenant as dropping.
 	//
-	// TODO(ssd): Once available, we should cancel any running
-	// replication job on this tenant record.
-	//
+	// Cancel any running replication job on this tenant record.
+	// The GCJob will wait for this job to enter a terminal state.
+	if info.TenantReplicationJobID != 0 {
+		if err := execCfg.JobRegistry.CancelRequested(ctx, txn, info.TenantReplicationJobID); err != nil {
+			return errors.Wrapf(err, "canceling tenant replication job %d", info.TenantReplicationJobID)
+		}
+	}
+
 	// TODO(ssd): We may want to implement a job that waits out
 	// any running sql pods before enqueing the GC job.
 	info.State = descpb.TenantInfo_DROP
 	info.DroppedName = info.Name
 	info.Name = ""
-	if err := updateTenantRecord(ctx, execCfg, txn, info); err != nil {
+	if err := UpdateTenantRecord(ctx, execCfg, txn, info); err != nil {
 		return errors.Wrap(err, "destroying tenant")
 	}
 
-	jobID, err := gcTenantJob(ctx, execCfg, txn, user, tenID, synchronous)
+	jobID, err := gcTenantJob(ctx, execCfg, txn, user, tenID, synchronousImmediateDrop)
 	if err != nil {
 		return errors.Wrap(err, "scheduling gc job")
 	}
-	if synchronous {
+	if synchronousImmediateDrop {
 		extendedEvalCtx.Jobs.add(jobID)
 	}
 	return nil
@@ -661,7 +684,7 @@ func gcTenantJob(
 	txn *kv.Txn,
 	user username.SQLUsername,
 	tenID uint64,
-	synchronous bool,
+	dropImmediately bool,
 ) (jobspb.JobID, error) {
 	// Queue a GC job that will delete the tenant data and finally remove the
 	// row from `system.tenants`.
@@ -671,7 +694,7 @@ func gcTenantJob(
 		DropTime: timeutil.Now().UnixNano(),
 	}
 	progress := jobspb.SchemaChangeGCProgress{}
-	if synchronous {
+	if dropImmediately {
 		progress.Tenant = &jobspb.SchemaChangeGCProgress_TenantProgress{
 			Status: jobspb.SchemaChangeGCProgress_CLEARING,
 		}
@@ -759,7 +782,7 @@ func (p *planner) UpdateTenantResourceLimits(
 func TestingUpdateTenantRecord(
 	ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, info *descpb.TenantInfo,
 ) error {
-	return updateTenantRecord(ctx, execCfg, txn, info)
+	return UpdateTenantRecord(ctx, execCfg, txn, info)
}
 
 // RenameTenant implements the tree.TenantOperator interface.
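validateTenantInfo is the reason the other call sites have to clear TenantReplicationJobID before (or together with) flipping the state to ACTIVE: an active tenant may never reference a replication job. A sketch of an in-package test for that invariant, assuming a table-driven test like this (not part of the patch) would be placed next to the other tenant tests:

package sql

import (
  "testing"

  "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
  "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// TestValidateTenantInfoReplicationJobID (hypothetical) checks that an ACTIVE
// tenant with a non-zero replication job ID is rejected, while an ADD-state
// tenant being replicated into, or an ACTIVE tenant whose job ID was cleared,
// passes validation.
func TestValidateTenantInfoReplicationJobID(t *testing.T) {
  for _, tc := range []struct {
    state     descpb.TenantInfo_State
    jobID     int64
    expectErr bool
  }{
    {descpb.TenantInfo_ADD, 123, false},   // replication target: allowed
    {descpb.TenantInfo_ACTIVE, 0, false},  // activated after the job ID is cleared
    {descpb.TenantInfo_ACTIVE, 123, true}, // violates the invariant
  } {
    info := &descpb.TenantInfo{State: tc.state}
    info.TenantReplicationJobID = catpb.JobID(tc.jobID)
    err := validateTenantInfo(info)
    if gotErr := err != nil; gotErr != tc.expectErr {
      t.Errorf("state=%v jobID=%d: err=%v, expectErr=%v", tc.state, tc.jobID, err, tc.expectErr)
    }
  }
}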