From 5b17910ae58a1def4b673eea12d4dd27ae5a202f Mon Sep 17 00:00:00 2001 From: David Taylor Date: Fri, 6 Mar 2020 23:30:36 +0000 Subject: [PATCH 1/2] *: remove jobs.Resumer.OnSuccess We can just do whatever it was doing in Resume to simplify state machine. Release note: none. --- pkg/ccl/backupccl/backup_job.go | 3 - pkg/ccl/backupccl/restore_job.go | 5 -- pkg/ccl/changefeedccl/changefeed_stmt.go | 4 -- pkg/ccl/importccl/import_processor_test.go | 9 ++- pkg/ccl/importccl/import_stmt.go | 9 +-- pkg/jobs/helpers_test.go | 15 ++-- pkg/jobs/jobs_test.go | 30 -------- pkg/jobs/registry.go | 12 +--- pkg/sql/create_stats.go | 82 ++++++++++------------ 9 files changed, 50 insertions(+), 119 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 2476e25641e1..741447a3ae99 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -531,9 +531,6 @@ func (b *backupResumer) OnFailOrCancel(context.Context, interface{}) error { return nil } -// OnSuccess is part of the jobs.Resumer interface. -func (b *backupResumer) OnSuccess(context.Context, *client.Txn) error { return nil } - // OnTerminal is part of the jobs.Resumer interface. func (b *backupResumer) OnTerminal( ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 523c2d837c3e..0abd7c68455a 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1162,11 +1162,6 @@ func (r *restoreResumer) dropTables(ctx context.Context, txn *client.Txn) error return nil } -// OnSuccess is part of the jobs.Resumer interface. -func (r *restoreResumer) OnSuccess(ctx context.Context, txn *client.Txn) error { - return nil -} - // restoreSystemTables atomically replaces the contents of the system tables // with the data from the restored system tables. func (r *restoreResumer) restoreSystemTables(ctx context.Context) error { diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 95c3a1b1643d..55f463ea2829 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" - "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -548,8 +547,5 @@ func (b *changefeedResumer) Resume( // OnFailOrCancel is part of the jobs.Resumer interface. func (b *changefeedResumer) OnFailOrCancel(context.Context, interface{}) error { return nil } -// OnSuccess is part of the jobs.Resumer interface. -func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { return nil } - // OnTerminal is part of the jobs.Resumer interface. func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {} diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index 546c1c2807aa..b7b7ec0ca95d 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -480,10 +480,9 @@ func (r *cancellableImportResumer) Resume( ) error { r.jobID = *r.wrapped.job.ID() r.jobIDCh <- r.jobID - return r.wrapped.Resume(r.ctx, phs, resultsCh) -} - -func (r *cancellableImportResumer) OnSuccess(ctx context.Context, txn *client.Txn) error { + if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil { + return err + } if r.onSuccessBarrier != nil { defer r.onSuccessBarrier.Enter()() } @@ -498,7 +497,7 @@ func (r *cancellableImportResumer) OnTerminal( func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { // This callback is invoked when an error or cancellation occurs - // during the import. Since our OnSuccess handler returned an + // during the import. Since our Resume handler returned an // error (after pausing the job), we need to short-circuits // jobs machinery so that this job is not marked as failed. return errors.New("bail out") diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 85e5ec760b26..fd48b99cdeb3 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -1053,9 +1053,7 @@ func (r *importResumer) Resume( } // TODO(ajwerner): Should this actually return the error? At this point we've // successfully finished the import but failed to drop the protected - // timestamp. The reconciliation loop ought to pick it up. Ideally we'd do - // this in OnSuccess but we don't have access to the protectedts.Storage - // there. + // timestamp. The reconciliation loop ought to pick it up. if ptsID != nil && !r.testingKnobs.ignoreProtectedTimestamps { if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { return r.releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider) @@ -1265,11 +1263,6 @@ func (r *importResumer) dropTables(ctx context.Context, txn *client.Txn) error { return errors.Wrap(txn.Run(ctx, b), "rolling back tables") } -// OnSuccess is part of the jobs.Resumer interface. -func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn) error { - return nil -} - // OnTerminal is part of the jobs.Resumer interface. func (r *importResumer) OnTerminal( ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index e29d5f11fac8..1bfc2d680eab 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -13,7 +13,6 @@ package jobs import ( "context" - "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" ) @@ -36,7 +35,12 @@ type FakeResumer struct { func (d FakeResumer) Resume(ctx context.Context, _ interface{}, _ chan<- tree.Datums) error { if d.OnResume != nil { - return d.OnResume(ctx) + if err := d.OnResume(ctx); err != nil { + return err + } + } + if d.Success != nil { + return d.Success() } return nil } @@ -48,13 +52,6 @@ func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error { return nil } -func (d FakeResumer) OnSuccess(_ context.Context, _ *client.Txn) error { - if d.Success != nil { - return d.Success() - } - return nil -} - func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) { if d.Terminal != nil { d.Terminal() diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 4bec9e83e0b6..5bdd92deeeb3 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -668,36 +668,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.check(t, jobs.StatusFailed) }) - // Attempt to mark success, but fail. - t.Run("fail marking success", func(t *testing.T) { - rts := registryTestSuite{} - rts.setUp(t) - defer rts.tearDown() - - // Make marking success fail. - rts.successErr = errors.New("marking success failed") - j, _, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) - if err != nil { - t.Fatal(err) - } - rts.job = j - - rts.mu.e.ResumeStart = true - rts.resumeCheckCh <- struct{}{} - rts.check(t, jobs.StatusRunning) - - rts.resumeCh <- nil - rts.mu.e.ResumeExit++ - - rts.mu.e.Success = true - rts.mu.e.OnFailOrCancelStart = true - rts.failOrCancelCheckCh <- struct{}{} - rts.failOrCancelCh <- nil - rts.mu.e.OnFailOrCancelExit++ - rts.mu.e.Terminal++ - rts.check(t, jobs.StatusFailed) - }) - // Attempt to mark success, but fail, but fail that also. Thus it should not // trigger OnTerminal. t.Run("fail marking success and fail OnFailOrCancel", func(t *testing.T) { diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index b3b63d91de70..0bcb79903615 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -665,16 +665,6 @@ type Resumer interface { // is a sql.PlanHookState. Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error - // OnSuccess is called when a job has completed successfully, and is called - // with the same txn that will mark the job as successful. The txn will - // only be committed if this doesn't return an error and the job state was - // successfully changed to successful. If OnSuccess returns an error, the - // job will be marked as failed. - // - // Any work this function does must still be correct if the txn is aborted at - // a later time. - OnSuccess(ctx context.Context, txn *client.Txn) error - // OnTerminal is called after a job has successfully been marked as // terminal. It should be used to perform optional cleanup and return final // results to the user. There is no guarantee that this function is ever run @@ -791,7 +781,7 @@ func (r *Registry) stepThroughStateMachine( errorMsg := fmt.Sprintf("job %d: successful bu unexpected error provided", *job.ID()) return errors.NewAssertionErrorWithWrappedErrf(jobErr, errorMsg) } - if err := job.Succeeded(ctx, resumer.OnSuccess); err != nil { + if err := job.Succeeded(ctx, nil); err != nil { // If it didn't succeed, we consider the job as failed and need to go // through reverting state first. // TODO(spaskob): this is silly, we should remove the OnSuccess hooks and diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index e6de6be67891..7ebe88701846 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -333,13 +333,10 @@ func (n *createStatsNode) makePlanForExplainDistSQL( } // createStatsResumer implements the jobs.Resumer interface for CreateStats -// jobs. A new instance is created for each job. evalCtx is populated inside -// createStatsResumer.Resume so it can be used in createStatsResumer.OnSuccess -// (if the job is successful). +// jobs. A new instance is created for each job. type createStatsResumer struct { job *jobs.Job tableID sqlbase.ID - evalCtx *extendedEvalContext } var _ jobs.Resumer = &createStatsResumer{} @@ -359,10 +356,10 @@ func (r *createStatsResumer) Resume( } r.tableID = details.Table.ID - r.evalCtx = p.ExtendedEvalContext() + evalCtx := p.ExtendedEvalContext() ci := sqlbase.ColTypeInfoFromColTypes([]types.T{}) - rows := rowcontainer.NewRowContainer(r.evalCtx.Mon.MakeBoundAccount(), ci, 0) + rows := rowcontainer.NewRowContainer(evalCtx.Mon.MakeBoundAccount(), ci, 0) defer func() { if rows != nil { rows.Close(ctx) @@ -370,17 +367,17 @@ func (r *createStatsResumer) Resume( }() dsp := p.DistSQLPlanner() - return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { if details.AsOf != nil { p.semaCtx.AsOfTimestamp = details.AsOf p.extendedEvalCtx.SetTxnTimestamp(details.AsOf.GoTime()) txn.SetFixedTimestamp(ctx, *details.AsOf) } - planCtx := dsp.NewPlanningCtx(ctx, r.evalCtx, txn) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, txn) planCtx.planner = p if err := dsp.planAndRunCreateStats( - ctx, r.evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), + ctx, evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), ); err != nil { // Check if this was a context canceled error and restart if it was. if s, ok := status.FromError(errors.UnwrapAll(err)); ok { @@ -409,6 +406,38 @@ func (r *createStatsResumer) Resume( } return nil + }); err != nil { + return err + } + + // Invalidate the local cache synchronously; this guarantees that the next + // statement in the same session won't use a stale cache (whereas the gossip + // update is handled asynchronously). + evalCtx.ExecCfg.TableStatsCache.InvalidateTableStats(ctx, r.tableID) + + // Record this statistics creation in the event log. + if !createStatsPostEvents.Get(&evalCtx.Settings.SV) { + return nil + } + + // TODO(rytaft): This creates a new transaction for the CREATE STATISTICS + // event. It must be different from the CREATE STATISTICS transaction, + // because that transaction must be read-only. In the future we may want + // to use the transaction that inserted the new stats into the + // system.table_statistics table, but that would require calling + // MakeEventLogger from the distsqlrun package. + return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + return MakeEventLogger(evalCtx.ExecCfg).InsertEventRecord( + ctx, + txn, + EventLogCreateStatistics, + int32(details.Table.ID), + int32(evalCtx.NodeID), + struct { + TableName string + Statement string + }{details.FQTableName, details.Statement}, + ) }) } @@ -459,41 +488,6 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error { // OnFailOrCancel is part of the jobs.Resumer interface. func (r *createStatsResumer) OnFailOrCancel(context.Context, interface{}) error { return nil } -// OnSuccess is part of the jobs.Resumer interface. -func (r *createStatsResumer) OnSuccess(ctx context.Context, _ *client.Txn) error { - details := r.job.Details().(jobspb.CreateStatsDetails) - - // Invalidate the local cache synchronously; this guarantees that the next - // statement in the same session won't use a stale cache (whereas the gossip - // update is handled asynchronously). - r.evalCtx.ExecCfg.TableStatsCache.InvalidateTableStats(ctx, r.tableID) - - // Record this statistics creation in the event log. - if !createStatsPostEvents.Get(&r.evalCtx.Settings.SV) { - return nil - } - - // TODO(rytaft): This creates a new transaction for the CREATE STATISTICS - // event. It must be different from the CREATE STATISTICS transaction, - // because that transaction must be read-only. In the future we may want - // to use the transaction that inserted the new stats into the - // system.table_statistics table, but that would require calling - // MakeEventLogger from the distsqlrun package. - return r.evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { - return MakeEventLogger(r.evalCtx.ExecCfg).InsertEventRecord( - ctx, - txn, - EventLogCreateStatistics, - int32(details.Table.ID), - int32(r.evalCtx.NodeID), - struct { - TableName string - Statement string - }{details.FQTableName, details.Statement}, - ) - }) -} - // OnTerminal is part of the jobs.Resumer interface. func (r *createStatsResumer) OnTerminal( ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, From 51a0d50acce0cfab024fc216e9377018a3760722 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sat, 7 Mar 2020 03:37:46 +0000 Subject: [PATCH 2/2] *: remove jobs.Resumer.OnTerminal Whatever it was doing can be done in Resume or OnFailOrCancel, both of which are resumable (and most of these were only doing things on success anyway so they should just be last step of Resume). Release note: none. --- pkg/ccl/backupccl/backup_job.go | 34 +++++++++------------- pkg/ccl/backupccl/restore_job.go | 30 ++++++------------- pkg/ccl/changefeedccl/changefeed_stmt.go | 3 -- pkg/ccl/importccl/import_processor_test.go | 6 ---- pkg/ccl/importccl/import_stmt.go | 33 +++++++++------------ pkg/jobs/helpers_test.go | 7 ----- pkg/jobs/jobs_test.go | 32 ++++---------------- pkg/jobs/registry.go | 9 ------ pkg/sql/create_stats.go | 6 ---- 9 files changed, 41 insertions(+), 119 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 741447a3ae99..fbe1b9e03a99 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -505,6 +505,16 @@ func (b *backupResumer) Resume( if err != nil { log.Warningf(ctx, "unable to clear stats from job payload: %+v", err) } + b.deleteCheckpoint(ctx) + + resultsCh <- tree.Datums{ + tree.NewDInt(tree.DInt(*b.job.ID())), + tree.NewDString(string(jobs.StatusSucceeded)), + tree.NewDFloat(tree.DFloat(1.0)), + tree.NewDInt(tree.DInt(b.res.Rows)), + tree.NewDInt(tree.DInt(b.res.IndexEntries)), + tree.NewDInt(tree.DInt(b.res.DataSize)), + } return nil } @@ -527,14 +537,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error { } // OnFailOrCancel is part of the jobs.Resumer interface. -func (b *backupResumer) OnFailOrCancel(context.Context, interface{}) error { +func (b *backupResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error { + b.deleteCheckpoint(ctx) return nil } -// OnTerminal is part of the jobs.Resumer interface. -func (b *backupResumer) OnTerminal( - ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, -) { +func (b *backupResumer) deleteCheckpoint(ctx context.Context) { // Attempt to delete BACKUP-CHECKPOINT. if err := func() error { details := b.job.Details().(jobspb.BackupDetails) @@ -552,22 +560,6 @@ func (b *backupResumer) OnTerminal( }(); err != nil { log.Warningf(ctx, "unable to delete checkpointed backup descriptor: %+v", err) } - - if status == jobs.StatusSucceeded { - // TODO(benesch): emit periodic progress updates. - - // TODO(mjibson): if a restore was resumed, then these counts will only have - // the current coordinator's counts. - - resultsCh <- tree.Datums{ - tree.NewDInt(tree.DInt(*b.job.ID())), - tree.NewDString(string(jobs.StatusSucceeded)), - tree.NewDFloat(tree.DFloat(1.0)), - tree.NewDInt(tree.DInt(b.res.Rows)), - tree.NewDInt(tree.DInt(b.res.IndexEntries)), - tree.NewDInt(tree.DInt(b.res.DataSize)), - } - } } var _ jobs.Resumer = &backupResumer{} diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 0abd7c68455a..77017b268b87 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1010,6 +1010,15 @@ func (r *restoreResumer) Resume( } } + resultsCh <- tree.Datums{ + tree.NewDInt(tree.DInt(*r.job.ID())), + tree.NewDString(string(jobs.StatusSucceeded)), + tree.NewDFloat(tree.DFloat(1.0)), + tree.NewDInt(tree.DInt(r.res.Rows)), + tree.NewDInt(tree.DInt(r.res.IndexEntries)), + tree.NewDInt(tree.DInt(r.res.DataSize)), + } + return nil } @@ -1201,27 +1210,6 @@ func (r *restoreResumer) restoreSystemTables(ctx context.Context) error { return nil } -// OnTerminal is part of the jobs.Resumer interface. -func (r *restoreResumer) OnTerminal( - ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, -) { - if status == jobs.StatusSucceeded { - // TODO(benesch): emit periodic progress updates. - - // TODO(mjibson): if a restore was resumed, then these counts will only have - // the current coordinator's counts. - - resultsCh <- tree.Datums{ - tree.NewDInt(tree.DInt(*r.job.ID())), - tree.NewDString(string(jobs.StatusSucceeded)), - tree.NewDFloat(tree.DFloat(1.0)), - tree.NewDInt(tree.DInt(r.res.Rows)), - tree.NewDInt(tree.DInt(r.res.IndexEntries)), - tree.NewDInt(tree.DInt(r.res.DataSize)), - } - } -} - var _ jobs.Resumer = &restoreResumer{} func init() { diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 55f463ea2829..7344bcb0d14f 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -546,6 +546,3 @@ func (b *changefeedResumer) Resume( // OnFailOrCancel is part of the jobs.Resumer interface. func (b *changefeedResumer) OnFailOrCancel(context.Context, interface{}) error { return nil } - -// OnTerminal is part of the jobs.Resumer interface. -func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {} diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index b7b7ec0ca95d..ab1c0b23c6fc 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -489,12 +489,6 @@ func (r *cancellableImportResumer) Resume( return errors.New("job succeed, but we're forcing it to be paused") } -func (r *cancellableImportResumer) OnTerminal( - ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, -) { - r.wrapped.OnTerminal(ctx, status, resultsCh) -} - func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { // This callback is invoked when an error or cancellation occurs // during the import. Since our Resume handler returned an diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index fd48b99cdeb3..25ab5ebb066b 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -1061,6 +1061,19 @@ func (r *importResumer) Resume( log.Errorf(ctx, "failed to release protected timestamp: %v", err) } } + + telemetry.CountBucketed("import.rows", r.res.Rows) + const mb = 1 << 20 + telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb) + resultsCh <- tree.Datums{ + tree.NewDInt(tree.DInt(*r.job.ID())), + tree.NewDString(string(jobs.StatusSucceeded)), + tree.NewDFloat(tree.DFloat(1.0)), + tree.NewDInt(tree.DInt(r.res.Rows)), + tree.NewDInt(tree.DInt(r.res.IndexEntries)), + tree.NewDInt(tree.DInt(r.res.DataSize)), + } + return nil } @@ -1263,26 +1276,6 @@ func (r *importResumer) dropTables(ctx context.Context, txn *client.Txn) error { return errors.Wrap(txn.Run(ctx, b), "rolling back tables") } -// OnTerminal is part of the jobs.Resumer interface. -func (r *importResumer) OnTerminal( - ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, -) { - if status == jobs.StatusSucceeded { - telemetry.CountBucketed("import.rows", r.res.Rows) - const mb = 1 << 20 - telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb) - - resultsCh <- tree.Datums{ - tree.NewDInt(tree.DInt(*r.job.ID())), - tree.NewDString(string(jobs.StatusSucceeded)), - tree.NewDFloat(tree.DFloat(1.0)), - tree.NewDInt(tree.DInt(r.res.Rows)), - tree.NewDInt(tree.DInt(r.res.IndexEntries)), - tree.NewDInt(tree.DInt(r.res.DataSize)), - } - } -} - var _ jobs.Resumer = &importResumer{} func init() { diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index 1bfc2d680eab..07ecab0d10b8 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -30,7 +30,6 @@ type FakeResumer struct { OnResume func(context.Context) error FailOrCancel func(context.Context) error Success func() error - Terminal func() } func (d FakeResumer) Resume(ctx context.Context, _ interface{}, _ chan<- tree.Datums) error { @@ -52,10 +51,4 @@ func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error { return nil } -func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) { - if d.Terminal != nil { - d.Terminal() - } -} - var _ Resumer = FakeResumer{} diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 5bdd92deeeb3..d950ef2bb666 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -136,7 +136,7 @@ func TestJobsTableProgressFamily(t *testing.T) { } type counters struct { - ResumeExit, OnFailOrCancelExit, Terminal int + ResumeExit, OnFailOrCancelExit int // These sometimes retry so just use bool. ResumeStart, OnFailOrCancelStart, Success bool } @@ -247,14 +247,6 @@ func (rts *registryTestSuite) setUp(t *testing.T) { rts.mu.a.Success = true return rts.successErr }, - - Terminal: func() { - t.Log("Starting terminal") - rts.mu.Lock() - rts.mu.a.Terminal++ - rts.mu.Unlock() - t.Log("Exiting terminal") - }, } }) } @@ -320,7 +312,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.resumeCh <- nil rts.mu.e.ResumeExit++ rts.mu.e.Success = true - rts.mu.e.Terminal++ rts.check(t, jobs.StatusSucceeded) }) @@ -342,7 +333,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.resumeCh <- nil rts.mu.e.ResumeExit++ rts.mu.e.Success = true - rts.mu.e.Terminal++ rts.check(t, jobs.StatusSucceeded) }) @@ -377,7 +367,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.mu.e.ResumeExit++ rts.mu.e.Success = true - rts.mu.e.Terminal++ rts.check(t, jobs.StatusSucceeded) }) @@ -415,7 +404,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.failOrCancelCh <- nil rts.mu.e.OnFailOrCancelExit++ - rts.mu.e.Terminal++ rts.check(t, jobs.StatusFailed) }) @@ -440,7 +428,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.failOrCancelCheckCh <- struct{}{} rts.failOrCancelCh <- nil rts.mu.e.OnFailOrCancelExit++ - rts.mu.e.Terminal++ rts.check(t, jobs.StatusCanceled) }) @@ -492,7 +479,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.failOrCancelCh <- nil rts.mu.e.OnFailOrCancelExit++ - rts.mu.e.Terminal++ rts.check(t, jobs.StatusCanceled) }) @@ -530,7 +516,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.failOrCancelCh <- nil rts.mu.e.OnFailOrCancelExit++ - rts.mu.e.Terminal++ rts.check(t, jobs.StatusFailed) }) @@ -637,7 +622,6 @@ func TestRegistryLifecycle(t *testing.T) { rts.resumeCh <- nil rts.mu.e.ResumeExit++ rts.mu.e.Success = true - rts.mu.e.Terminal++ rts.check(t, jobs.StatusSucceeded) }) @@ -664,12 +648,10 @@ func TestRegistryLifecycle(t *testing.T) { rts.failOrCancelCh <- nil rts.mu.e.OnFailOrCancelExit++ - rts.mu.e.Terminal++ rts.check(t, jobs.StatusFailed) }) - // Attempt to mark success, but fail, but fail that also. Thus it should not - // trigger OnTerminal. + // Attempt to mark success, but fail, but fail that also. t.Run("fail marking success and fail OnFailOrCancel", func(t *testing.T) { rts := registryTestSuite{} rts.setUp(t) @@ -698,11 +680,10 @@ func TestRegistryLifecycle(t *testing.T) { rts.failOrCancelCheckCh <- struct{}{} rts.mu.e.OnFailOrCancelExit++ rts.failOrCancelCh <- errors.New("reverting failed") - rts.mu.e.Terminal++ rts.check(t, jobs.StatusFailed) }) - // Fail the job, but also fail to mark it failed. No OnTerminal. + // Fail the job, but also fail to mark it failed. t.Run("fail marking failed", func(t *testing.T) { rts := registryTestSuite{} rts.setUp(t) @@ -730,7 +711,6 @@ func TestRegistryLifecycle(t *testing.T) { // But let it fail. rts.mu.e.OnFailOrCancelExit++ rts.failOrCancelCh <- errors.New("resume failed") - rts.mu.e.Terminal++ rts.check(t, jobs.StatusFailed) }) @@ -1856,12 +1836,12 @@ func TestJobInTxn(t *testing.T) { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { t.Logf("Resuming job: %+v", job.Payload()) + atomic.AddInt32(&hasRun, 1) return nil }, - Terminal: func() { - t.Logf("Finished job: %+v", job.Payload()) - // Inc instead of just storing 1 to count how many jobs ran. + FailOrCancel: func(ctx context.Context) error { atomic.AddInt32(&hasRun, 1) + return nil }, } }) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 0bcb79903615..e87092ec3d32 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -665,12 +665,6 @@ type Resumer interface { // is a sql.PlanHookState. Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error - // OnTerminal is called after a job has successfully been marked as - // terminal. It should be used to perform optional cleanup and return final - // results to the user. There is no guarantee that this function is ever run - // (for example, if a node died immediately after Success commits). - OnTerminal(ctx context.Context, status Status, resultsCh chan<- tree.Datums) - // OnFailOrCancel is called when a job fails or is cancel-requested. // // This method will be called when a registry notices the cancel request, @@ -774,7 +768,6 @@ func (r *Registry) stepThroughStateMachine( // restarted during the next adopt loop and reverting will be retried. return errors.Wrapf(err, "job %d: could not mark as canceled: %s", *job.ID(), jobErr) } - resumer.OnTerminal(ctx, status, resultsCh) return errors.Errorf("job %s", status) case StatusSucceeded: if jobErr != nil { @@ -789,7 +782,6 @@ func (r *Registry) stepThroughStateMachine( // better. return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID())) } - resumer.OnTerminal(ctx, status, resultsCh) return nil case StatusReverting: if err := job.Reverted(ctx, jobErr, nil); err != nil { @@ -834,7 +826,6 @@ func (r *Registry) stepThroughStateMachine( // restarted during the next adopt loop and reverting will be retried. return errors.Wrapf(err, "job %d: could not mark as failed: %s", *job.ID(), jobErr) } - resumer.OnTerminal(ctx, status, resultsCh) return jobErr default: return errors.AssertionFailedf("job %d: has unsupported status %s", *job.ID(), status) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 7ebe88701846..962451dfcb6f 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -488,12 +488,6 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error { // OnFailOrCancel is part of the jobs.Resumer interface. func (r *createStatsResumer) OnFailOrCancel(context.Context, interface{}) error { return nil } -// OnTerminal is part of the jobs.Resumer interface. -func (r *createStatsResumer) OnTerminal( - ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums, -) { -} - func init() { createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return &createStatsResumer{job: job}