jobs, *: use separate interface for job exec
Previously we passed PlanHookState to job execution methods like Resume or OnFailOrCancel.
PlanHookState is a window into the planner, designed to surface its API to planning code
that exists outside the SQL package and is injected via hooks. Planning code expects to use
certain methods that assume they are running during statement execution and, in particular,
that they have access to the transaction in which that statement is being executed, for
example to resolve roles or privileges.

Job execution, on the other hand, does not happen during statement execution and, critically,
does not have a txn set. Previously the PlanHookState argument to job execution was backed by
a planner as its concrete type, with a nil txn. However, because that API was shared with the
one used by planning code, it was easy to accidentally call, during job execution, methods
that assumed they only ever ran during statement planning, violating their assumption of a
non-nil txn and leading to bugs.

This changes the job execution methods to expect a new type, JobExecContext, which exposes
only the subset of methods that we expect to use during execution and, more importantly,
omits the methods that should only be called during statement evaluation but that would be
callable on PlanHookState.
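
A minimal sketch of the shape of that interface, inferred only from the call sites in this
diff (ExecCfg, DistSQLPlanner, ExtendedEvalContext, User); the concrete return types are
assumptions for illustration, and the real sql.JobExecContext may carry more methods:

    // JobExecContext (sketch): the surface that job execution code is allowed
    // to touch. Method names come from the call sites in the hunks below;
    // return types are assumed, not copied from the codebase.
    type JobExecContext interface {
        ExecCfg() *ExecutorConfig                  // server-wide executor configuration
        DistSQLPlanner() *DistSQLPlanner           // for planning distributed flows
        ExtendedEvalContext() *extendedEvalContext // eval context, without a statement txn
        User() security.SQLUsername               // user on whose behalf the job runs
    }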

For now the implementation of JobExecContext is still backed by a planner as fields like
ExtendedEvalContext are fairly closely tied to sql.planner. Later work might try to restrict
this API as well.
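
What "backed by a planner" could look like is sketched below; the wrapper type and its
wiring are not part of this diff, so the name plannerJobExecContext and its layout are
hypothetical:

    // plannerJobExecContext (hypothetical name): wraps a planner whose txn is
    // nil so that only the JobExecContext subset is reachable from job
    // execution code; planner-only methods stay hidden behind the interface.
    type plannerJobExecContext struct {
        p *planner
    }

    func (e *plannerJobExecContext) ExecCfg() *ExecutorConfig        { return e.p.ExecCfg() }
    func (e *plannerJobExecContext) DistSQLPlanner() *DistSQLPlanner { return e.p.DistSQLPlanner() }
    func (e *plannerJobExecContext) ExtendedEvalContext() *extendedEvalContext {
        return e.p.ExtendedEvalContext()
    }
    func (e *plannerJobExecContext) User() security.SQLUsername { return e.p.User() }

Job code receives this value as an interface{} and asserts execCtx.(sql.JobExecContext), as
in the hunks below, so planner-only methods are unreachable at compile time even though the
same planner still sits underneath.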

Release note: none.
dt committed Oct 28, 2020
1 parent 8652b7a commit 54dbbe5
Showing 26 changed files with 206 additions and 142 deletions.
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/backup_job.go
@@ -172,7 +172,7 @@ func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
// file.
func backup(
ctx context.Context,
phs sql.PlanHookState,
execCtx sql.JobExecContext,
defaultURI string,
urisByLocalityKV map[string]string,
db *kv.DB,
@@ -286,7 +286,7 @@ func backup(

if err := distBackup(
ctx,
phs,
execCtx,
spans,
introducedSpans,
pkIDs,
@@ -403,10 +403,10 @@ type backupResumer struct {

// Resume is part of the jobs.Resumer interface.
func (b *backupResumer) Resume(
ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums,
ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
) error {
details := b.job.Details().(jobspb.BackupDetails)
p := phs.(sql.PlanHookState)
p := execCtx.(sql.JobExecContext)

// For all backups, partitioned or not, the main BACKUP manifest is stored at
// details.URI.
@@ -655,18 +655,18 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error {
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
func (b *backupResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
defer b.maybeNotifyScheduledJobCompletion(
ctx,
jobs.StatusFailed,
phs.(sql.PlanHookState).ExecCfg(),
execCtx.(sql.JobExecContext).ExecCfg(),
)

telemetry.Count("backup.total.failed")
telemetry.CountBucketed("backup.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds()))

p := phs.(sql.PlanHookState)
p := execCtx.(sql.JobExecContext)
cfg := p.ExecCfg()
if err := b.clearStats(ctx, p.ExecCfg().DB); err != nil {
log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -31,7 +31,7 @@ import (
// build up the BulkOpSummary.
func distBackup(
ctx context.Context,
phs sql.PlanHookState,
execCtx sql.JobExecContext,
spans roachpb.Spans,
introducedSpans roachpb.Spans,
pkIDs map[uint64]bool,
@@ -45,10 +45,10 @@
ctx = logtags.AddTag(ctx, "backup-distsql", nil)
var noTxn *kv.Txn

dsp := phs.DistSQLPlanner()
evalCtx := phs.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()

planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg())
planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
if err != nil {
return err
}
@@ -64,8 +64,8 @@
mvccFilter,
encryption,
startTime, endTime,
phs.User(),
phs.ExecCfg(),
execCtx.User(),
execCtx.ExecCfg(),
)
if err != nil {
return err
18 changes: 9 additions & 9 deletions pkg/ccl/backupccl/restore_job.go
@@ -480,7 +480,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb.
// files.
func restore(
restoreCtx context.Context,
phs sql.PlanHookState,
execCtx sql.JobExecContext,
numClusterNodes int,
backupManifests []BackupManifest,
backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo,
@@ -491,7 +491,7 @@
job *jobs.Job,
encryption *jobspb.BackupEncryptionOptions,
) (RowCount, error) {
user := phs.User()
user := execCtx.User()
// A note about contexts and spans in this method: the top-level context
// `restoreCtx` is used for orchestration logging. All operations that carry
// out work get their individual contexts.
@@ -620,7 +620,7 @@
// TODO(pbardea): Improve logging in processors.
if err := distRestore(
restoreCtx,
phs,
execCtx,
importSpanChunks,
pkIDs,
encryption,
@@ -652,7 +652,7 @@
// be broken down into two methods.
func loadBackupSQLDescs(
ctx context.Context,
p sql.PlanHookState,
p sql.JobExecContext,
details jobspb.RestoreDetails,
encryption *jobspb.BackupEncryptionOptions,
) ([]BackupManifest, BackupManifest, []catalog.Descriptor, error) {
@@ -832,7 +832,7 @@ func getTempSystemDBID(details jobspb.RestoreDetails) descpb.ID {
// createImportingDescriptors create the tables that we will restore into. It also
// fetches the information from the old tables that we need for the restore.
func createImportingDescriptors(
ctx context.Context, p sql.PlanHookState, sqlDescs []catalog.Descriptor, r *restoreResumer,
ctx context.Context, p sql.JobExecContext, sqlDescs []catalog.Descriptor, r *restoreResumer,
) (tables []catalog.TableDescriptor, oldTableIDs []descpb.ID, spans []roachpb.Span, err error) {
details := r.job.Details().(jobspb.RestoreDetails)

@@ -1094,10 +1094,10 @@ func createImportingDescriptors(

// Resume is part of the jobs.Resumer interface.
func (r *restoreResumer) Resume(
ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums,
ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
) error {
details := r.job.Details().(jobspb.RestoreDetails)
p := phs.(sql.PlanHookState)
p := execCtx.(sql.JobExecContext)
r.versionAtLeast20_2 = p.ExecCfg().Settings.Version.IsActive(
ctx, clusterversion.VersionLeasedDatabaseDescriptors)

@@ -1475,14 +1475,14 @@ func (r *restoreResumer) publishDescriptors(
// has been committed from a restore that has failed or been canceled. It does
// this by adding the table descriptors in DROP state, which causes the schema
// change stuff to delete the keys in the background.
func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
telemetry.Count("restore.total.failed")
telemetry.CountBucketed("restore.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))

details := r.job.Details().(jobspb.RestoreDetails)

execCfg := phs.(sql.PlanHookState).ExecCfg()
execCfg := execCtx.(sql.JobExecContext).ExecCfg()
return descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor,
execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
for _, tenant := range details.Tenants {
12 changes: 6 additions & 6 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -38,7 +38,7 @@ import (
// This method also closes the given progCh.
func distRestore(
ctx context.Context,
phs sql.PlanHookState,
execCtx sql.JobExecContext,
chunks [][]execinfrapb.RestoreSpanEntry,
pkIDs map[uint64]bool,
encryption *jobspb.BackupEncryptionOptions,
@@ -50,13 +50,13 @@
defer close(progCh)
var noTxn *kv.Txn

dsp := phs.DistSQLPlanner()
evalCtx := phs.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()

if encryption != nil && encryption.Mode == jobspb.EncryptionMode_KMS {
kms, err := cloud.KMSFromURI(encryption.KMSInfo.Uri, &backupKMSEnv{
settings: phs.ExecCfg().Settings,
conf: &phs.ExecCfg().ExternalIODirConfig,
settings: execCtx.ExecCfg().Settings,
conf: &execCtx.ExecCfg().ExternalIODirConfig,
})
if err != nil {
return err
@@ -75,7 +75,7 @@
fileEncryption = &roachpb.FileEncryptionOptions{Key: encryption.Key}
}

planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg())
planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
if err != nil {
return err
}
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -62,7 +62,7 @@ var changefeedResultTypes = []*types.T{
// progress of the changefeed's corresponding system job.
func distChangefeedFlow(
ctx context.Context,
phs sql.PlanHookState,
execCtx sql.JobExecContext,
jobID int64,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
@@ -99,7 +99,7 @@
spansTS = initialHighWater
}

execCfg := phs.ExecCfg()
execCfg := execCtx.ExecCfg()
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS)
if err != nil {
return err
@@ -111,8 +111,8 @@
if err != nil {
return err
}
dsp := phs.DistSQLPlanner()
evalCtx := phs.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, true /* distribute */)

var spanPartitions []sql.SpanPartition
@@ -143,7 +143,7 @@
corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{
Watches: watches,
Feed: details,
UserProto: phs.User().EncodeProto(),
UserProto: execCtx.User().EncodeProto(),
}
}
// NB: This SpanFrontier processor depends on the set of tracked spans being
Expand All @@ -154,7 +154,7 @@ func distChangefeedFlow(
TrackedSpans: trackedSpans,
Feed: details,
JobID: jobID,
UserProto: phs.User().EncodeProto(),
UserProto: execCtx.User().EncodeProto(),
}

p := sql.MakePhysicalPlan(gatewayNodeID)
20 changes: 10 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -576,10 +576,10 @@ func generateChangefeedSessionID() string {

// Resume is part of the jobs.Resumer interface.
func (b *changefeedResumer) Resume(
ctx context.Context, planHookState interface{}, startedCh chan<- tree.Datums,
ctx context.Context, exec interface{}, startedCh chan<- tree.Datums,
) error {
phs := planHookState.(sql.PlanHookState)
execCfg := phs.ExecCfg()
jobExec := exec.(sql.JobExecContext)
execCfg := jobExec.ExecCfg()
jobID := *b.job.ID()
details := b.job.Details().(jobspb.ChangefeedDetails)
progress := b.job.Progress()
@@ -595,7 +595,7 @@ func (b *changefeedResumer) Resume(
}
var err error
for r := retry.StartWithCtx(ctx, opts); r.Next(); {
if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil {
if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil {
return nil
}
if !IsRetryableError(err) {
@@ -646,9 +646,9 @@ func (b *changefeedResumer) Resume(
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, planHookState interface{}) error {
phs := planHookState.(sql.PlanHookState)
execCfg := phs.ExecCfg()
func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, jobExec interface{}) error {
exec := jobExec.(sql.JobExecContext)
execCfg := exec.ExecCfg()
progress := b.job.Progress()
b.maybeCleanUpProtectedTimestamp(ctx, execCfg.DB, execCfg.ProtectedTimestampProvider,
progress.GetChangefeed().ProtectedTimestampRecord)
@@ -660,7 +660,7 @@ func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, planHookState in
telemetry.Count(`changefeed.enterprise.cancel`)
} else {
telemetry.Count(`changefeed.enterprise.fail`)
phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1)
exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1)
}
return nil
}
@@ -688,7 +688,7 @@ var _ jobs.PauseRequester = (*changefeedResumer)(nil)
// paused, we want to install a protected timestamp at the most recent high
// watermark if there isn't already one.
func (b *changefeedResumer) OnPauseRequest(
ctx context.Context, planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress,
ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress,
) error {
details := b.job.Details().(jobspb.ChangefeedDetails)
if _, shouldPause := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldPause {
@@ -713,7 +713,7 @@ func (b *changefeedResumer) OnPauseRequest(
return nil
}

pts := planHookState.(sql.PlanHookState).ExecCfg().ProtectedTimestampProvider
pts := jobExec.(sql.JobExecContext).ExecCfg().ProtectedTimestampProvider
return createProtectedTimestampRecord(ctx, pts, txn, *b.job.ID(),
details.Targets, *resolved, cp)
}
6 changes: 3 additions & 3 deletions pkg/ccl/importccl/import_processor_test.go
@@ -548,11 +548,11 @@ type cancellableImportResumer struct {
}

func (r *cancellableImportResumer) Resume(
_ context.Context, phs interface{}, resultsCh chan<- tree.Datums,
_ context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
) error {
r.jobID = *r.wrapped.job.ID()
r.jobIDCh <- r.jobID
if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil {
if err := r.wrapped.Resume(r.ctx, execCtx, resultsCh); err != nil {
return err
}
if r.onSuccessBarrier != nil {
@@ -561,7 +561,7 @@ func (r *cancellableImportResumer) Resume(
return errors.New("job succeed, but we're forcing it to be paused")
}

func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
// This callback is invoked when an error or cancellation occurs
// during the import. Since our Resume handler returned an
// error (after pausing the job), we need to short-circuits