From 0015d5d0dbbda9b810915c89778e31103aee7a39 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Mon, 21 Nov 2022 14:24:42 +0000 Subject: [PATCH] sql,streamingccl: DROP TENANT cancels running ingestion job This adds a new field to TenantInfo that stores the job ID of the replication ingestion job related to that tenant. On DROP any such job is canceled. The GC job waits for the job to enter a terminal state before issuing any cleanup commands. This waiting my not be necessary in the future if the stream ingestion job installs a protected timestamp on the tenant that is only released in OnFailOrCancel. NB: We still wait out the GC TTL even in the case that the tenant has never been ACTIVE. Release note: None --- pkg/ccl/backupccl/backup_test.go | 36 ++++++------ .../testdata/backup-restore/restore-tenants | 20 +++---- .../streamingest/stream_ingestion_job.go | 29 ++++++++-- .../streamingest/stream_ingestion_planning.go | 14 ++++- .../stream_replication_e2e_test.go | 56 ++++++++++++++++++ pkg/sql/catalog/descpb/tenant.proto | 11 ++++ pkg/sql/drop_tenant.go | 2 +- pkg/sql/gcjob/refresh_statuses.go | 45 ++++++++++++++- pkg/sql/gcjob/tenant_garbage_collection.go | 2 +- pkg/sql/gcjob_test/gc_job_test.go | 2 +- pkg/sql/logictest/testdata/logic_test/tenant | 28 ++++----- .../testdata/logic_test/tenant_builtins | 24 ++++---- pkg/sql/tenant.go | 57 +++++++++++++------ 13 files changed, 243 insertions(+), 83 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 65632e620293..ebaac3cee696 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -6822,14 +6822,14 @@ func TestBackupRestoreTenant(t *testing.T) { restoreDB := sqlutils.MakeSQLRunner(restoreTC.Conns[0]) restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }) restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`) restoreDB.CheckQueryResults(t, `SELECT id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, - {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }, ) restoreDB.CheckQueryResults(t, @@ -6858,8 +6858,8 @@ func TestBackupRestoreTenant(t *testing.T) { restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, - {`10`, `false`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "DROP"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`10`, `false`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}`}, }, ) @@ -6883,8 +6883,8 @@ func TestBackupRestoreTenant(t *testing.T) { restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, - {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }, ) @@ -6908,14 +6908,14 @@ func TestBackupRestoreTenant(t *testing.T) { restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }) restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`) restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, - {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }, ) }) @@ -6937,14 +6937,14 @@ func TestBackupRestoreTenant(t *testing.T) { restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }) restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/clusterwide'`) restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, - {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }, ) @@ -6977,16 +6977,16 @@ func TestBackupRestoreTenant(t *testing.T) { restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }) restoreDB.Exec(t, `RESTORE FROM 'nodelocal://1/clusterwide'`) restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{ - {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`}, - {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`}, - {`11`, `true`, `NULL`, `{"droppedName": "", "id": "11", "name": "", "state": "ACTIVE"}`}, - {`20`, `true`, `NULL`, `{"droppedName": "", "id": "20", "name": "", "state": "ACTIVE"}`}, + {`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`11`, `true`, `NULL`, `{"droppedName": "", "id": "11", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, + {`20`, `true`, `NULL`, `{"droppedName": "", "id": "20", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`}, }, ) diff --git a/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants b/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants index eaadc0241524..c0f20e7ebe39 100644 --- a/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants +++ b/pkg/ccl/backupccl/testdata/backup-restore/restore-tenants @@ -21,9 +21,9 @@ SELECT crdb_internal.destroy_tenant(5); query-sql SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants; ---- -1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"} -6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"} +1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"} exec-sql BACKUP INTO 'nodelocal://1/cluster' @@ -49,9 +49,9 @@ RESTORE FROM LATEST IN 'nodelocal://1/cluster' query-sql SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants; ---- -1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"} -6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"} +1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"} exec-sql RESTORE TENANT 6 FROM LATEST IN 'nodelocal://1/tenant6' WITH tenant = '7'; @@ -60,7 +60,7 @@ RESTORE TENANT 6 FROM LATEST IN 'nodelocal://1/tenant6' WITH tenant = '7'; query-sql SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants; ---- -1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"} -6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"} -7 true {"droppedName": "", "id": "7", "name": "", "state": "ACTIVE"} +1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"} +7 true {"droppedName": "", "id": "7", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index ef259cfa3465..b8239ec8a237 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -465,7 +466,14 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp p := execCtx.(sql.JobExecContext) execCfg := p.ExecCfg() return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return sql.ActivateTenant(ctx, execCfg, txn, newTenantID.ToUint64()) + info, err := sql.GetTenantRecordByID(ctx, execCfg, txn, newTenantID) + if err != nil { + return err + } + + info.State = descpb.TenantInfo_ACTIVE + info.TenantReplicationJobID = 0 + return sql.UpdateTenantRecord(ctx, execCfg, txn, info) }) } @@ -495,15 +503,26 @@ func (s *streamIngestionResumer) cancelProducerJob( // leftover in the keyspace if a ClearRange were to be issued here. In general // the tenant keyspace of a failed/canceled ingestion job should be treated as // corrupted, and the tenant should be dropped before resuming the ingestion. -// TODO(adityamaru): Add ClearRange logic once we have introduced -// synchronization between the flow tearing down and the job transitioning to a -// failed/canceled state. -func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error { +func (s *streamIngestionResumer) OnFailOrCancel( + ctx context.Context, execCtx interface{}, _ error, +) error { // Cancel the producer job on best effort. The source job's protected timestamp is no // longer needed as this ingestion job is in 'reverting' status and we won't resume // ingestion anymore. + jobExecCtx := execCtx.(sql.JobExecContext) details := s.job.Details().(jobspb.StreamIngestionDetails) s.cancelProducerJob(ctx, details) + + tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), details.DestinationTenantID) + if err != nil { + return errors.Wrap(err, "fetch tenant info") + } + + tenInfo.TenantReplicationJobID = 0 + if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), tenInfo); err != nil { + return errors.Wrap(err, "update tenant record") + } + return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 9a4fe923ed4c..e49b976c6987 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -79,6 +79,11 @@ func ingestionPlanHook( ) } + if !p.ExecCfg().Codec.ForSystemTenant() { + return nil, nil, nil, false, pgerror.Newf(pgcode.InsufficientPrivilege, + "only the system tenant can create other tenants") + } + exprEval := p.ExprEvaluator("INGESTION") from, err := exprEval.String(ctx, ingestionStmt.ReplicationSourceAddress) @@ -127,12 +132,15 @@ func ingestionPlanHook( if _, err := sql.GetTenantRecordByName(ctx, p.ExecCfg(), p.Txn(), roachpb.TenantName(destinationTenant)); err == nil { return errors.Newf("tenant with name %q already exists", destinationTenant) } + + jobID := p.ExecCfg().JobRegistry.MakeJobID() tenantInfo := &descpb.TenantInfoWithUsage{ TenantInfo: descpb.TenantInfo{ // We leave the ID field unset so that the tenant is assigned the next // available tenant ID. - State: descpb.TenantInfo_ADD, - Name: roachpb.TenantName(destinationTenant), + State: descpb.TenantInfo_ADD, + Name: roachpb.TenantName(destinationTenant), + TenantReplicationJobID: jobID, }, } @@ -182,12 +190,12 @@ func ingestionPlanHook( Details: streamIngestionDetails, } - jobID := p.ExecCfg().JobRegistry.MakeJobID() sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jobID, p.Txn()) if err != nil { return err } + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), tree.NewDInt(tree.DInt(streamID))} return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index a45afa8ee469..fb128c123ed6 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -756,6 +756,62 @@ func TestTenantStreamingCancelIngestion(t *testing.T) { }) } +func TestTenantStreamingDropTenantCancelsStream(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + args := defaultTenantStreamingClustersArgs + + testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) { + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + producerJobID, ingestionJobID := c.startStreamReplication() + + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + + c.waitUntilHighWatermark(c.srcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + if cancelAfterPaused { + c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + } + + // Set GC TTL low, so that the GC job completes quickly in the test. + c.destSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;") + c.destSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.args.destTenantName)) + jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + + // Check if the producer job has released protected timestamp. + stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID) + require.NotNil(t, stats.ProducerStatus) + require.Nil(t, stats.ProducerStatus.ProtectedTimestamp) + + // Wait for the GC job to finish + c.destSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'") + + // Check if dest tenant key range is cleaned up. + destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID) + rows, err := c.destCluster.Server(0).DB(). + Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10) + require.NoError(t, err) + require.Empty(t, rows) + + c.destSysSQL.CheckQueryResults(t, + fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID), + [][]string{{"0"}}) + } + + t.Run("drop-tenant-after-paused", func(t *testing.T) { + testCancelIngestion(t, true) + }) + + t.Run("drop-tenant-while-running", func(t *testing.T) { + testCancelIngestion(t, false) + }) +} + func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/catalog/descpb/tenant.proto b/pkg/sql/catalog/descpb/tenant.proto index 08c9c034e785..0cc971cdc51b 100644 --- a/pkg/sql/catalog/descpb/tenant.proto +++ b/pkg/sql/catalog/descpb/tenant.proto @@ -37,9 +37,20 @@ message TenantInfo { optional string name = 3 [ (gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"]; + + // DroppedName is the name the tenant had before DROP TENANT was + // run on the tenant. It should be empty for active or adding + // tenants. optional string dropped_name = 4 [ (gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"]; + + // TenantReplicationJobID is set if this tenant is the target tenant + // of a running tenant replication job. + optional int64 tenant_replication_job_id = 5 [ + (gogoproto.nullable) = false, + (gogoproto.customname) = "TenantReplicationJobID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"]; } // TenantInfoAndUsage contains the information for a tenant in a multi-tenant diff --git a/pkg/sql/drop_tenant.go b/pkg/sql/drop_tenant.go index 06ba81806339..41e82fe7eaaf 100644 --- a/pkg/sql/drop_tenant.go +++ b/pkg/sql/drop_tenant.go @@ -32,7 +32,7 @@ func (p *planner) DropTenant(_ context.Context, n *tree.DropTenant) (planNode, e } func (n *dropTenantNode) startExec(params runParams) error { - err := params.p.DestroyTenant(params.ctx, n.name, false) + err := params.p.DestroyTenant(params.ctx, n.name, false /* synchronous */) if err != nil { if pgerror.GetPGCode(err) == pgcode.UndefinedObject && n.ifExists { return nil diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index 050ee78563cb..85190cc8056f 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -28,6 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -457,8 +460,21 @@ func refreshTenant( return true, time.Time{}, nil } - // Read the tenant's GC TTL to check if the tenant's data has expired. tenID := details.Tenant.ID + // TODO(ssd): Once + // https://github.com/cockroachdb/cockroach/issues/92093 is + // done, we should be able to simply rely on the protected + // timestamp for the replication job. + jobActive, err := tenantHasActiveReplicationJob(ctx, execCfg, tenID) + if err != nil { + return false, time.Time{}, err + } + if jobActive { + log.Infof(ctx, "tenant %d has active tenant replication job, waiting for it to stop before running GC", tenID) + return false, timeutil.Now().Add(MaxSQLGCInterval), nil + } + + // Read the tenant's GC TTL to check if the tenant's data has expired. cfg := execCfg.SystemConfig.GetSystemConfig() tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, keys.TenantsRangesID) @@ -491,3 +507,30 @@ func refreshTenant( } return false, deadlineUnix, nil } + +func tenantHasActiveReplicationJob( + ctx context.Context, execCfg *sql.ExecutorConfig, tenID uint64, +) (bool, error) { + info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MustMakeTenantID(tenID)) + if err != nil { + if pgerror.GetPGCode(err) == pgcode.UndefinedObject { + log.Errorf(ctx, "tenant id %d not found while attempting to GC", tenID) + return false, nil + } else { + return false, errors.Wrapf(err, "fetching tenant %d", tenID) + } + } + if jobID := info.TenantReplicationJobID; jobID != 0 { + j, err := execCfg.JobRegistry.LoadJob(ctx, jobID) + if err != nil { + if errors.Is(err, &jobs.JobNotFoundError{}) { + log.Infof(ctx, "tenant replication job %d not found", jobID) + return false, nil + } else { + return false, err + } + } + return !j.Status().Terminal(), nil + } + return false, err +} diff --git a/pkg/sql/gcjob/tenant_garbage_collection.go b/pkg/sql/gcjob/tenant_garbage_collection.go index 25112e749630..97621702b27c 100644 --- a/pkg/sql/gcjob/tenant_garbage_collection.go +++ b/pkg/sql/gcjob/tenant_garbage_collection.go @@ -56,7 +56,7 @@ func gcTenant( } return nil } - return errors.Wrapf(err, "fetching tenant %d", info.ID) + return errors.Wrapf(err, "fetching tenant %d", tenID) } // This case should never happen. diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index b88b39e8c2f3..6be59fe6c88d 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -463,7 +463,7 @@ func TestGCTenant(t *testing.T) { require.EqualError( t, gcClosure(dropTenID, progress), - `GC state for tenant id:11 state:DROP name:"" dropped_name:"" is DELETED yet the tenant row still exists`, + `GC state for tenant id:11 state:DROP name:"" dropped_name:"" tenant_replication_job_id:0 is DELETED yet the tenant row still exists`, ) }) diff --git a/pkg/sql/logictest/testdata/logic_test/tenant b/pkg/sql/logictest/testdata/logic_test/tenant index 6b58beb802f8..a8e2e5ca961c 100644 --- a/pkg/sql/logictest/testdata/logic_test/tenant +++ b/pkg/sql/logictest/testdata/logic_test/tenant @@ -3,7 +3,7 @@ query IBIT colnames SELECT id, active, length(info), name FROM system.tenants ORDER BY id ---- id active length name -1 true 14 system +1 true 16 system # Create a few tenants. @@ -22,10 +22,10 @@ FROM system.tenants ORDER BY id ---- id active name crdb_internal.pb_to_json -1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -2 true tenant-one {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE"} -3 true two {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE"} -4 true three {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE"} +1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +2 true tenant-one {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE", "tenantReplicationJobId": "0"} +3 true two {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE", "tenantReplicationJobId": "0"} +4 true three {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE", "tenantReplicationJobId": "0"} query ITT colnames SHOW TENANT system @@ -97,7 +97,7 @@ FROM system.tenants WHERE name = 'four' ORDER BY id ---- id active name crdb_internal.pb_to_json -5 true four {"droppedName": "", "id": "5", "name": "four", "state": "ACTIVE"} +5 true four {"droppedName": "", "id": "5", "name": "four", "state": "ACTIVE", "tenantReplicationJobId": "0"} statement ok DROP TENANT four @@ -151,11 +151,11 @@ FROM system.tenants ORDER BY id ---- id active name crdb_internal.pb_to_json -1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -2 true tenant-one {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE"} -3 true two {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE"} -4 true three {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE"} -5 false NULL {"droppedName": "four", "id": "5", "name": "", "state": "DROP"} -6 false NULL {"droppedName": "five-requiring-quotes", "id": "6", "name": "", "state": "DROP"} -7 false NULL {"droppedName": "to-be-reclaimed", "id": "7", "name": "", "state": "DROP"} -8 true to-be-reclaimed {"droppedName": "", "id": "8", "name": "to-be-reclaimed", "state": "ACTIVE"} +1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +2 true tenant-one {"droppedName": "", "id": "2", "name": "tenant-one", "state": "ACTIVE", "tenantReplicationJobId": "0"} +3 true two {"droppedName": "", "id": "3", "name": "two", "state": "ACTIVE", "tenantReplicationJobId": "0"} +4 true three {"droppedName": "", "id": "4", "name": "three", "state": "ACTIVE", "tenantReplicationJobId": "0"} +5 false NULL {"droppedName": "four", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +6 false NULL {"droppedName": "five-requiring-quotes", "id": "6", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +7 false NULL {"droppedName": "to-be-reclaimed", "id": "7", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +8 true to-be-reclaimed {"droppedName": "", "id": "8", "name": "to-be-reclaimed", "state": "ACTIVE", "tenantReplicationJobId": "0"} diff --git a/pkg/sql/logictest/testdata/logic_test/tenant_builtins b/pkg/sql/logictest/testdata/logic_test/tenant_builtins index d38b4156ad8d..9e79825dcef9 100644 --- a/pkg/sql/logictest/testdata/logic_test/tenant_builtins +++ b/pkg/sql/logictest/testdata/logic_test/tenant_builtins @@ -3,7 +3,7 @@ query IBIT colnames SELECT id, active, length(info), name FROM system.tenants ORDER BY id ---- id active length name -1 true 14 system +1 true 16 system # Create three tenants. @@ -28,10 +28,10 @@ FROM system.tenants ORDER BY id ---- id active name crdb_internal.pb_to_json -1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -2 true tenant-number-eleven {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE"} -5 true NULL {"droppedName": "", "id": "5", "name": "", "state": "ACTIVE"} -10 true tenant-number-ten {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE"} +1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +2 true tenant-number-eleven {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE", "tenantReplicationJobId": "0"} +5 true NULL {"droppedName": "", "id": "5", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"} +10 true tenant-number-ten {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE", "tenantReplicationJobId": "0"} # Check we can add a name where none existed before. query I @@ -89,10 +89,10 @@ FROM system.tenants ORDER BY id ---- id active name crdb_internal.pb_to_json -1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -2 true tenant-number-eleven {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE"} -5 false NULL {"droppedName": "my-new-tenant-name", "id": "5", "name": "", "state": "DROP"} -10 true tenant-number-ten {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE"} +1 true system {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +2 true tenant-number-eleven {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE", "tenantReplicationJobId": "0"} +5 false NULL {"droppedName": "my-new-tenant-name", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"} +10 true tenant-number-ten {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE", "tenantReplicationJobId": "0"} # Try to recreate an existing tenant. @@ -199,9 +199,9 @@ FROM system.tenants ORDER BY id ---- id active crdb_internal.pb_to_json -1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"} -2 true {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE"} -10 true {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE"} +1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"} +2 true {"droppedName": "", "id": "2", "name": "tenant-number-eleven", "state": "ACTIVE", "tenantReplicationJobId": "0"} +10 true {"droppedName": "", "id": "10", "name": "tenant-number-ten", "state": "ACTIVE", "tenantReplicationJobId": "0"} query error tenant resource limits require a CCL binary SELECT crdb_internal.update_tenant_resource_limits(10, 1000, 100, 0, now(), 0) diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index 940c0ad33fef..ad0fd0c44769 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -246,10 +246,16 @@ func GetTenantRecordByID( return info, nil } -// updateTenantRecord updates a tenant in system.tenants. -func updateTenantRecord( +// UpdateTenantRecord updates a tenant in system.tenants. +// +// Caller is expected to check the user's permission. +func UpdateTenantRecord( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, info *descpb.TenantInfo, ) error { + if err := validateTenantInfo(info); err != nil { + return err + } + tenID := info.ID active := info.State == descpb.TenantInfo_ACTIVE infoBytes, err := protoutil.Marshal(info) @@ -269,6 +275,16 @@ func updateTenantRecord( return nil } +func validateTenantInfo(info *descpb.TenantInfo) error { + if info.TenantReplicationJobID != 0 && info.State == descpb.TenantInfo_ACTIVE { + return errors.Newf("tenant in state %v with replication job ID %d", info.State, info.TenantReplicationJobID) + } + if info.DroppedName != "" && info.State != descpb.TenantInfo_DROP { + return errors.Newf("tenant in state %v with dropped name %q", info.State, info.DroppedName) + } + return nil +} + // getAvailableTenantID returns the first available ID that can be assigned to // the created tenant. Note, this ID could have previously belonged to another // tenant that has since been dropped and gc'ed. @@ -469,7 +485,7 @@ func ActivateTenant(ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, t // Mark the tenant as active. info.State = descpb.TenantInfo_ACTIVE - if err := updateTenantRecord(ctx, execCfg, txn, info); err != nil { + if err := UpdateTenantRecord(ctx, execCfg, txn, info); err != nil { return errors.Wrap(err, "activating tenant") } @@ -501,7 +517,7 @@ func clearTenant(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Tena // DestroyTenant implements the tree.TenantOperator interface. func (p *planner) DestroyTenant( - ctx context.Context, tenantName roachpb.TenantName, synchronous bool, + ctx context.Context, tenantName roachpb.TenantName, synchronousImmediateDrop bool, ) error { if err := p.validateDestroyTenant(ctx); err != nil { return err @@ -512,11 +528,13 @@ func (p *planner) DestroyTenant( return errors.Wrap(err, "destroying tenant") } - return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronous) + return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronousImmediateDrop) } // DestroyTenantByID implements the tree.TenantOperator interface. -func (p *planner) DestroyTenantByID(ctx context.Context, tenID uint64, synchronous bool) error { +func (p *planner) DestroyTenantByID( + ctx context.Context, tenID uint64, synchronousImmediateDrop bool, +) error { if err := p.validateDestroyTenant(ctx); err != nil { return err } @@ -525,7 +543,7 @@ func (p *planner) DestroyTenantByID(ctx context.Context, tenID uint64, synchrono if err != nil { return errors.Wrap(err, "destroying tenant") } - return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronous) + return destroyTenantInternal(ctx, p.txn, p.execCfg, &p.extendedEvalCtx, p.User(), info, synchronousImmediateDrop) } func (p *planner) validateDestroyTenant(ctx context.Context) error { @@ -543,7 +561,7 @@ func destroyTenantInternal( extendedEvalCtx *extendedEvalContext, user username.SQLUsername, info *descpb.TenantInfo, - synchronous bool, + synchronousImmediateDrop bool, ) error { const op = "destroy" tenID := info.ID @@ -557,23 +575,28 @@ func destroyTenantInternal( // Mark the tenant as dropping. // - // TODO(ssd): Once available, we should cancel any running - // replication job on this tenant record. - // + // Cancel any running replication job on this tenant record. + // The GCJob will wait for this job to enter a terminal state. + if info.TenantReplicationJobID != 0 { + if err := execCfg.JobRegistry.CancelRequested(ctx, txn, info.TenantReplicationJobID); err != nil { + return errors.Wrapf(err, "canceling tenant replication job %d", info.TenantReplicationJobID) + } + } + // TODO(ssd): We may want to implement a job that waits out // any running sql pods before enqueing the GC job. info.State = descpb.TenantInfo_DROP info.DroppedName = info.Name info.Name = "" - if err := updateTenantRecord(ctx, execCfg, txn, info); err != nil { + if err := UpdateTenantRecord(ctx, execCfg, txn, info); err != nil { return errors.Wrap(err, "destroying tenant") } - jobID, err := gcTenantJob(ctx, execCfg, txn, user, tenID, synchronous) + jobID, err := gcTenantJob(ctx, execCfg, txn, user, tenID, synchronousImmediateDrop) if err != nil { return errors.Wrap(err, "scheduling gc job") } - if synchronous { + if synchronousImmediateDrop { extendedEvalCtx.Jobs.add(jobID) } return nil @@ -661,7 +684,7 @@ func gcTenantJob( txn *kv.Txn, user username.SQLUsername, tenID uint64, - synchronous bool, + dropImmediately bool, ) (jobspb.JobID, error) { // Queue a GC job that will delete the tenant data and finally remove the // row from `system.tenants`. @@ -671,7 +694,7 @@ func gcTenantJob( DropTime: timeutil.Now().UnixNano(), } progress := jobspb.SchemaChangeGCProgress{} - if synchronous { + if dropImmediately { progress.Tenant = &jobspb.SchemaChangeGCProgress_TenantProgress{ Status: jobspb.SchemaChangeGCProgress_CLEARING, } @@ -759,7 +782,7 @@ func (p *planner) UpdateTenantResourceLimits( func TestingUpdateTenantRecord( ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, info *descpb.TenantInfo, ) error { - return updateTenantRecord(ctx, execCfg, txn, info) + return UpdateTenantRecord(ctx, execCfg, txn, info) } // RenameTenant implements the tree.TenantOperator interface.