Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs, *: remove job Resumer OnSuccess and OnTerminal #45829

Merged
merged 2 commits into from
Mar 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 13 additions & 24 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,16 @@ func (b *backupResumer) Resume(
if err != nil {
log.Warningf(ctx, "unable to clear stats from job payload: %+v", err)
}
b.deleteCheckpoint(ctx)

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*b.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(b.res.Rows)),
tree.NewDInt(tree.DInt(b.res.IndexEntries)),
tree.NewDInt(tree.DInt(b.res.DataSize)),
}
return nil
}

Expand All @@ -527,17 +537,12 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *client.DB) error {
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(context.Context, interface{}) error {
func (b *backupResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
b.deleteCheckpoint(ctx)
return nil
}

// OnSuccess is part of the jobs.Resumer interface.
func (b *backupResumer) OnSuccess(context.Context, *client.Txn) error { return nil }

// OnTerminal is part of the jobs.Resumer interface.
func (b *backupResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
func (b *backupResumer) deleteCheckpoint(ctx context.Context) {
// Attempt to delete BACKUP-CHECKPOINT.
if err := func() error {
details := b.job.Details().(jobspb.BackupDetails)
Expand All @@ -555,22 +560,6 @@ func (b *backupResumer) OnTerminal(
}(); err != nil {
log.Warningf(ctx, "unable to delete checkpointed backup descriptor: %+v", err)
}

if status == jobs.StatusSucceeded {
// TODO(benesch): emit periodic progress updates.

// TODO(mjibson): if a restore was resumed, then these counts will only have
// the current coordinator's counts.

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*b.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(b.res.Rows)),
tree.NewDInt(tree.DInt(b.res.IndexEntries)),
tree.NewDInt(tree.DInt(b.res.DataSize)),
}
}
}

var _ jobs.Resumer = &backupResumer{}
Expand Down
35 changes: 9 additions & 26 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,15 @@ func (r *restoreResumer) Resume(
}
}

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}

return nil
}

Expand Down Expand Up @@ -1162,11 +1171,6 @@ func (r *restoreResumer) dropTables(ctx context.Context, txn *client.Txn) error
return nil
}

// OnSuccess is part of the jobs.Resumer interface.
func (r *restoreResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
return nil
}

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
Expand Down Expand Up @@ -1206,27 +1210,6 @@ func (r *restoreResumer) restoreSystemTables(ctx context.Context) error {
return nil
}

// OnTerminal is part of the jobs.Resumer interface.
func (r *restoreResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
if status == jobs.StatusSucceeded {
// TODO(benesch): emit periodic progress updates.

// TODO(mjibson): if a restore was resumed, then these counts will only have
// the current coordinator's counts.

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}
}
}

var _ jobs.Resumer = &restoreResumer{}

func init() {
Expand Down
7 changes: 0 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -547,9 +546,3 @@ func (b *changefeedResumer) Resume(

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnFailOrCancel(context.Context, interface{}) error { return nil }

// OnSuccess is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { return nil }

// OnTerminal is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {}
15 changes: 4 additions & 11 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,25 +480,18 @@ func (r *cancellableImportResumer) Resume(
) error {
r.jobID = *r.wrapped.job.ID()
r.jobIDCh <- r.jobID
return r.wrapped.Resume(r.ctx, phs, resultsCh)
}

func (r *cancellableImportResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil {
return err
}
if r.onSuccessBarrier != nil {
defer r.onSuccessBarrier.Enter()()
}
return errors.New("job succeed, but we're forcing it to be paused")
}

func (r *cancellableImportResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
r.wrapped.OnTerminal(ctx, status, resultsCh)
}

func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error {
// This callback is invoked when an error or cancellation occurs
// during the import. Since our OnSuccess handler returned an
// during the import. Since our Resume handler returned an
// error (after pausing the job), we need to short-circuits
// jobs machinery so that this job is not marked as failed.
return errors.New("bail out")
Expand Down
42 changes: 14 additions & 28 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,16 +1053,27 @@ func (r *importResumer) Resume(
}
// TODO(ajwerner): Should this actually return the error? At this point we've
// successfully finished the import but failed to drop the protected
// timestamp. The reconciliation loop ought to pick it up. Ideally we'd do
// this in OnSuccess but we don't have access to the protectedts.Storage
// there.
// timestamp. The reconciliation loop ought to pick it up.
if ptsID != nil && !r.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return r.releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider)
}); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
}

telemetry.CountBucketed("import.rows", r.res.Rows)
const mb = 1 << 20
telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)
resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}

return nil
}

Expand Down Expand Up @@ -1265,31 +1276,6 @@ func (r *importResumer) dropTables(ctx context.Context, txn *client.Txn) error {
return errors.Wrap(txn.Run(ctx, b), "rolling back tables")
}

// OnSuccess is part of the jobs.Resumer interface.
func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
return nil
}

// OnTerminal is part of the jobs.Resumer interface.
func (r *importResumer) OnTerminal(
ctx context.Context, status jobs.Status, resultsCh chan<- tree.Datums,
) {
if status == jobs.StatusSucceeded {
telemetry.CountBucketed("import.rows", r.res.Rows)
const mb = 1 << 20
telemetry.CountBucketed("import.size-mb", r.res.DataSize/mb)

resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*r.job.ID())),
tree.NewDString(string(jobs.StatusSucceeded)),
tree.NewDFloat(tree.DFloat(1.0)),
tree.NewDInt(tree.DInt(r.res.Rows)),
tree.NewDInt(tree.DInt(r.res.IndexEntries)),
tree.NewDInt(tree.DInt(r.res.DataSize)),
}
}
}

var _ jobs.Resumer = &importResumer{}

func init() {
Expand Down
22 changes: 6 additions & 16 deletions pkg/jobs/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package jobs
import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)
Expand All @@ -31,12 +30,16 @@ type FakeResumer struct {
OnResume func(context.Context) error
FailOrCancel func(context.Context) error
Success func() error
Terminal func()
}

func (d FakeResumer) Resume(ctx context.Context, _ interface{}, _ chan<- tree.Datums) error {
if d.OnResume != nil {
return d.OnResume(ctx)
if err := d.OnResume(ctx); err != nil {
return err
}
}
if d.Success != nil {
return d.Success()
}
return nil
}
Expand All @@ -48,17 +51,4 @@ func (d FakeResumer) OnFailOrCancel(ctx context.Context, _ interface{}) error {
return nil
}

func (d FakeResumer) OnSuccess(_ context.Context, _ *client.Txn) error {
if d.Success != nil {
return d.Success()
}
return nil
}

func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) {
if d.Terminal != nil {
d.Terminal()
}
}

var _ Resumer = FakeResumer{}
Loading