Skip to content

Commit

Permalink
jobs: handle retryable errors waiting for jobs
Browse files Browse the repository at this point in the history
Previously, when waiting for jobs if the internal executor hit any
retryable errors, we would incorrectly bubble these errors up. This
would intermittently be seen in leasing tests, which intentionally
disable renewals, increasing the likelihood of transaction retry errors.
To address this, this patch will modify waitForJobs to handle retryable
errors gracefully.

Fixes: #112392,#110642

Release note: None
  • Loading branch information
fqazi committed Oct 18, 2023
1 parent 45bfb4b commit 605872b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ go_test(
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/isql",
"//pkg/sql/sem/tree",
Expand Down
82 changes: 82 additions & 0 deletions pkg/jobs/registry_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -718,3 +720,83 @@ SELECT unnest(execution_errors)
require.Regexp(t, err3, registry.WaitForJobs(ctx, []jobspb.JobID{id}))
})
}

// TestWaitWithRetryableError tests retryable errors when querying
// for jobs.
func TestWaitWithRetryableError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()
ctx := context.Background()

cs := cluster.MakeTestingClusterSettings()
// Set the lease duration to zero for instanty expiry.
lease.LeaseDuration.Override(ctx, &cs.SV, 0)
// Renewal timeout to 0 saying that the lease will get renewed only
// after the lease expires when a request requests the descriptor.
lease.LeaseRenewalDuration.Override(ctx, &cs.SV, 0)

var targetJobID atomic.Int64
var numberOfTimesDetected atomic.Int64
const targetNumberOfRetries = 5
args := base.TestServerArgs{
Settings: cs,
// Leasing settings used above conflict with some updates
// when starting the server, so skip those.
PartOfCluster: true,
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DisableAutoCommitDuringExec: true,
AfterExecute: func(ctx context.Context, stmt string, err error) {
if targetJobID.Load() > 0 &&
strings.Contains(stmt, "SELECT count(*) FROM system.jobs") &&
strings.Contains(stmt, fmt.Sprintf("%d", targetJobID.Load())) {
// Leases expire almost instantly, without a renewal we will need
// a retry.
time.Sleep(time.Second)
// Detect this multiple times to ensure retries, once observed
// enough times disable the after execution.
if numberOfTimesDetected.Add(1) > targetNumberOfRetries-1 {
targetJobID.Store(0)
}
}
},
},
},
}

jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, cs *cluster.Settings) jobs.Resumer {
return jobstest.FakeResumer{}
}, jobs.UsesTenantCostControl)
s := serverutils.StartServerOnly(t, args)
defer s.Stopper().Stop(ctx)
ts := s.ApplicationLayer()
registry := ts.JobRegistry().(*jobs.Registry)

// Create and run a dummy job.
idb := ts.InternalDB().(isql.DB)
id := registry.MakeJobID()
require.NoError(t, idb.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
_, err := registry.CreateJobWithTxn(ctx, jobs.Record{
// Job does not accept an empty Details field, so arbitrarily provide
// ImportDetails.
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
Username: username.TestUserName(),
}, id, txn)
return err
}))
targetJobID.Store(int64(id))
require.NoError(t,
registry.WaitForJobs(
ctx, []jobspb.JobID{id},
))
if !skip.Stress() {
require.Equalf(t, int64(targetNumberOfRetries), numberOfTimesDetected.Load(), "jobs query did not retry")
} else {
// For stress be lenient since we are relying on timing for leasing
// expiration, which can be imprecise. So, lets aim for at least one
// retry.
require.GreaterOrEqualf(t, numberOfTimesDetected.Load(), int64(2), "jobs query did not retry")
}
}
8 changes: 8 additions & 0 deletions pkg/jobs/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
Expand Down Expand Up @@ -118,6 +119,13 @@ func (r *Registry) waitForJobsToBeTerminalOrPaused(
query,
)
if err != nil {
// While polling, we could encounter retryable errors, which may
// not get internally retried in the connection executor. So retry,
// here. When querying for jobs we will restart with a new transaction,
// which will hopefully not hit this retryable error.
if errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) {
continue
}
return errors.Wrap(err, "polling for queued jobs to complete")
}
if row == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@ func (ief *InternalDB) txn(
}

return commitTxnFn(ctx)
}); descs.IsTwoVersionInvariantViolationError(err) {
}); errIsRetriable(err) {
continue
} else {
if err == nil {
Expand Down

0 comments on commit 605872b

Please sign in to comment.