Skip to content

Commit

Permalink
sql,streamingccl: DROP TENANT cancels running ingestion job
Browse files Browse the repository at this point in the history
This adds a new field to TenantInfo that stores the job ID of the
replication ingestion job related to that tenant.

On DROP any such job is canceled.

The GC job waits for the job to enter a terminal state before issuing
any cleanup commands. This waiting my not be necessary in the future
if the stream ingestion job installs a protected timestamp on the
tenant that is only released in OnFailOrCancel.

NB: We still wait out the GC TTL even in the case that the tenant has
never been ACTIVE.

Release note: None
  • Loading branch information
stevendanna committed Nov 21, 2022
1 parent ae919e3 commit 0015d5d
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 83 deletions.
36 changes: 18 additions & 18 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6822,14 +6822,14 @@ func TestBackupRestoreTenant(t *testing.T) {
restoreDB := sqlutils.MakeSQLRunner(restoreTC.Conns[0])

restoreDB.CheckQueryResults(t, `select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`, [][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
restoreDB.CheckQueryResults(t,
`SELECT id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
},
)
restoreDB.CheckQueryResults(t,
Expand Down Expand Up @@ -6858,8 +6858,8 @@ func TestBackupRestoreTenant(t *testing.T) {
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`10`, `false`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "DROP"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`10`, `false`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}`},
},
)

Expand All @@ -6883,8 +6883,8 @@ func TestBackupRestoreTenant(t *testing.T) {
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
},
)

Expand All @@ -6908,14 +6908,14 @@ func TestBackupRestoreTenant(t *testing.T) {
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
},
)
})
Expand All @@ -6937,14 +6937,14 @@ func TestBackupRestoreTenant(t *testing.T) {
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/clusterwide'`)
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
},
)

Expand Down Expand Up @@ -6977,16 +6977,16 @@ func TestBackupRestoreTenant(t *testing.T) {
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
})
restoreDB.Exec(t, `RESTORE FROM 'nodelocal://1/clusterwide'`)
restoreDB.CheckQueryResults(t,
`select id, active, name, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) from system.tenants`,
[][]string{
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE"}`},
{`11`, `true`, `NULL`, `{"droppedName": "", "id": "11", "name": "", "state": "ACTIVE"}`},
{`20`, `true`, `NULL`, `{"droppedName": "", "id": "20", "name": "", "state": "ACTIVE"}`},
{`1`, `true`, `system`, `{"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`10`, `true`, `NULL`, `{"droppedName": "", "id": "10", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`11`, `true`, `NULL`, `{"droppedName": "", "id": "11", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
{`20`, `true`, `NULL`, `{"droppedName": "", "id": "20", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}`},
},
)

Expand Down
20 changes: 10 additions & 10 deletions pkg/ccl/backupccl/testdata/backup-restore/restore-tenants
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ SELECT crdb_internal.destroy_tenant(5);
query-sql
SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants;
----
1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"}
6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"}
1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}

exec-sql
BACKUP INTO 'nodelocal://1/cluster'
Expand All @@ -49,9 +49,9 @@ RESTORE FROM LATEST IN 'nodelocal://1/cluster'
query-sql
SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants;
----
1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"}
6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"}
1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}

exec-sql
RESTORE TENANT 6 FROM LATEST IN 'nodelocal://1/tenant6' WITH tenant = '7';
Expand All @@ -60,7 +60,7 @@ RESTORE TENANT 6 FROM LATEST IN 'nodelocal://1/tenant6' WITH tenant = '7';
query-sql
SELECT id,active,crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info, true) FROM system.tenants;
----
1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE"}
5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP"}
6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE"}
7 true {"droppedName": "", "id": "7", "name": "", "state": "ACTIVE"}
1 true {"droppedName": "", "id": "1", "name": "system", "state": "ACTIVE", "tenantReplicationJobId": "0"}
5 false {"droppedName": "", "id": "5", "name": "", "state": "DROP", "tenantReplicationJobId": "0"}
6 true {"droppedName": "", "id": "6", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
7 true {"droppedName": "", "id": "7", "name": "", "state": "ACTIVE", "tenantReplicationJobId": "0"}
29 changes: 24 additions & 5 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -465,7 +466,14 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
p := execCtx.(sql.JobExecContext)
execCfg := p.ExecCfg()
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return sql.ActivateTenant(ctx, execCfg, txn, newTenantID.ToUint64())
info, err := sql.GetTenantRecordByID(ctx, execCfg, txn, newTenantID)
if err != nil {
return err
}

info.State = descpb.TenantInfo_ACTIVE
info.TenantReplicationJobID = 0
return sql.UpdateTenantRecord(ctx, execCfg, txn, info)
})
}

Expand Down Expand Up @@ -495,15 +503,26 @@ func (s *streamIngestionResumer) cancelProducerJob(
// leftover in the keyspace if a ClearRange were to be issued here. In general
// the tenant keyspace of a failed/canceled ingestion job should be treated as
// corrupted, and the tenant should be dropped before resuming the ingestion.
// TODO(adityamaru): Add ClearRange logic once we have introduced
// synchronization between the flow tearing down and the job transitioning to a
// failed/canceled state.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error {
func (s *streamIngestionResumer) OnFailOrCancel(
ctx context.Context, execCtx interface{}, _ error,
) error {
// Cancel the producer job on best effort. The source job's protected timestamp is no
// longer needed as this ingestion job is in 'reverting' status and we won't resume
// ingestion anymore.
jobExecCtx := execCtx.(sql.JobExecContext)
details := s.job.Details().(jobspb.StreamIngestionDetails)
s.cancelProducerJob(ctx, details)

tenInfo, err := sql.GetTenantRecordByID(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), details.DestinationTenantID)
if err != nil {
return errors.Wrap(err, "fetch tenant info")
}

tenInfo.TenantReplicationJobID = 0
if err := sql.UpdateTenantRecord(ctx, jobExecCtx.ExecCfg(), jobExecCtx.Txn(), tenInfo); err != nil {
return errors.Wrap(err, "update tenant record")
}

return nil
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func ingestionPlanHook(
)
}

if !p.ExecCfg().Codec.ForSystemTenant() {
return nil, nil, nil, false, pgerror.Newf(pgcode.InsufficientPrivilege,
"only the system tenant can create other tenants")
}

exprEval := p.ExprEvaluator("INGESTION")

from, err := exprEval.String(ctx, ingestionStmt.ReplicationSourceAddress)
Expand Down Expand Up @@ -127,12 +132,15 @@ func ingestionPlanHook(
if _, err := sql.GetTenantRecordByName(ctx, p.ExecCfg(), p.Txn(), roachpb.TenantName(destinationTenant)); err == nil {
return errors.Newf("tenant with name %q already exists", destinationTenant)
}

jobID := p.ExecCfg().JobRegistry.MakeJobID()
tenantInfo := &descpb.TenantInfoWithUsage{
TenantInfo: descpb.TenantInfo{
// We leave the ID field unset so that the tenant is assigned the next
// available tenant ID.
State: descpb.TenantInfo_ADD,
Name: roachpb.TenantName(destinationTenant),
State: descpb.TenantInfo_ADD,
Name: roachpb.TenantName(destinationTenant),
TenantReplicationJobID: jobID,
},
}

Expand Down Expand Up @@ -182,12 +190,12 @@ func ingestionPlanHook(
Details: streamIngestionDetails,
}

jobID := p.ExecCfg().JobRegistry.MakeJobID()
sj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr,
jobID, p.Txn())
if err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), tree.NewDInt(tree.DInt(streamID))}
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,62 @@ func TestTenantStreamingCancelIngestion(t *testing.T) {
})
}

func TestTenantStreamingDropTenantCancelsStream(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := defaultTenantStreamingClustersArgs

testCancelIngestion := func(t *testing.T, cancelAfterPaused bool) {
c, cleanup := createTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.startStreamReplication()

jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))

c.waitUntilHighWatermark(c.srcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
if cancelAfterPaused {
c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID))
jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
}

// Set GC TTL low, so that the GC job completes quickly in the test.
c.destSysSQL.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
c.destSysSQL.Exec(t, fmt.Sprintf("DROP TENANT %s", c.args.destTenantName))
jobutils.WaitForJobToCancel(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToCancel(c.t, c.srcSysSQL, jobspb.JobID(producerJobID))

// Check if the producer job has released protected timestamp.
stats := streamIngestionStats(t, c.destSysSQL, ingestionJobID)
require.NotNil(t, stats.ProducerStatus)
require.Nil(t, stats.ProducerStatus.ProtectedTimestamp)

// Wait for the GC job to finish
c.destSysSQL.Exec(t, "SHOW JOBS WHEN COMPLETE SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC'")

// Check if dest tenant key range is cleaned up.
destTenantPrefix := keys.MakeTenantPrefix(args.destTenantID)
rows, err := c.destCluster.Server(0).DB().
Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
require.NoError(t, err)
require.Empty(t, rows)

c.destSysSQL.CheckQueryResults(t,
fmt.Sprintf("SELECT count(*) FROM system.tenants WHERE id = %s", args.destTenantID),
[][]string{{"0"}})
}

t.Run("drop-tenant-after-paused", func(t *testing.T) {
testCancelIngestion(t, true)
})

t.Run("drop-tenant-while-running", func(t *testing.T) {
testCancelIngestion(t, false)
})
}

func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/catalog/descpb/tenant.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,20 @@ message TenantInfo {
optional string name = 3 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"];

// DroppedName is the name the tenant had before DROP TENANT was
// run on the tenant. It should be empty for active or adding
// tenants.
optional string dropped_name = 4 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"];

// TenantReplicationJobID is set if this tenant is the target tenant
// of a running tenant replication job.
optional int64 tenant_replication_job_id = 5 [
(gogoproto.nullable) = false,
(gogoproto.customname) = "TenantReplicationJobID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb.JobID"];
}

// TenantInfoAndUsage contains the information for a tenant in a multi-tenant
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/drop_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (p *planner) DropTenant(_ context.Context, n *tree.DropTenant) (planNode, e
}

func (n *dropTenantNode) startExec(params runParams) error {
err := params.p.DestroyTenant(params.ctx, n.name, false)
err := params.p.DestroyTenant(params.ctx, n.name, false /* synchronous */)
if err != nil {
if pgerror.GetPGCode(err) == pgcode.UndefinedObject && n.ifExists {
return nil
Expand Down
45 changes: 44 additions & 1 deletion pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -28,6 +29,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -457,8 +460,21 @@ func refreshTenant(
return true, time.Time{}, nil
}

// Read the tenant's GC TTL to check if the tenant's data has expired.
tenID := details.Tenant.ID
// TODO(ssd): Once
// https://github.com/cockroachdb/cockroach/issues/92093 is
// done, we should be able to simply rely on the protected
// timestamp for the replication job.
jobActive, err := tenantHasActiveReplicationJob(ctx, execCfg, tenID)
if err != nil {
return false, time.Time{}, err
}
if jobActive {
log.Infof(ctx, "tenant %d has active tenant replication job, waiting for it to stop before running GC", tenID)
return false, timeutil.Now().Add(MaxSQLGCInterval), nil
}

// Read the tenant's GC TTL to check if the tenant's data has expired.
cfg := execCfg.SystemConfig.GetSystemConfig()
tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds
zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, keys.TenantsRangesID)
Expand Down Expand Up @@ -491,3 +507,30 @@ func refreshTenant(
}
return false, deadlineUnix, nil
}

func tenantHasActiveReplicationJob(
ctx context.Context, execCfg *sql.ExecutorConfig, tenID uint64,
) (bool, error) {
info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MustMakeTenantID(tenID))
if err != nil {
if pgerror.GetPGCode(err) == pgcode.UndefinedObject {
log.Errorf(ctx, "tenant id %d not found while attempting to GC", tenID)
return false, nil
} else {
return false, errors.Wrapf(err, "fetching tenant %d", tenID)
}
}
if jobID := info.TenantReplicationJobID; jobID != 0 {
j, err := execCfg.JobRegistry.LoadJob(ctx, jobID)
if err != nil {
if errors.Is(err, &jobs.JobNotFoundError{}) {
log.Infof(ctx, "tenant replication job %d not found", jobID)
return false, nil
} else {
return false, err
}
}
return !j.Status().Terminal(), nil
}
return false, err
}
Loading

0 comments on commit 0015d5d

Please sign in to comment.