*: remove jobs.Resumer.OnTerminal
Whatever it was doing can be done in Resume or OnFailOrCancel,
both of which are resumable (and most of these were only doing
things on success anyway, so they should just be the last step
of Resume).

Release note: none.
dt committed Mar 7, 2020 · 1 parent 5b17910 · commit 51a0d50
Showing 9 changed files with 41 additions and 119 deletions.
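
For orientation, here is a minimal sketch of the jobs.Resumer interface before and after this commit, reconstructed from the pkg/jobs/registry.go diff below. The Before/After suffixes and the stand-in Datums and Status types are illustrative only, so both versions compile side by side; the real interface lives in pkg/jobs and uses sql/sem/tree.Datums.

package jobs

import "context"

// Datums and Status stand in for sql/sem/tree.Datums and jobs.Status.
type Datums []interface{}
type Status string

// ResumerBefore mirrors jobs.Resumer prior to this commit: OnTerminal
// ran after the job was marked terminal, with no guarantee it ever ran
// (e.g. if the node died right after the success state committed).
type ResumerBefore interface {
    Resume(ctx context.Context, phs interface{}, resultsCh chan<- Datums) error
    OnFailOrCancel(ctx context.Context, phs interface{}) error
    OnTerminal(ctx context.Context, status Status, resultsCh chan<- Datums)
}

// ResumerAfter mirrors jobs.Resumer after this commit: OnTerminal is
// gone. Success-path work becomes the last step of Resume, and
// failure-path cleanup moves into OnFailOrCancel, both of which are
// retried if interrupted.
type ResumerAfter interface {
    Resume(ctx context.Context, phs interface{}, resultsCh chan<- Datums) error
    OnFailOrCancel(ctx context.Context, phs interface{}) error
}
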
34 changes: 13 additions & 21 deletions pkg/ccl/backupccl/backup_job.go
@@ -505,6 +505,16 @@ func (b *backupResumer) Resume(
     if err != nil {
         log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
     }
+    b.deleteCheckpoint(ctx)
+
+    resultsCh <- tree.Datums{
+        tree.NewDInt(tree.DInt(*b.job.ID())),
+        tree.NewDString(string(jobs.StatusSucceeded)),
+        tree.NewDFloat(tree.DFloat(1.0)),
+        tree.NewDInt(tree.DInt(b.res.Rows)),
+        tree.NewDInt(tree.DInt(b.res.IndexEntries)),
+        tree.NewDInt(tree.DInt(b.res.DataSize)),
+    }
     return nil
 }

@@ -527,14 +537,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
 }
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
-func (b *backupResumer) OnFailOrCancel(context.Context, interface{}) error {
+func (b *backupResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
+    b.deleteCheckpoint(ctx)
     return nil
 }
 
-// OnTerminal is part of the jobs.Resumer interface.
-func (b *backupResumer) OnTerminal(
-    ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
+func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
     // Attempt to delete BACKUP-CHECKPOINT.
     if err := func() error {
         details := b.job.Details().(jobspb.BackupDetails)

@@ -552,22 +560,6 @@ func (b *backupResumer) OnTerminal(
     }(); err != nil {
         log.Warningf(ctx, "unable to delete checkpointed backup descriptor: %+v", err)
     }
-
-    if status == jobs.StatusSucceeded {
-        // TODO(benesch): emit periodic progress updates.
-
-        // TODO(mjibson): if a restore was resumed, then these counts will only have
-        // the current coordinator's counts.
-
-        resultsCh <- tree.Datums{
-            tree.NewDInt(tree.DInt(*b.job.ID())),
-            tree.NewDString(string(jobs.StatusSucceeded)),
-            tree.NewDFloat(tree.DFloat(1.0)),
-            tree.NewDInt(tree.DInt(b.res.Rows)),
-            tree.NewDInt(tree.DInt(b.res.IndexEntries)),
-            tree.NewDInt(tree.DInt(b.res.DataSize)),
-        }
-    }
 }
 
 var _ jobs.Resumer = &backupResumer{}

30 changes: 9 additions & 21 deletions pkg/ccl/backupccl/restore_job.go
@@ -1010,6 +1010,15 @@ func (r *restoreResumer) Resume(
         }
     }
 
+    resultsCh <- tree.Datums{
+        tree.NewDInt(tree.DInt(*r.job.ID())),
+        tree.NewDString(string(jobs.StatusSucceeded)),
+        tree.NewDFloat(tree.DFloat(1.0)),
+        tree.NewDInt(tree.DInt(r.res.Rows)),
+        tree.NewDInt(tree.DInt(r.res.IndexEntries)),
+        tree.NewDInt(tree.DInt(r.res.DataSize)),
+    }
+
     return nil
 }

@@ -1201,27 +1210,6 @@ func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
     return nil
 }
 
-// OnTerminal is part of the jobs.Resumer interface.
-func (r *restoreResumer) OnTerminal(
-    ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-    if status == jobs.StatusSucceeded {
-        // TODO(benesch): emit periodic progress updates.
-
-        // TODO(mjibson): if a restore was resumed, then these counts will only have
-        // the current coordinator's counts.
-
-        resultsCh <- tree.Datums{
-            tree.NewDInt(tree.DInt(*r.job.ID())),
-            tree.NewDString(string(jobs.StatusSucceeded)),
-            tree.NewDFloat(tree.DFloat(1.0)),
-            tree.NewDInt(tree.DInt(r.res.Rows)),
-            tree.NewDInt(tree.DInt(r.res.IndexEntries)),
-            tree.NewDInt(tree.DInt(r.res.DataSize)),
-        }
-    }
-}
-
 var _ jobs.Resumer = &restoreResumer{}
 
 func init() {

3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -546,6 +546,3 @@ func (b *changefeedResumer) Resume(
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
 func (b *changefeedResumer) OnFailOrCancel(context.Context, interface{}) error { return nil }
-
-// OnTerminal is part of the jobs.Resumer interface.
-func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {}

6 changes: 0 additions & 6 deletions pkg/ccl/importccl/import_processor_test.go
@@ -489,12 +489,6 @@ func (r *cancellableImportResumer) Resume(
     return errors.New("job succeed, but we're forcing it to be paused")
 }
 
-func (r *cancellableImportResumer) OnTerminal(
-    ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-    r.wrapped.OnTerminal(ctx, status, resultsCh)
-}
-
 func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
     // This callback is invoked when an error or cancellation occurs
     // during the import. Since our Resume handler returned an

33 changes: 13 additions & 20 deletions pkg/ccl/importccl/import_stmt.go
@@ -1061,6 +1061,19 @@ func (r *importResumer) Resume(
             log.Errorf(ctx, "failed to release protected timestamp: %v", err)
         }
     }
+
+    telemetry.CountBucketed("import.rows", r.res.Rows)
+    const mb = 1 << 20
+    telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)
+    resultsCh <- tree.Datums{
+        tree.NewDInt(tree.DInt(*r.job.ID())),
+        tree.NewDString(string(jobs.StatusSucceeded)),
+        tree.NewDFloat(tree.DFloat(1.0)),
+        tree.NewDInt(tree.DInt(r.res.Rows)),
+        tree.NewDInt(tree.DInt(r.res.IndexEntries)),
+        tree.NewDInt(tree.DInt(r.res.DataSize)),
+    }
+
     return nil
 }

@@ -1263,26 +1276,6 @@ func (r *importResumer) dropTables(ctx context.Context, txn *client.Txn) error {
     return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
 }
 
-// OnTerminal is part of the jobs.Resumer interface.
-func (r *importResumer) OnTerminal(
-    ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-    if status == jobs.StatusSucceeded {
-        telemetry.CountBucketed("import.rows", r.res.Rows)
-        const mb = 1 << 20
-        telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)
-
-        resultsCh <- tree.Datums{
-            tree.NewDInt(tree.DInt(*r.job.ID())),
-            tree.NewDString(string(jobs.StatusSucceeded)),
-            tree.NewDFloat(tree.DFloat(1.0)),
-            tree.NewDInt(tree.DInt(r.res.Rows)),
-            tree.NewDInt(tree.DInt(r.res.IndexEntries)),
-            tree.NewDInt(tree.DInt(r.res.DataSize)),
-        }
-    }
-}
-
 var _ jobs.Resumer = &importResumer{}
 
 func init() {

7 changes: 0 additions & 7 deletions pkg/jobs/helpers_test.go
@@ -30,7 +30,6 @@ type FakeResumer struct {
     OnResume     func(context.Context) error
     FailOrCancel func(context.Context) error
     Success      func() error
-    Terminal     func()
 }
 
 func (d FakeResumer) Resume(ctx context.Context, _ interface{}, _ chan<- tree.Datums) error {

@@ -52,10 +51,4 @@ func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
     return nil
 }
 
-func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) {
-    if d.Terminal != nil {
-        d.Terminal()
-    }
-}
-
 var _ Resumer = FakeResumer{}

32 changes: 6 additions & 26 deletions pkg/jobs/jobs_test.go
@@ -136,7 +136,7 @@ func TestJobsTableProgressFamily(t *testing.T) {
 }
 
 type counters struct {
-    ResumeExit, OnFailOrCancelExit, Terminal int
+    ResumeExit, OnFailOrCancelExit int
     // These sometimes retry so just use bool.
     ResumeStart, OnFailOrCancelStart, Success bool
 }

@@ -247,14 +247,6 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
             rts.mu.a.Success = true
             return rts.successErr
         },
-
-        Terminal: func() {
-            t.Log("Starting terminal")
-            rts.mu.Lock()
-            rts.mu.a.Terminal++
-            rts.mu.Unlock()
-            t.Log("Exiting terminal")
-        },
     }
 })

@@ -320,7 +312,6 @@ func TestRegistryLifecycle(t *testing.T) {
     rts.resumeCh <- nil
     rts.mu.e.ResumeExit++
     rts.mu.e.Success = true
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusSucceeded)
 })
 
@@ -342,7 +333,6 @@ func TestRegistryLifecycle(t *testing.T) {
     rts.resumeCh <- nil
     rts.mu.e.ResumeExit++
     rts.mu.e.Success = true
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusSucceeded)
 })
 
@@ -377,7 +367,6 @@ func TestRegistryLifecycle(t *testing.T) {
     rts.mu.e.ResumeExit++
 
     rts.mu.e.Success = true
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusSucceeded)
 })
 
@@ -415,7 +404,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
     rts.failOrCancelCh <- nil
     rts.mu.e.OnFailOrCancelExit++
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusFailed)
 })
 
@@ -440,7 +428,6 @@ func TestRegistryLifecycle(t *testing.T) {
     rts.failOrCancelCheckCh <- struct{}{}
     rts.failOrCancelCh <- nil
     rts.mu.e.OnFailOrCancelExit++
-    rts.mu.e.Terminal++
 
     rts.check(t, jobs.StatusCanceled)
 })
 
@@ -492,7 +479,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
     rts.failOrCancelCh <- nil
     rts.mu.e.OnFailOrCancelExit++
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusCanceled)
 })
 
@@ -530,7 +516,6 @@ func TestRegistryLifecycle(t *testing.T) {
 
     rts.failOrCancelCh <- nil
     rts.mu.e.OnFailOrCancelExit++
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusFailed)
 })
 
@@ -637,7 +622,6 @@ func TestRegistryLifecycle(t *testing.T) {
     rts.resumeCh <- nil
     rts.mu.e.ResumeExit++
     rts.mu.e.Success = true
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusSucceeded)
 })
 
@@ -664,12 +648,10 @@ func TestRegistryLifecycle(t *testing.T) {
 
     rts.failOrCancelCh <- nil
     rts.mu.e.OnFailOrCancelExit++
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusFailed)
 })
 
-// Attempt to mark success, but fail, but fail that also. Thus it should not
-// trigger OnTerminal.
+// Attempt to mark success, but fail, but fail that also.
 t.Run("fail marking success and fail OnFailOrCancel", func(t *testing.T) {
     rts := registryTestSuite{}
     rts.setUp(t)
 
@@ -698,11 +680,10 @@ func TestRegistryLifecycle(t *testing.T) {
     rts.failOrCancelCheckCh <- struct{}{}
     rts.mu.e.OnFailOrCancelExit++
     rts.failOrCancelCh <- errors.New("reverting failed")
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusFailed)
 })
 
-// Fail the job, but also fail to mark it failed. No OnTerminal.
+// Fail the job, but also fail to mark it failed.
 t.Run("fail marking failed", func(t *testing.T) {
     rts := registryTestSuite{}
     rts.setUp(t)
 
@@ -730,7 +711,6 @@ func TestRegistryLifecycle(t *testing.T) {
     // But let it fail.
     rts.mu.e.OnFailOrCancelExit++
     rts.failOrCancelCh <- errors.New("resume failed")
-    rts.mu.e.Terminal++
     rts.check(t, jobs.StatusFailed)
 })
 
@@ -1856,12 +1836,12 @@ func TestJobInTxn(t *testing.T) {
     return jobs.FakeResumer{
         OnResume: func(ctx context.Context) error {
             t.Logf("Resuming job: %+v", job.Payload())
+            atomic.AddInt32(&hasRun, 1)
             return nil
         },
-        Terminal: func() {
-            t.Logf("Finished job: %+v", job.Payload())
+        // Inc instead of just storing 1 to count how many jobs ran.
+        FailOrCancel: func(ctx context.Context) error {
             atomic.AddInt32(&hasRun, 1)
+            return nil
         },
     }
 })

9 changes: 0 additions & 9 deletions pkg/jobs/registry.go
@@ -665,12 +665,6 @@ type Resumer interface {
     // is a sql.PlanHookState.
     Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error
 
-    // OnTerminal is called after a job has successfully been marked as
-    // terminal. It should be used to perform optional cleanup and return final
-    // results to the user. There is no guarantee that this function is ever run
-    // (for example, if a node died immediately after Success commits).
-    OnTerminal(ctx context.Context, status Status, resultsCh chan<- tree.Datums)
-
     // OnFailOrCancel is called when a job fails or is cancel-requested.
     //
     // This method will be called when a registry notices the cancel request,

@@ -774,7 +768,6 @@ func (r *Registry) stepThroughStateMachine(
         // restarted during the next adopt loop and reverting will be retried.
         return errors.Wrapf(err, "job %d: could not mark as canceled: %s", *job.ID(), jobErr)
     }
-    resumer.OnTerminal(ctx, status, resultsCh)
     return errors.Errorf("job %s", status)
 case StatusSucceeded:
     if jobErr != nil {
 
@@ -789,7 +782,6 @@ func (r *Registry) stepThroughStateMachine(
         // better.
         return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID()))
     }
-    resumer.OnTerminal(ctx, status, resultsCh)
     return nil
 case StatusReverting:
     if err := job.Reverted(ctx, jobErr, nil); err != nil {
 
@@ -834,7 +826,6 @@ func (r *Registry) stepThroughStateMachine(
         // restarted during the next adopt loop and reverting will be retried.
         return errors.Wrapf(err, "job %d: could not mark as failed: %s", *job.ID(), jobErr)
     }
-    resumer.OnTerminal(ctx, status, resultsCh)
     return jobErr
 default:
     return errors.AssertionFailedf("job %d: has unsupported status %s", *job.ID(), status)

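
Continuing the sketch above, a resumer written against the slimmed-down interface pushes its results as the final step of Resume rather than in a separate OnTerminal hook. Everything here (demoResumer, its fields) is hypothetical illustration, not code from this commit:

// demoResumer is a hypothetical resumer following the new contract.
type demoResumer struct {
    jobID int64
    rows  int64
}

func (d *demoResumer) Resume(ctx context.Context, phs interface{}, resultsCh chan<- Datums) error {
    // ... perform the job's (resumable) work ...

    // Reporting success is now simply the last step of Resume. If the
    // node dies before this point, the job is re-adopted elsewhere and
    // Resume runs again, so this step is covered by the usual retry.
    resultsCh <- Datums{d.jobID, "succeeded", d.rows}
    return nil
}

func (d *demoResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
    // Cleanup that used to live in OnTerminal's failure path (e.g. the
    // backup resumer's checkpoint deletion) goes here instead.
    return nil
}

var _ ResumerAfter = (*demoResumer)(nil)
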
6 changes: 0 additions & 6 deletions pkg/sql/create_stats.go
@@ -488,12 +488,6 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error {
 // OnFailOrCancel is part of the jobs.Resumer interface.
 func (r *createStatsResumer) OnFailOrCancel(context.Context, interface{}) error { return nil }
 
-// OnTerminal is part of the jobs.Resumer interface.
-func (r *createStatsResumer) OnTerminal(
-    ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
-) {
-}
-
 func init() {
     createResumerFn := func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
         return &createStatsResumer{job: job}
