Skip to content

Commit

Permalink
jobs: use low-priority transactions for claim, adopt, cancel/pause
Browse files Browse the repository at this point in the history
These transactions can be slow and long-running and they hold locks. This is
unfortunate for UX reasons.

Release note: (bug fix): Improved availability of jobs table for reads in large,
global clusters by running background tasks at low priority.
  • Loading branch information
ajwerner committed May 24, 2021
1 parent 3eeb35f commit aed014f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
11 changes: 11 additions & 0 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -47,6 +48,11 @@ const (
// available.
func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Run the claim transaction at low priority to ensure that it does not
// contend with foreground reads.
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, `
UPDATE system.jobs
Expand Down Expand Up @@ -283,6 +289,11 @@ func (r *Registry) runJob(

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Run the claim transaction at low priority to ensure that it does not
// contend with foreground reads.
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
// Note that we have to buffer all rows first - before processing each
// job - because we have to make sure that the query executes without an
// error (otherwise, the system.jobs table might diverge from the jobs
Expand Down
20 changes: 15 additions & 5 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -659,16 +660,25 @@ func (r *Registry) Start(
}

removeClaimsFromDeadSessions := func(ctx context.Context, s sqlliveness.Session) {
if _, err := r.ex.QueryRowEx(
ctx, "expire-sessions", nil,
sessiondata.InternalExecutorOverride{User: security.RootUserName()}, `
if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Run the expiration transaction at low priority to ensure that it does
// not contend with foreground reads. Note that the adoption and cancellation
// queries also use low priority so they will interact nicely.
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
_, err := r.ex.ExecEx(
ctx, "expire-sessions", nil,
sessiondata.InternalExecutorOverride{User: security.RootUserName()}, `
UPDATE system.jobs
SET claim_session_id = NULL
WHERE claim_session_id <> $1
AND status IN `+claimableStatusTupleString+`
AND NOT crdb_internal.sql_liveness_is_alive(claim_session_id)`,
s.ID().UnsafeBytes(),
); err != nil {
s.ID().UnsafeBytes(),
)
return err
}); err != nil {
log.Errorf(ctx, "error expiring job sessions: %s", err)
}
}
Expand Down

0 comments on commit aed014f

Please sign in to comment.