Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: fix deadlock when updating backfill progress #69040

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,14 +935,15 @@ func getJobIDForMutationWithDescriptor(
"job not found for table id %d, mutation %d", tableDesc.GetID(), mutationID)
}

// nRanges returns the number of ranges that cover a set of spans.
// numRangesInSpans returns the number of ranges that cover a set of spans.
//
// It operates entirely on the current goroutine and is thus able to
// reuse an existing kv.Txn safely.
func (sc *SchemaChanger) nRanges(
ctx context.Context, txn *kv.Txn, spans []roachpb.Span,
func numRangesInSpans(
ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span,
) (int, error) {
spanResolver := sc.distSQLPlanner.spanResolver.NewSpanResolverIterator(txn)
txn := db.NewTxn(ctx, "num-ranges-in-spans")
spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn)
rangeIds := make(map[int64]struct{})
for _, span := range spans {
// For each span, iterate the spanResolver until it's exhausted, storing
Expand Down Expand Up @@ -1135,6 +1136,17 @@ func (sc *SchemaChanger) distIndexBackfill(
)
defer recv.Release()

getTodoSpansForUpdate := func() []roachpb.Span {
mu.Lock()
defer mu.Unlock()
if mu.updatedTodoSpans == nil {
return nil
}
return append(
make([]roachpb.Span, 0, len(mu.updatedTodoSpans)),
mu.updatedTodoSpans...,
)
}
updateJobProgress = func() error {
// Report schema change progress. We define progress at this point as the
// the fraction of fully-backfilled ranges of the primary index of the
Expand All @@ -1143,22 +1155,19 @@ func (sc *SchemaChanger) distIndexBackfill(
// change state machine or from a previous backfill attempt, we scale that
// fraction of ranges completed by the remaining fraction of the job's
// progress bar.
err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
mu.Lock()
updatedTodoSpans := getTodoSpansForUpdate()
if updatedTodoSpans == nil {
return nil
}
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, mu.updatedTodoSpans)
if err != nil {
return err
}
if origNRanges == -1 {
origNRanges = nRanges
}
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// No processor has returned completed spans yet.
if mu.updatedTodoSpans == nil {
mu.Unlock()
return nil
}
nRanges, err := sc.nRanges(ctx, txn, mu.updatedTodoSpans)
mu.Unlock()
if err != nil {
return err
}
if origNRanges == -1 {
origNRanges = nRanges
}

if nRanges < origNRanges {
fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges)
fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished
Expand All @@ -1169,21 +1178,18 @@ func (sc *SchemaChanger) distIndexBackfill(
}
return nil
})
return err
}

updateJobDetails = func() error {
err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
mu.Lock()
defer mu.Unlock()
updatedTodoSpans := getTodoSpansForUpdate()
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// No processor has returned completed spans yet.
if mu.updatedTodoSpans == nil {
if updatedTodoSpans == nil {
return nil
}
log.VEventf(ctx, 2, "writing todo spans to job details: %+v", mu.updatedTodoSpans)
return rowexec.SetResumeSpansInJob(ctx, mu.updatedTodoSpans, mutationIdx, txn, sc.job)
log.VEventf(ctx, 2, "writing todo spans to job details: %+v", updatedTodoSpans)
return rowexec.SetResumeSpansInJob(ctx, updatedTodoSpans, mutationIdx, txn, sc.job)
})
return err
}

// Setup periodic progress update.
Expand Down Expand Up @@ -1310,7 +1316,7 @@ func (sc *SchemaChanger) distColumnBackfill(
// schema change state machine or from a previous backfill attempt,
// we scale that fraction of ranges completed by the remaining fraction
// of the job's progress bar.
nRanges, err := sc.nRanges(ctx, txn, todoSpans)
nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans)
if err != nil {
return err
}
Expand Down