Skip to content

Commit

Permalink
Merge #72351
Browse files Browse the repository at this point in the history
72351: jobs: fix improperly wrapped errors r=ajwerner a=rafiss

refs #42510

I'm working on a linter that detects errors that are not wrapped
correctly, and it discovered these.

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
craig[bot] and rafiss committed Nov 3, 2021
2 parents 4f86222 + bf60012 commit 05d3662
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
}
return nil
}); err != nil {
return errors.Wrapf(err, "job %d: tried to cancel but could not mark as reverting: %s", id, err)
return errors.Wrapf(err, "job %d: tried to cancel but could not mark as reverting", id)
}
log.Infof(ctx, "job %d, session id: %s canceled: the job is now reverting",
id, s.ID())
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ func (j *Job) cancelRequested(
}
if md.Status == StatusPaused && md.Payload.FinalResumeError != nil {
decodedErr := errors.DecodeError(ctx, *md.Payload.FinalResumeError)
return fmt.Errorf("job %d is paused and has non-nil FinalResumeError "+
"%s hence cannot be canceled and should be reverted", j.ID(), decodedErr.Error())
return errors.Wrapf(decodedErr, "job %d is paused and has non-nil FinalResumeError "+
"hence cannot be canceled and should be reverted", j.ID())
}
if fn != nil {
if err := fn(ctx, txn); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func TestRegistryLifecycle(t *testing.T) {
rts.sqlDB.Exec(t, "PAUSE JOB $1", j.ID())
rts.check(t, jobs.StatusPaused)

rts.sqlDB.ExpectErr(t, "paused and has non-nil FinalResumeError resume", "CANCEL JOB $1", j.ID())
rts.sqlDB.ExpectErr(t, "paused and has non-nil FinalResumeError .* resume failed", "CANCEL JOB $1", j.ID())
rts.check(t, jobs.StatusPaused)

rts.sqlDB.Exec(t, "RESUME JOB $1", j.ID())
Expand Down
5 changes: 2 additions & 3 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package jobsprotectedts_test

import (
"context"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -100,7 +99,7 @@ func TestJobsProtectedTimestamp(t *testing.T) {
if errors.Is(err, protectedts.ErrNotExists) {
return nil
}
return fmt.Errorf("waiting for %v, got %v", protectedts.ErrNotExists, err)
return errors.NewAssertionErrorWithWrappedErrf(err, "waiting for ErrNotExists")
}
testutils.SucceedsSoon(t, func() (err error) {
return s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down Expand Up @@ -177,7 +176,7 @@ func TestSchedulesProtectedTimestamp(t *testing.T) {
if errors.Is(err, protectedts.ErrNotExists) {
return nil
}
return fmt.Errorf("waiting for %v, got %v", protectedts.ErrNotExists, err)
return errors.NewAssertionErrorWithWrappedErrf(err, "waiting for ErrNotExists")
}
testutils.SucceedsSoon(t, func() (err error) {
return s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand Down
20 changes: 16 additions & 4 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,10 @@ func (r *Registry) stepThroughStateMachine(
if err := job.canceled(ctx, nil /* txn */, nil /* fn */); err != nil {
// If we can't transactionally mark the job as canceled then it will be
// restarted during the next adopt loop and reverting will be retried.
return errors.Wrapf(err, "job %d: could not mark as canceled: %v", job.ID(), jobErr)
return errors.WithSecondaryError(
errors.Wrapf(err, "job %d: could not mark as canceled", job.ID()),
jobErr,
)
}
telemetry.Inc(TelemetryMetrics[jobType].Canceled)
return errors.WithSecondaryError(errors.Errorf("job %s", status), jobErr)
Expand All @@ -1264,7 +1267,10 @@ func (r *Registry) stepThroughStateMachine(
if err := job.reverted(ctx, nil /* txn */, jobErr, nil /* fn */); err != nil {
// If we can't transactionally mark the job as reverting then it will be
// restarted during the next adopt loop and it will be retried.
return errors.Wrapf(err, "job %d: could not mark as reverting: %s", job.ID(), jobErr)
return errors.WithSecondaryError(
errors.Wrapf(err, "job %d: could not mark as reverting", job.ID()),
jobErr,
)
}
onFailOrCancelCtx := logtags.AddTag(ctx, "job", job.ID())
var err error
Expand Down Expand Up @@ -1301,7 +1307,10 @@ func (r *Registry) stepThroughStateMachine(
if err := job.failed(ctx, nil /* txn */, jobErr, nil /* fn */); err != nil {
// If we can't transactionally mark the job as failed then it will be
// restarted during the next adopt loop and reverting will be retried.
return errors.Wrapf(err, "job %d: could not mark as failed: %s", job.ID(), jobErr)
return errors.WithSecondaryError(
errors.Wrapf(err, "job %d: could not mark as failed", job.ID()),
jobErr,
)
}
telemetry.Inc(TelemetryMetrics[jobType].Failed)
return jobErr
Expand All @@ -1316,7 +1325,10 @@ func (r *Registry) stepThroughStateMachine(
if err := job.revertFailed(ctx, nil /* txn */, jobErr, nil /* fn */); err != nil {
// If we can't transactionally mark the job as failed then it will be
// restarted during the next adopt loop and reverting will be retried.
return errors.Wrapf(err, "job %d: could not mark as revert field: %s", job.ID(), jobErr)
return errors.WithSecondaryError(
errors.Wrapf(err, "job %d: could not mark as revert field", job.ID()),
jobErr,
)
}
return jobErr
default:
Expand Down

0 comments on commit 05d3662

Please sign in to comment.