Skip to content

Commit

Permalink
Merge #92295
Browse files Browse the repository at this point in the history
92295: jobs: pass in memory job to pts manager api r=fqazi a=msbutler

This allows the in-memory job in the restore resumer to stay up to date with protected timestamp (pts) modifications.

Informs #91148

Release note: None

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Nov 24, 2022
2 parents 4ae94a5 + dd8eb95 commit 3cfd872
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 51 deletions.
65 changes: 26 additions & 39 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,16 @@ func NewManager(

// TryToProtectBeforeGC adds a protected timestamp record for a historical
// transaction for a specific table, once a certain percentage of the GC TTL has
// elapsed. This method can be preferred in scenarios where the cost of installing
// a protected timestamp is more expensive relative to the typical length of an
// operation (for example multi region). The approach here is heuristic
// and can be considered a best-effort basis since the GC TTL could change or
// the caller may not invoke this early enough in the transaction. Returns a Cleaner
// function to cancel installation or remove the protected timestamp.
// elapsed. This method can be preferred in scenarios where the cost of
// installing a protected timestamp is high relative to the typical
// length of an operation (for example multi region). The approach here is
// heuristic and can be considered a best-effort basis since the GC TTL could
// change or the caller may not invoke this early enough in the transaction.
// Returns a Cleaner function to cancel installation or remove the protected
// timestamp. Note, the function assumes the in-memory job is up to date with
// the persisted job record.
func (p *Manager) TryToProtectBeforeGC(
ctx context.Context,
jobID jobspb.JobID,
tableDesc catalog.TableDescriptor,
readAsOf hlc.Timestamp,
ctx context.Context, job *jobs.Job, tableDesc catalog.TableDescriptor, readAsOf hlc.Timestamp,
) Cleaner {
waitGrp := ctxgroup.WithContext(ctx)
protectedTSInstallCancel := make(chan struct{})
Expand All @@ -127,7 +126,8 @@ func (p *Manager) TryToProtectBeforeGC(

select {
case <-time.After(waitBeforeProtectedTS):
unprotectCallback, err = p.Protect(ctx, jobID, tableDesc, readAsOf)
target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
unprotectCallback, err = p.Protect(ctx, job, target, readAsOf)
if err != nil {
return err
}
Expand All @@ -152,27 +152,20 @@ func (p *Manager) TryToProtectBeforeGC(
}
}

// Protect adds a protected timestamp record for a historical
// transaction for a specific table immediately in a synchronous
// manner. If an existing record is found, it will be updated with
// a new timestamp. Returns a Cleaner function to remove the protected timestamp,
// if one was installed.
// Protect adds a protected timestamp record for a historical transaction for a
// specific target immediately in a synchronous manner. If an existing record is
// found, it will be updated with a new timestamp. Returns a Cleaner function to
// remove the protected timestamp, if one was installed. Note, the function
// assumes the in-memory job is up to date with the persisted job record.
func (p *Manager) Protect(
ctx context.Context,
jobID jobspb.JobID,
tableDesc catalog.TableDescriptor,
readAsOf hlc.Timestamp,
ctx context.Context, job *jobs.Job, target *ptpb.Target, readAsOf hlc.Timestamp,
) (Cleaner, error) {
// If we are not running a historical query, nothing to do here.
if readAsOf.IsEmpty() {
return nil, nil
}
var protectedtsID *uuid.UUID
err := p.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
job, err := p.jr.LoadJobWithTxn(ctx, jobID, txn)
if err != nil {
return err
}
details := job.Details()
protectedtsID = getProtectedTSOnJob(details)
// Check if there is an existing protected timestamp ID on the job,
Expand All @@ -187,9 +180,8 @@ func (p *Manager) Protect(
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)

target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
rec := MakeRecord(*protectedtsID,
int64(jobID), readAsOf, nil, Jobs, target)
int64(job.ID()), readAsOf, nil, Jobs, target)
return p.protectedTSProvider.Protect(ctx, txn, rec)
})
}
Expand All @@ -200,31 +192,26 @@ func (p *Manager) Protect(
return nil, err
}
return func(ctx context.Context) error {
return p.Unprotect(ctx, jobID)
return p.Unprotect(ctx, job)
}, nil
}

// Unprotect based on a job ID, mainly for last resort cleanup.
// Note: This should only be used for job cleanup if is not currently,
// Unprotect the pts associated with the job, mainly for last resort cleanup.
// The function assumes the in-memory job is up to date with the persisted job
// record. Note: This should only be used for job cleanup if it is not currently
// executing.
func (p *Manager) Unprotect(ctx context.Context, jobID jobspb.JobID) error {
func (p *Manager) Unprotect(ctx context.Context, job *jobs.Job) error {
return p.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
job, err := p.jr.LoadJobWithTxn(ctx, jobID, txn)
if err != nil {
return err
}
// Fetch the protected timestamp UUID from the job, if one exists.
details := job.Details()
protectedtsID := getProtectedTSOnJob(details)
protectedtsID := getProtectedTSOnJob(job.Details())
if protectedtsID == nil {
return nil
}
// If we do find one then we need to clean up the protected timestamp,
// and remove it from the job.
return job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
details = md.Payload.UnwrapDetails()
details = setProtectedTSOnJob(details, nil)
md.Payload.Details = jobspb.WrapPayloadDetails(details)
updatedDetails := setProtectedTSOnJob(job.Details(), nil)
md.Payload.Details = jobspb.WrapPayloadDetails(updatedDetails)
ju.UpdatePayload(md.Payload)
return p.protectedTSProvider.Release(ctx, txn, *protectedtsID)
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ func ValidateInvertedIndexes(

// Removes the protected timestamp, if one was added when this
// function returns.
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job.ID(), tableDesc, runHistoricalTxn.ReadAsOf())
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job, tableDesc, runHistoricalTxn.ReadAsOf())
defer func() {
if unprotectErr := protectedTSCleaner(ctx); unprotectErr != nil {
err = errors.CombineErrors(err, unprotectErr)
Expand Down Expand Up @@ -1783,7 +1783,7 @@ func ValidateForwardIndexes(

// Removes the protected timestamp, if one was added when this
// function returns.
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job.ID(), tableDesc, runHistoricalTxn.ReadAsOf())
protectedTSCleaner := protectedTSManager.TryToProtectBeforeGC(ctx, job, tableDesc, runHistoricalTxn.ReadAsOf())
defer func() {
if unprotectErr := protectedTSCleaner(ctx); unprotectErr != nil {
err = errors.CombineErrors(err, unprotectErr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError(
) error {
// Clean up any protected timestamps as a last resort, in case the job
// execution never did itself.
if err := sc.execCfg.ProtectedTimestampManager.Unprotect(ctx, sc.job.ID()); err != nil {
if err := sc.execCfg.ProtectedTimestampManager.Unprotect(ctx, sc.job); err != nil {
log.Warningf(ctx, "unexpected error cleaning up protected timestamp %v", err)
}
// Ensure that this is a table descriptor and that the mutation is first in
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
Expand Down
32 changes: 24 additions & 8 deletions pkg/sql/schemachanger/scexec/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -401,15 +402,30 @@ type StatsRefresher interface {
// the GC interval is encountered.
type ProtectedTimestampManager interface {
// TryToProtectBeforeGC adds a protected timestamp record for a historical
// transaction for a specific table, once a certain percentage of the GC time has
// elapsed. This is done on a best-effort basis using a timer relative to
// the GC TTL, and should be done fairly early in the transaction.
// transaction for a specific table, once a certain percentage of the GC time
// has elapsed. This is done on a best-effort basis using a timer relative to
// the GC TTL, and should be done fairly early in the transaction. Note, the
// function assumes the in-memory job is up to date with the persisted job
// record.
TryToProtectBeforeGC(
ctx context.Context, jobID jobspb.JobID, tableDesc catalog.TableDescriptor, readAsOf hlc.Timestamp,
ctx context.Context, job *jobs.Job, tableDesc catalog.TableDescriptor, readAsOf hlc.Timestamp,
) jobsprotectedts.Cleaner

// Unprotect unprotects just based on a job ID, mainly for last resort cleanup.
// Note: This should only be used for job cleanup if it is not currently
// executing.
Unprotect(ctx context.Context, jobID jobspb.JobID) error
// Protect adds a protected timestamp record for a historical transaction for
// a specific target immediately. If an existing record is found, it will be
// updated with a new timestamp. Returns a Cleaner function to remove the
// protected timestamp, if one was installed. Note, the function assumes the
// in-memory job is up to date with the persisted job record.
Protect(
ctx context.Context,
job *jobs.Job,
target *ptpb.Target,
readAsOf hlc.Timestamp,
) (jobsprotectedts.Cleaner, error)

// Unprotect unprotects the spans associated with the job, mainly for last
// resort cleanup. The function assumes the in-memory job is up to date with
// the persisted job record. Note: This should only be used for job cleanup if
// it is not currently executing.
Unprotect(ctx context.Context, job *jobs.Job) error
}
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/scjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (n *newSchemaChangeResumer) OnFailOrCancel(
n.rollback = true
// Clean up any protected timestamps as a last resort, in case the job
// execution never did itself.
if err := execCfg.ProtectedTimestampManager.Unprotect(ctx, n.job.ID()); err != nil {
if err := execCfg.ProtectedTimestampManager.Unprotect(ctx, n.job); err != nil {
log.Warningf(ctx, "unable to revert protected timestamp %v", err)
}
return n.run(ctx, execCtx)
Expand Down

0 comments on commit 3cfd872

Please sign in to comment.