Skip to content

Commit

Permalink
sql: fix deadlock when updating backfill progress
Browse files Browse the repository at this point in the history
The root cause here is that we acquired the mutex inside the transaction which
also laid down intents. This was not a problem in earlier iterations of this
code because of the FOR UPDATE logic which would, generally, in theory, order
the transactions such that the first one to acquire the mutex would be the
first to lay down an intent, thus avoiding the deadlock by ordering the
acquisitions. That was changed in #68244, which removed the FOR UPDATE.

What we see now is that you have a transaction doing the progress update which
hits a restart but has laid down an intent. Then we have a transaction which
is doing a details update that starts and acquires the mutex but blocks on the
intent of the other transaction. That other transaction now is blocked on the
mutex and we have a deadlock.

The solution here is to not acquire the mutex inside these transactions.
Instead, the code copies out the relevant state prior to issuing the
transaction. The cost here should be pretty minimal and the staleness in
the fact of retries is the least of my concerns.

No release note because the code in #68244 has never been released.

Release note: None
  • Loading branch information
ajwerner committed Aug 18, 2021
1 parent d13d9cf commit ec29064
Showing 1 changed file with 34 additions and 28 deletions.
62 changes: 34 additions & 28 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,14 +935,15 @@ func getJobIDForMutationWithDescriptor(
"job not found for table id %d, mutation %d", tableDesc.GetID(), mutationID)
}

// nRanges returns the number of ranges that cover a set of spans.
// numRangesInSpans returns the number of ranges that cover a set of spans.
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func (sc *SchemaChanger) nRanges(
ctx context.Context, txn *kv.Txn, spans []roachpb.Span,
func numRangesInSpans(
ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span,
) (int, error) {
spanResolver := sc.distSQLPlanner.spanResolver.NewSpanResolverIterator(txn)
txn := db.NewTxn(ctx, "num-ranges-in-spans")
spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn)
rangeIds := make(map[int64]struct{})
for _, span := range spans {
// For each span, iterate the spanResolver until it's exhausted, storing
Expand Down Expand Up @@ -1135,6 +1136,17 @@ func (sc *SchemaChanger) distIndexBackfill(
)
defer recv.Release()

getTodoSpansForUpdate := func() []roachpb.Span {
mu.Lock()
defer mu.Unlock()
if mu.updatedTodoSpans == nil {
return nil
}
return append(
make([]roachpb.Span, 0, len(mu.updatedTodoSpans)),
mu.updatedTodoSpans...,
)
}
updateJobProgress = func() error {
// Report schema change progress. We define progress at this point as the
// the fraction of fully-backfilled ranges of the primary index of the
Expand All @@ -1143,22 +1155,19 @@ func (sc *SchemaChanger) distIndexBackfill(
// change state machine or from a previous backfill attempt, we scale that
// fraction of ranges completed by the remaining fraction of the job's
// progress bar.
err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
mu.Lock()
updatedTodoSpans := getTodoSpansForUpdate()
if updatedTodoSpans == nil {
return nil
}
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, mu.updatedTodoSpans)
if err != nil {
return err
}
if origNRanges == -1 {
origNRanges = nRanges
}
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// No processor has returned completed spans yet.
if mu.updatedTodoSpans == nil {
mu.Unlock()
return nil
}
nRanges, err := sc.nRanges(ctx, txn, mu.updatedTodoSpans)
mu.Unlock()
if err != nil {
return err
}
if origNRanges == -1 {
origNRanges = nRanges
}

if nRanges < origNRanges {
fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
Expand All @@ -1169,21 +1178,18 @@ func (sc *SchemaChanger) distIndexBackfill(
}
return nil
})
return err
}

updateJobDetails = func() error {
err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
mu.Lock()
defer mu.Unlock()
updatedTodoSpans := getTodoSpansForUpdate()
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// No processor has returned completed spans yet.
if mu.updatedTodoSpans == nil {
if updatedTodoSpans == nil {
return nil
}
log.VEventf(ctx, 2, "writing todo spans to job details: %+v", mu.updatedTodoSpans)
return rowexec.SetResumeSpansInJob(ctx, mu.updatedTodoSpans, mutationIdx, txn, sc.job)
log.VEventf(ctx, 2, "writing todo spans to job details: %+v", updatedTodoSpans)
return rowexec.SetResumeSpansInJob(ctx, updatedTodoSpans, mutationIdx, txn, sc.job)
})
return err
}

// Setup periodic progress update.
Expand Down Expand Up @@ -1310,7 +1316,7 @@ func (sc *SchemaChanger) distColumnBackfill(
// schema change state machine or from a previous backfill attempt,
// we scale that fraction of ranges completed by the remaining fraction
// of the job's progress bar.
nRanges, err := sc.nRanges(ctx, txn, todoSpans)
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
if err != nil {
return err
}
Expand Down

0 comments on commit ec29064

Please sign in to comment.