Merge pull request #45829 from dt/on-term
jobs, *: remove job Resumer OnSuccess and OnTerminal
dt authored Mar 7, 2020
2 parents 50b31b6 + 51a0d50 commit eac401f
Showing 9 changed files with 91 additions and 238 deletions.
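
Before the per-file hunks, a sketch of what the trimmed jobs.Resumer interface presumably looks like after this commit. The interface definition itself lives in pkg/jobs and is not among the files shown below, so the signatures here are reconstructed from the implementations in this diff (backupResumer, restoreResumer, FakeResumer) and should be read as an assumption rather than the actual file contents.

// Reconstruction only; the real Resumer definition in pkg/jobs is not part of this diff.
package jobs

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

type Resumer interface {
	// Resume runs the job. After this change it also owns success-side
	// bookkeeping: the implementations below write their final result row to
	// resultsCh and return nil, where they previously relied on OnSuccess and
	// OnTerminal.
	Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error

	// OnFailOrCancel is the one remaining terminal hook; cleanup that used to
	// run in OnTerminal for every terminal status now runs either here (on
	// failure/cancellation) or inline at the end of Resume (on success).
	OnFailOrCancel(ctx context.Context, phs interface{}) error
}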
37 changes: 13 additions & 24 deletions pkg/ccl/backupccl/backup_job.go
@@ -505,6 +505,16 @@ func (b *backupResumer) Resume(
if err != nil {
log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
}
b.deleteCheckpoint(ctx)

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*b.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(b.res.Rows)),
tree.NewDInt(tree.DInt(b.res.IndexEntries)),
tree.NewDInt(tree.DInt(b.res.DataSize)),
}
return nil
}

@@ -527,17 +537,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(context.Context, interface{}) error {
func (b *backupResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
b.deleteCheckpoint(ctx)
return nil
}

// OnSuccess is part of the jobs.Resumer interface.
func (b *backupResumer) OnSuccess(context.Context, *client.Txn) error { return nil }

// OnTerminal is part of the jobs.Resumer interface.
func (b *backupResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
// Attempt to delete BACKUP-CHECKPOINT.
if err := func() error {
details := b.job.Details().(jobspb.BackupDetails)
@@ -555,22 +560,6 @@ func (b *backupResumer) OnTerminal(
}(); err != nil {
log.Warningf(ctx, "unable to delete checkpointed backup descriptor: %+v", err)
}

if status == jobs.StatusSucceeded {
// TODO(benesch): emit periodic progress updates.

// TODO(mjibson): if a restore was resumed, then these counts will only have
// the current coordinator's counts.

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*b.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(b.res.Rows)),
tree.NewDInt(tree.DInt(b.res.IndexEntries)),
tree.NewDInt(tree.DInt(b.res.DataSize)),
}
}
}

var _ jobs.Resumer = &backupResumer{}
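
The backup hunks above establish the pattern the rest of the diff follows: the result row that OnTerminal used to emit only when status == jobs.StatusSucceeded is now written to resultsCh at the end of a successful Resume, and checkpoint cleanup is split between the success path in Resume and OnFailOrCancel. A minimal, hypothetical resumer following that pattern could look like the sketch below; toyResumer, doWork, and cleanup are illustrative names, not code from the repository.

package example // hypothetical package; only the jobs and tree imports are real CockroachDB packages

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

type toyResumer struct {
	job *jobs.Job
	res struct{ Rows, IndexEntries, DataSize int64 }
}

func (t *toyResumer) doWork(ctx context.Context) error { return nil } // stand-in for the real work
func (t *toyResumer) cleanup(ctx context.Context)      {}             // stand-in for e.g. deleteCheckpoint

func (t *toyResumer) Resume(
	ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums,
) error {
	if err := t.doWork(ctx); err != nil {
		// Failure path: the jobs machinery calls OnFailOrCancel, which owns cleanup.
		return err
	}
	// Success path: clean up inline and emit the result row from Resume itself,
	// since OnSuccess/OnTerminal no longer exist.
	t.cleanup(ctx)
	resultsCh <- tree.Datums{
		tree.NewDInt(tree.DInt(*t.job.ID())),
		tree.NewDString(string(jobs.StatusSucceeded)),
		tree.NewDFloat(tree.DFloat(1.0)),
		tree.NewDInt(tree.DInt(t.res.Rows)),
		tree.NewDInt(tree.DInt(t.res.IndexEntries)),
		tree.NewDInt(tree.DInt(t.res.DataSize)),
	}
	return nil
}

func (t *toyResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
	t.cleanup(ctx)
	return nil
}

var _ jobs.Resumer = &toyResumer{}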
35 changes: 9 additions & 26 deletions pkg/ccl/backupccl/restore_job.go
@@ -1010,6 +1010,15 @@ func (r *restoreResumer) Resume(
}
}

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}

return nil
}

@@ -1162,11 +1171,6 @@ func (r *restoreResumer) dropTables(ctx context.Context, txn *client.Txn) error
return nil
}

// OnSuccess is part of the jobs.Resumer interface.
func (r *restoreResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
return nil
}

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
@@ -1206,27 +1210,6 @@ func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
return nil
}

// OnTerminal is part of the jobs.Resumer interface.
func (r *restoreResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
if status == jobs.StatusSucceeded {
// TODO(benesch): emit periodic progress updates.

// TODO(mjibson): if a restore was resumed, then these counts will only have
// the current coordinator's counts.

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}
}
}

var _ jobs.Resumer = &restoreResumer{}

func init() {
7 changes: 0 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -547,9 +546,3 @@ func (b *changefeedResumer) Resume(

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnFailOrCancel(context.Context, interface{}) error { return nil }

// OnSuccess is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { return nil }

// OnTerminal is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {}
15 changes: 4 additions & 11 deletions pkg/ccl/importccl/import_processor_test.go
@@ -480,25 +480,18 @@ func (r *cancellableImportResumer) Resume(
) error {
r.jobID = *r.wrapped.job.ID()
r.jobIDCh <- r.jobID
return r.wrapped.Resume(r.ctx, phs, resultsCh)
}

func (r *cancellableImportResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil {
return err
}
if r.onSuccessBarrier != nil {
defer r.onSuccessBarrier.Enter()()
}
return errors.New("job succeed, but we're forcing it to be paused")
}

func (r *cancellableImportResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
r.wrapped.OnTerminal(ctx, status, resultsCh)
}

func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
// This callback is invoked when an error or cancellation occurs
// during the import. Since our OnSuccess handler returned an
// during the import. Since our Resume handler returned an
// error (after pausing the job), we need to short-circuit the
// jobs machinery so that this job is not marked as failed.
return errors.New("bail out")
42 changes: 14 additions & 28 deletions pkg/ccl/importccl/import_stmt.go
@@ -1053,16 +1053,27 @@ func (r *importResumer) Resume(
}
// TODO(ajwerner): Should this actually return the error? At this point we've
// successfully finished the import but failed to drop the protected
// timestamp. The reconciliation loop ought to pick it up. Ideally we'd do
// this in OnSuccess but we don't have access to the protectedts.Storage
// there.
// timestamp. The reconciliation loop ought to pick it up.
if ptsID != nil && !r.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return r.releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider)
}); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
}

telemetry.CountBucketed("import.rows", r.res.Rows)
const mb = 1 << 20
telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)
resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}

return nil
}

@@ -1265,31 +1276,6 @@ func (r *importResumer) dropTables(ctx context.Context, txn *client.Txn) error {
return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
}

// OnSuccess is part of the jobs.Resumer interface.
func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
return nil
}

// OnTerminal is part of the jobs.Resumer interface.
func (r *importResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
if status == jobs.StatusSucceeded {
telemetry.CountBucketed("import.rows", r.res.Rows)
const mb = 1 << 20
telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}
}
}

var _ jobs.Resumer = &importResumer{}

func init() {
22 changes: 6 additions & 16 deletions pkg/jobs/helpers_test.go
@@ -13,7 +13,6 @@ package jobs
import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)
@@ -31,12 +30,16 @@ type FakeResumer struct {
OnResume func(context.Context) error
FailOrCancel func(context.Context) error
Success func() error
Terminal func()
}

func (d FakeResumer) Resume(ctx context.Context, _ interface{}, _ chan<- tree.Datums) error {
if d.OnResume != nil {
return d.OnResume(ctx)
if err := d.OnResume(ctx); err != nil {
return err
}
}
if d.Success != nil {
return d.Success()
}
return nil
}
@@ -48,17 +51,4 @@ func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
return nil
}

func (d FakeResumer) OnSuccess(_ context.Context, _ *client.Txn) error {
if d.Success != nil {
return d.Success()
}
return nil
}

func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) {
if d.Terminal != nil {
d.Terminal()
}
}

var _ Resumer = FakeResumer{}
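
With the Success hook folded into Resume and the Terminal hook removed, a test fake now routes everything through the two remaining callbacks. Below is a hypothetical helper inside package jobs, using only the fields visible in the hunk above; how the fake gets registered with the job registry is not part of this diff and is omitted.

package jobs

import "context"

// newSketchFakeResumer is illustrative only, not repository code.
func newSketchFakeResumer() FakeResumer {
	return FakeResumer{
		OnResume: func(ctx context.Context) error {
			return nil // the "work"; an error here now fails the job from Resume itself
		},
		Success: func() error {
			return nil // runs at the end of a successful Resume (formerly the OnSuccess hook)
		},
		FailOrCancel: func(ctx context.Context) error {
			return nil // still backs OnFailOrCancel
		},
	}
}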