Skip to content

Commit

Permalink
Merge pull request #113552 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-112613

release-23.2: jobs: handle retryable errors waiting for jobs
  • Loading branch information
rafiss authored Nov 1, 2023
2 parents cd53068 + 605872b commit 61f221a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ go_test(
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/isql",
"//pkg/sql/sem/tree",
Expand Down
82 changes: 82 additions & 0 deletions pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -718,3 +720,83 @@ SELECT unnest(execution_errors)
require.Regexp(t, err3, registry.WaitForJobs(ctx, []jobspb.JobID{id}))
})
}

// TestWaitWithRetryableError tests retryable errors when querying
// for jobs.
func TestWaitWithRetryableError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()
ctx := context.Background()

cs := cluster.MakeTestingClusterSettings()
// Set the lease duration to zero for instanty expiry.
lease.LeaseDuration.Override(ctx, &cs.SV, 0)
// Renewal timeout to 0 saying that the lease will get renewed only
// after the lease expires when a request requests the descriptor.
lease.LeaseRenewalDuration.Override(ctx, &cs.SV, 0)

var targetJobID atomic.Int64
var numberOfTimesDetected atomic.Int64
const targetNumberOfRetries = 5
args := base.TestServerArgs{
Settings: cs,
// Leasing settings used above conflict with some updates
// when starting the server, so skip those.
PartOfCluster: true,
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DisableAutoCommitDuringExec: true,
AfterExecute: func(ctx context.Context, stmt string, err error) {
if targetJobID.Load() > 0 &&
strings.Contains(stmt, "SELECT count(*) FROM system.jobs") &&
strings.Contains(stmt, fmt.Sprintf("%d", targetJobID.Load())) {
// Leases expire almost instantly, without a renewal we will need
// a retry.
time.Sleep(time.Second)
// Detect this multiple times to ensure retries, once observed
// enough times disable the after execution.
if numberOfTimesDetected.Add(1) > targetNumberOfRetries-1 {
targetJobID.Store(0)
}
}
},
},
},
}

jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, cs *cluster.Settings) jobs.Resumer {
return jobstest.FakeResumer{}
}, jobs.UsesTenantCostControl)
s := serverutils.StartServerOnly(t, args)
defer s.Stopper().Stop(ctx)
ts := s.ApplicationLayer()
registry := ts.JobRegistry().(*jobs.Registry)

// Create and run a dummy job.
idb := ts.InternalDB().(isql.DB)
id := registry.MakeJobID()
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
_, err := registry.CreateJobWithTxn(ctx, jobs.Record{
// Job does not accept an empty Details field, so arbitrarily provide
// ImportDetails.
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
}, id, txn)
return err
}))
targetJobID.Store(int64(id))
require.NoError(t,
registry.WaitForJobs(
ctx, []jobspb.JobID{id},
))
if !skip.Stress() {
require.Equalf(t, int64(targetNumberOfRetries), numberOfTimesDetected.Load(), "jobs query did not retry")
} else {
// For stress be lenient since we are relying on timing for leasing
// expiration, which can be imprecise. So, lets aim for at least one
// retry.
require.GreaterOrEqualf(t, numberOfTimesDetected.Load(), int64(2), "jobs query did not retry")
}
}
8 changes: 8 additions & 0 deletions pkg/jobs/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
Expand Down Expand Up @@ -118,6 +119,13 @@ func (r *Registry) waitForJobsToBeTerminalOrPaused(
query,
)
if err != nil {
// While polling, we could encounter retryable errors, which may
// not get internally retried in the connection executor. So retry,
// here. When querying for jobs we will restart with a new transaction,
// which will hopefully not hit this retryable error.
if errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) {
continue
}
return errors.Wrap(err, "polling for queued jobs to complete")
}
if row == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@ func (ief *InternalDB) txn(
}

return commitTxnFn(ctx)
}); descs.IsTwoVersionInvariantViolationError(err) {
}); errIsRetriable(err) {
continue
} else {
if err == nil {
Expand Down

0 comments on commit 61f221a

Please sign in to comment.