diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 2476e25641e1..fbe1b9e03a99 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -505,6 +505,16 @@ func (b *backupResumer) Resume(
 	if err != nil {
 		log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
 	}
+	b.deleteCheckpoint(ctx)
+
+	resultsCh <- tree.Datums{
+		tree.NewDInt(tree.DInt(*b.job.ID())),
+		tree.NewDString(string(jobs.StatusSucceeded)),
+		tree.NewDFloat(tree.DFloat(1.0)),
+		tree.NewDInt(tree.DInt(b.res.Rows)),
+		tree.NewDInt(tree.DInt(b.res.IndexEntries)),
+		tree.NewDInt(tree.DInt(b.res.DataSize)),
+	}
 	return nil
 }
 
@@ -527,17 +537,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
 }
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
-func (b *backupResumer) OnFailOrCancel(context.Context, interface{}) error {
+func (b *backupResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
+	b.deleteCheckpoint(ctx)
 	return nil
 }
 
-// OnSuccess is part of the jobs.Resumer interface.
-func (b *backupResumer) OnSuccess(context.Context, *client.Txn) error { return nil }
-
-// OnTerminal is part of the jobs.Resumer interface.
-func (b *backupResumer) OnTerminal(
-	ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
+func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
 	// Attempt to delete BACKUP-CHECKPOINT.
 	if err := func() error {
 		details := b.job.Details().(jobspb.BackupDetails)
@@ -555,22 +560,6 @@ func (b *backupResumer) OnTerminal(
 	}(); err != nil {
 		log.Warningf(ctx, "unable to delete checkpointed backup descriptor: %+v", err)
 	}
-
-	if status == jobs.StatusSucceeded {
-		// TODO(benesch): emit periodic progress updates.
-
-		// TODO(mjibson): if a restore was resumed, then these counts will only have
-		// the current coordinator's counts.
-
-		resultsCh <- tree.Datums{
-			tree.NewDInt(tree.DInt(*b.job.ID())),
-			tree.NewDString(string(jobs.StatusSucceeded)),
-			tree.NewDFloat(tree.DFloat(1.0)),
-			tree.NewDInt(tree.DInt(b.res.Rows)),
-			tree.NewDInt(tree.DInt(b.res.IndexEntries)),
-			tree.NewDInt(tree.DInt(b.res.DataSize)),
-		}
-	}
 }
 
 var _ jobs.Resumer = &backupResumer{}
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 523c2d837c3e..77017b268b87 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -1010,6 +1010,15 @@ func (r *restoreResumer) Resume(
 		}
 	}
 
+	resultsCh <- tree.Datums{
+		tree.NewDInt(tree.DInt(*r.job.ID())),
+		tree.NewDString(string(jobs.StatusSucceeded)),
+		tree.NewDFloat(tree.DFloat(1.0)),
+		tree.NewDInt(tree.DInt(r.res.Rows)),
+		tree.NewDInt(tree.DInt(r.res.IndexEntries)),
+		tree.NewDInt(tree.DInt(r.res.DataSize)),
+	}
+
 	return nil
 }
 
@@ -1162,11 +1171,6 @@ func (r *restoreResumer) dropTables(ctx context.Context, txn *client.Txn) error
 	return nil
 }
 
-// OnSuccess is part of the jobs.Resumer interface.
-func (r *restoreResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
-	return nil
-}
-
 // restoreSystemTables atomically replaces the contents of the system tables
 // with the data from the restored system tables.
 func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
@@ -1206,27 +1210,6 @@ func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
 	return nil
 }
 
-// OnTerminal is part of the jobs.Resumer interface.
-func (r *restoreResumer) OnTerminal(
-	ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-	if status == jobs.StatusSucceeded {
-		// TODO(benesch): emit periodic progress updates.
-
-		// TODO(mjibson): if a restore was resumed, then these counts will only have
-		// the current coordinator's counts.
-
-		resultsCh <- tree.Datums{
-			tree.NewDInt(tree.DInt(*r.job.ID())),
-			tree.NewDString(string(jobs.StatusSucceeded)),
-			tree.NewDFloat(tree.DFloat(1.0)),
-			tree.NewDInt(tree.DInt(r.res.Rows)),
-			tree.NewDInt(tree.DInt(r.res.IndexEntries)),
-			tree.NewDInt(tree.DInt(r.res.DataSize)),
-		}
-	}
-}
-
 var _ jobs.Resumer = &restoreResumer{}
 
 func init() {
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 95c3a1b1643d..7344bcb0d14f 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -19,7 +19,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
-	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -547,9 +546,3 @@ func (b *changefeedResumer) Resume(
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
 func (b *changefeedResumer) OnFailOrCancel(context.Context, interface{}) error { return nil }
-
-// OnSuccess is part of the jobs.Resumer interface.
-func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { return nil }
-
-// OnTerminal is part of the jobs.Resumer interface.
-func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {}
diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go
index 546c1c2807aa..ab1c0b23c6fc 100644
--- a/pkg/ccl/importccl/import_processor_test.go
+++ b/pkg/ccl/importccl/import_processor_test.go
@@ -480,25 +480,18 @@ func (r *cancellableImportResumer) Resume(
 ) error {
 	r.jobID = *r.wrapped.job.ID()
 	r.jobIDCh <- r.jobID
-	return r.wrapped.Resume(r.ctx, phs, resultsCh)
-}
-
-func (r *cancellableImportResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
+	if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil {
+		return err
+	}
 	if r.onSuccessBarrier != nil {
 		defer r.onSuccessBarrier.Enter()()
 	}
 	return errors.New("job succeed, but we're forcing it to be paused")
 }
 
-func (r *cancellableImportResumer) OnTerminal(
-	ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-	r.wrapped.OnTerminal(ctx, status, resultsCh)
-}
-
 func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
 	// This callback is invoked when an error or cancellation occurs
-	// during the import. Since our OnSuccess handler returned an
+	// during the import. Since our Resume handler returned an
 	// error (after pausing the job), we need to short-circuits
 	// jobs machinery so that this job is not marked as failed.
 	return errors.New("bail out")
diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go
index 85e5ec760b26..25ab5ebb066b 100644
--- a/pkg/ccl/importccl/import_stmt.go
+++ b/pkg/ccl/importccl/import_stmt.go
@@ -1053,9 +1053,7 @@ func (r *importResumer) Resume(
 	}
 	// TODO(ajwerner): Should this actually return the error? At this point we've
 	// successfully finished the import but failed to drop the protected
-	// timestamp. The reconciliation loop ought to pick it up. Ideally we'd do
-	// this in OnSuccess but we don't have access to the protectedts.Storage
-	// there.
+	// timestamp. The reconciliation loop ought to pick it up.
 	if ptsID != nil && !r.testingKnobs.ignoreProtectedTimestamps {
 		if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
 			return r.releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider)
@@ -1063,6 +1061,19 @@
 			log.Errorf(ctx, "failed to release protected timestamp: %v", err)
 		}
 	}
+
+	telemetry.CountBucketed("import.rows", r.res.Rows)
+	const mb = 1 << 20
+	telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)
+	resultsCh <- tree.Datums{
+		tree.NewDInt(tree.DInt(*r.job.ID())),
+		tree.NewDString(string(jobs.StatusSucceeded)),
+		tree.NewDFloat(tree.DFloat(1.0)),
+		tree.NewDInt(tree.DInt(r.res.Rows)),
+		tree.NewDInt(tree.DInt(r.res.IndexEntries)),
+		tree.NewDInt(tree.DInt(r.res.DataSize)),
+	}
+
 	return nil
 }
 
@@ -1265,31 +1276,6 @@ func (r *importResumer) dropTables(ctx context.Context, txn *client.Txn) error {
 	return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
 }
 
-// OnSuccess is part of the jobs.Resumer interface.
-func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
-	return nil
-}
-
-// OnTerminal is part of the jobs.Resumer interface.
-func (r *importResumer) OnTerminal(
-	ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-	if status == jobs.StatusSucceeded {
-		telemetry.CountBucketed("import.rows", r.res.Rows)
-		const mb = 1 << 20
-		telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)
-
-		resultsCh <- tree.Datums{
-			tree.NewDInt(tree.DInt(*r.job.ID())),
-			tree.NewDString(string(jobs.StatusSucceeded)),
-			tree.NewDFloat(tree.DFloat(1.0)),
-			tree.NewDInt(tree.DInt(r.res.Rows)),
-			tree.NewDInt(tree.DInt(r.res.IndexEntries)),
-			tree.NewDInt(tree.DInt(r.res.DataSize)),
-		}
-	}
-}
-
 var _ jobs.Resumer = &importResumer{}
 
 func init() {
diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go
index e29d5f11fac8..07ecab0d10b8 100644
--- a/pkg/jobs/helpers_test.go
+++ b/pkg/jobs/helpers_test.go
@@ -13,7 +13,6 @@ package jobs
 import (
 	"context"
 
-	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 )
@@ -31,12 +30,16 @@ type FakeResumer struct {
 	OnResume     func(context.Context) error
 	FailOrCancel func(context.Context) error
 	Success      func() error
-	Terminal     func()
 }
 
 func (d FakeResumer) Resume(ctx context.Context, _ interface{}, _ chan<- tree.Datums) error {
 	if d.OnResume != nil {
-		return d.OnResume(ctx)
+		if err := d.OnResume(ctx); err != nil {
+			return err
+		}
+	}
+	if d.Success != nil {
+		return d.Success()
 	}
 	return nil
 }
@@ -48,17 +51,4 @@ func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
 	return nil
 }
 
-func (d FakeResumer) OnSuccess(_ context.Context, _ *client.Txn) error {
-	if d.Success != nil {
-		return d.Success()
-	}
-	return nil
-}
-
-func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) {
-	if d.Terminal != nil {
-		d.Terminal()
-	}
-}
-
 var _ Resumer = FakeResumer{}
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 4bec9e83e0b6..d950ef2bb666 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -136,7 +136,7 @@ func TestJobsTableProgressFamily(t *testing.T) {
 }
 
 type counters struct {
-	ResumeExit, OnFailOrCancelExit, Terminal int
+	ResumeExit, OnFailOrCancelExit int
 	// These sometimes retry so just use bool.
 	ResumeStart, OnFailOrCancelStart, Success bool
 }
@@ -247,14 +247,6 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
 				rts.mu.a.Success = true
 				return rts.successErr
 			},
-
-			Terminal: func() {
-				t.Log("Starting terminal")
-				rts.mu.Lock()
-				rts.mu.a.Terminal++
-				rts.mu.Unlock()
-				t.Log("Exiting terminal")
-			},
 		}
 	})
 }
@@ -320,7 +312,6 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.resumeCh <- nil
 		rts.mu.e.ResumeExit++
 		rts.mu.e.Success = true
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusSucceeded)
 	})
 
@@ -342,7 +333,6 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.resumeCh <- nil
 		rts.mu.e.ResumeExit++
 		rts.mu.e.Success = true
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusSucceeded)
 	})
 
@@ -377,7 +367,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
 		rts.mu.e.ResumeExit++
 		rts.mu.e.Success = true
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusSucceeded)
 	})
 
@@ -415,7 +404,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
 		rts.failOrCancelCh <- nil
 		rts.mu.e.OnFailOrCancelExit++
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusFailed)
 	})
 
@@ -440,7 +428,6 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.failOrCancelCheckCh <- struct{}{}
 		rts.failOrCancelCh <- nil
 		rts.mu.e.OnFailOrCancelExit++
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusCanceled)
 	})
 
@@ -492,7 +479,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
 		rts.failOrCancelCh <- nil
 		rts.mu.e.OnFailOrCancelExit++
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusCanceled)
 	})
 
@@ -530,7 +516,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
 		rts.failOrCancelCh <- nil
 		rts.mu.e.OnFailOrCancelExit++
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusFailed)
 	})
 
@@ -637,7 +622,6 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.resumeCh <- nil
 		rts.mu.e.ResumeExit++
 		rts.mu.e.Success = true
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusSucceeded)
 	})
 
@@ -664,42 +648,10 @@ func TestRegistryLifecycle(t *testing.T) {
 
 		rts.failOrCancelCh <- nil
 		rts.mu.e.OnFailOrCancelExit++
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusFailed)
 	})
 
-	// Attempt to mark success, but fail.
-	t.Run("fail marking success", func(t *testing.T) {
-		rts := registryTestSuite{}
-		rts.setUp(t)
-		defer rts.tearDown()
-
-		// Make marking success fail.
-		rts.successErr = errors.New("marking success failed")
-		j, _, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob)
-		if err != nil {
-			t.Fatal(err)
-		}
-		rts.job = j
-
-		rts.mu.e.ResumeStart = true
-		rts.resumeCheckCh <- struct{}{}
-		rts.check(t, jobs.StatusRunning)
-
-		rts.resumeCh <- nil
-		rts.mu.e.ResumeExit++
-
-		rts.mu.e.Success = true
-		rts.mu.e.OnFailOrCancelStart = true
-		rts.failOrCancelCheckCh <- struct{}{}
-		rts.failOrCancelCh <- nil
-		rts.mu.e.OnFailOrCancelExit++
-		rts.mu.e.Terminal++
-		rts.check(t, jobs.StatusFailed)
-	})
-
-	// Attempt to mark success, but fail, but fail that also. Thus it should not
-	// trigger OnTerminal.
+	// Attempt to mark success, but fail, but fail that also.
 	t.Run("fail marking success and fail OnFailOrCancel", func(t *testing.T) {
 		rts := registryTestSuite{}
 		rts.setUp(t)
@@ -728,11 +680,10 @@ func TestRegistryLifecycle(t *testing.T) {
 		rts.failOrCancelCheckCh <- struct{}{}
 		rts.mu.e.OnFailOrCancelExit++
 		rts.failOrCancelCh <- errors.New("reverting failed")
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusFailed)
 	})
 
-	// Fail the job, but also fail to mark it failed. No OnTerminal.
+	// Fail the job, but also fail to mark it failed.
 	t.Run("fail marking failed", func(t *testing.T) {
 		rts := registryTestSuite{}
 		rts.setUp(t)
@@ -760,7 +711,6 @@ func TestRegistryLifecycle(t *testing.T) {
 		// But let it fail.
 		rts.mu.e.OnFailOrCancelExit++
 		rts.failOrCancelCh <- errors.New("resume failed")
-		rts.mu.e.Terminal++
 		rts.check(t, jobs.StatusFailed)
 	})
 
@@ -1886,12 +1836,12 @@ func TestJobInTxn(t *testing.T) {
 			return jobs.FakeResumer{
 				OnResume: func(ctx context.Context) error {
 					t.Logf("Resuming job: %+v", job.Payload())
+					atomic.AddInt32(&hasRun, 1)
 					return nil
 				},
-				Terminal: func() {
-					t.Logf("Finished job: %+v", job.Payload())
-					// Inc instead of just storing 1 to count how many jobs ran.
+				FailOrCancel: func(ctx context.Context) error {
 					atomic.AddInt32(&hasRun, 1)
+					return nil
 				},
 			}
 		})
diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index b3b63d91de70..e87092ec3d32 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -665,22 +665,6 @@ type Resumer interface {
 	// is a sql.PlanHookState.
 	Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error
 
-	// OnSuccess is called when a job has completed successfully, and is called
-	// with the same txn that will mark the job as successful. The txn will
-	// only be committed if this doesn't return an error and the job state was
-	// successfully changed to successful. If OnSuccess returns an error, the
-	// job will be marked as failed.
-	//
-	// Any work this function does must still be correct if the txn is aborted at
-	// a later time.
-	OnSuccess(ctx context.Context, txn *client.Txn) error
-
-	// OnTerminal is called after a job has successfully been marked as
-	// terminal. It should be used to perform optional cleanup and return final
-	// results to the user. There is no guarantee that this function is ever run
-	// (for example, if a node died immediately after Success commits).
-	OnTerminal(ctx context.Context, status Status, resultsCh chan<- tree.Datums)
-
 	// OnFailOrCancel is called when a job fails or is cancel-requested.
 	//
 	// This method will be called when a registry notices the cancel request,
@@ -784,14 +768,13 @@ func (r *Registry) stepThroughStateMachine(
 			// restarted during the next adopt loop and reverting will be retried.
 			return errors.Wrapf(err, "job %d: could not mark as canceled: %s", *job.ID(), jobErr)
 		}
-		resumer.OnTerminal(ctx, status, resultsCh)
 		return errors.Errorf("job %s", status)
 	case StatusSucceeded:
 		if jobErr != nil {
 			errorMsg := fmt.Sprintf("job %d: successful bu unexpected error provided", *job.ID())
 			return errors.NewAssertionErrorWithWrappedErrf(jobErr, errorMsg)
 		}
-		if err := job.Succeeded(ctx, resumer.OnSuccess); err != nil {
+		if err := job.Succeeded(ctx, nil); err != nil {
 			// If it didn't succeed, we consider the job as failed and need to go
 			// through reverting state first.
 			// TODO(spaskob): this is silly, we should remove the OnSuccess hooks and
 			// execute them in resume so that the client can handle these errors
 			// better.
 			return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting,
 				errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID()))
 		}
-		resumer.OnTerminal(ctx, status, resultsCh)
 		return nil
 	case StatusReverting:
 		if err := job.Reverted(ctx, jobErr, nil); err != nil {
@@ -844,7 +826,6 @@ func (r *Registry) stepThroughStateMachine(
 			// restarted during the next adopt loop and reverting will be retried.
return errors.Wrapf(err, "job %d: could not mark as failed: %s", *job.ID(), jobErr) } - resumer.OnTerminal(ctx, status, resultsCh) return jobErr default: return errors.AssertionFailedf("job %d: has unsupported status %s", *job.ID(), status) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index e6de6be67891..962451dfcb6f 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -333,13 +333,10 @@ func (n *createStatsNode) makePlanForExplainDistSQL( } // createStatsResumer implements the jobs.Resumer interface for CreateStats -// jobs. A new instance is created for each job. evalCtx is populated inside -// createStatsResumer.Resume so it can be used in createStatsResumer.OnSuccess -// (if the job is successful). +// jobs. A new instance is created for each job. type createStatsResumer struct { job *jobs.Job tableID sqlbase.ID - evalCtx *extendedEvalContext } var _ jobs.Resumer = &createStatsResumer{} @@ -359,10 +356,10 @@ func (r *createStatsResumer) Resume( } r.tableID = details.Table.ID - r.evalCtx = p.ExtendedEvalContext() + evalCtx := p.ExtendedEvalContext() ci := sqlbase.ColTypeInfoFromColTypes([]types.T{}) - rows := rowcontainer.NewRowContainer(r.evalCtx.Mon.MakeBoundAccount(), ci, 0) + rows := rowcontainer.NewRowContainer(evalCtx.Mon.MakeBoundAccount(), ci, 0) defer func() { if rows != nil { rows.Close(ctx) @@ -370,17 +367,17 @@ func (r *createStatsResumer) Resume( }() dsp := p.DistSQLPlanner() - return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { if details.AsOf != nil { p.semaCtx.AsOfTimestamp = details.AsOf p.extendedEvalCtx.SetTxnTimestamp(details.AsOf.GoTime()) txn.SetFixedTimestamp(ctx, *details.AsOf) } - planCtx := dsp.NewPlanningCtx(ctx, r.evalCtx, txn) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, txn) planCtx.planner = p if err := dsp.planAndRunCreateStats( - ctx, r.evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), + ctx, evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), ); err != nil { // Check if this was a context canceled error and restart if it was. if s, ok := status.FromError(errors.UnwrapAll(err)); ok { @@ -409,6 +406,38 @@ func (r *createStatsResumer) Resume( } return nil + }); err != nil { + return err + } + + // Invalidate the local cache synchronously; this guarantees that the next + // statement in the same session won't use a stale cache (whereas the gossip + // update is handled asynchronously). + evalCtx.ExecCfg.TableStatsCache.InvalidateTableStats(ctx, r.tableID) + + // Record this statistics creation in the event log. + if !createStatsPostEvents.Get(&evalCtx.Settings.SV) { + return nil + } + + // TODO(rytaft): This creates a new transaction for the CREATE STATISTICS + // event. It must be different from the CREATE STATISTICS transaction, + // because that transaction must be read-only. In the future we may want + // to use the transaction that inserted the new stats into the + // system.table_statistics table, but that would require calling + // MakeEventLogger from the distsqlrun package. 
+	return evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
+		return MakeEventLogger(evalCtx.ExecCfg).InsertEventRecord(
+			ctx,
+			txn,
+			EventLogCreateStatistics,
+			int32(details.Table.ID),
+			int32(evalCtx.NodeID),
+			struct {
+				TableName string
+				Statement string
+			}{details.FQTableName, details.Statement},
+		)
 	})
 }
 
@@ -459,47 +488,6 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error {
 // OnFailOrCancel is part of the jobs.Resumer interface.
 func (r *createStatsResumer) OnFailOrCancel(context.Context, interface{}) error { return nil }
 
-// OnSuccess is part of the jobs.Resumer interface.
-func (r *createStatsResumer) OnSuccess(ctx context.Context, _ *client.Txn) error {
-	details := r.job.Details().(jobspb.CreateStatsDetails)
-
-	// Invalidate the local cache synchronously; this guarantees that the next
-	// statement in the same session won't use a stale cache (whereas the gossip
-	// update is handled asynchronously).
-	r.evalCtx.ExecCfg.TableStatsCache.InvalidateTableStats(ctx, r.tableID)
-
-	// Record this statistics creation in the event log.
-	if !createStatsPostEvents.Get(&r.evalCtx.Settings.SV) {
-		return nil
-	}
-
-	// TODO(rytaft): This creates a new transaction for the CREATE STATISTICS
-	// event. It must be different from the CREATE STATISTICS transaction,
-	// because that transaction must be read-only. In the future we may want
-	// to use the transaction that inserted the new stats into the
-	// system.table_statistics table, but that would require calling
-	// MakeEventLogger from the distsqlrun package.
-	return r.evalCtx.ExecCfg.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
-		return MakeEventLogger(r.evalCtx.ExecCfg).InsertEventRecord(
-			ctx,
-			txn,
-			EventLogCreateStatistics,
-			int32(details.Table.ID),
-			int32(r.evalCtx.NodeID),
-			struct {
-				TableName string
-				Statement string
-			}{details.FQTableName, details.Statement},
-		)
-	})
-}
-
-// OnTerminal is part of the jobs.Resumer interface.
-func (r *createStatsResumer) OnTerminal(
-	ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-}
-
 func init() {
 	createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
 		return &createStatsResumer{job: job}
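
The net effect of this patch for implementers: jobs.Resumer is reduced to Resume and OnFailOrCancel, and a job's final results are reported from Resume itself rather than from a separate OnSuccess/OnTerminal hook. The following is a minimal sketch of that shape, mirroring the pattern used by the backup, restore, and import resumers above; exampleResumer and its res field are illustrative only and not part of this patch.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

type exampleResumer struct {
	job *jobs.Job
	res struct{ Rows, IndexEntries, DataSize int64 }
}

// Resume performs the job's work and, on success, pushes the final counts
// onto resultsCh before returning nil; there is no OnSuccess/OnTerminal hook
// to do this anymore.
func (r *exampleResumer) Resume(
	ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums,
) error {
	// ... do the job's work, filling r.res ...
	resultsCh <- tree.Datums{
		tree.NewDInt(tree.DInt(*r.job.ID())),
		tree.NewDString(string(jobs.StatusSucceeded)),
		tree.NewDFloat(tree.DFloat(1.0)),
		tree.NewDInt(tree.DInt(r.res.Rows)),
		tree.NewDInt(tree.DInt(r.res.IndexEntries)),
		tree.NewDInt(tree.DInt(r.res.DataSize)),
	}
	return nil
}

// OnFailOrCancel now also carries any terminal cleanup that previously lived
// in OnTerminal (compare the backup resumer's checkpoint deletion above).
func (r *exampleResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
	return nil
}

var _ jobs.Resumer = &exampleResumer{}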