diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index e063fa9d2676..501935e287c2 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -935,14 +935,15 @@ func getJobIDForMutationWithDescriptor( "job not found for table id %d, mutation %d", tableDesc.GetID(), mutationID) } -// nRanges returns the number of ranges that cover a set of spans. +// numRangesInSpans returns the number of ranges that cover a set of spans. // // It operates entirely on the current goroutine and is thus able to // reuse an existing kv.Txn safely. -func (sc *SchemaChanger) nRanges( - ctx context.Context, txn *kv.Txn, spans []roachpb.Span, +func numRangesInSpans( + ctx context.Context, db *kv.DB, distSQLPlanner *DistSQLPlanner, spans []roachpb.Span, ) (int, error) { - spanResolver := sc.distSQLPlanner.spanResolver.NewSpanResolverIterator(txn) + txn := db.NewTxn(ctx, "num-ranges-in-spans") + spanResolver := distSQLPlanner.spanResolver.NewSpanResolverIterator(txn) rangeIds := make(map[int64]struct{}) for _, span := range spans { // For each span, iterate the spanResolver until it's exhausted, storing @@ -1135,6 +1136,17 @@ func (sc *SchemaChanger) distIndexBackfill( ) defer recv.Release() + getTodoSpansForUpdate := func() []roachpb.Span { + mu.Lock() + defer mu.Unlock() + if mu.updatedTodoSpans == nil { + return nil + } + return append( + make([]roachpb.Span, 0, len(mu.updatedTodoSpans)), + mu.updatedTodoSpans..., + ) + } updateJobProgress = func() error { // Report schema change progress. We define progress at this point as the // the fraction of fully-backfilled ranges of the primary index of the @@ -1143,22 +1155,19 @@ func (sc *SchemaChanger) distIndexBackfill( // change state machine or from a previous backfill attempt, we scale that // fraction of ranges completed by the remaining fraction of the job's // progress bar. - err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - mu.Lock() + updatedTodoSpans := getTodoSpansForUpdate() + if updatedTodoSpans == nil { + return nil + } + nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, mu.updatedTodoSpans) + if err != nil { + return err + } + if origNRanges == -1 { + origNRanges = nRanges + } + return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // No processor has returned completed spans yet. - if mu.updatedTodoSpans == nil { - mu.Unlock() - return nil - } - nRanges, err := sc.nRanges(ctx, txn, mu.updatedTodoSpans) - mu.Unlock() - if err != nil { - return err - } - if origNRanges == -1 { - origNRanges = nRanges - } - if nRanges < origNRanges { fractionRangesFinished := float32(origNRanges-nRanges) / float32(origNRanges) fractionCompleted := origFractionCompleted + fractionLeft*fractionRangesFinished @@ -1169,21 +1178,18 @@ func (sc *SchemaChanger) distIndexBackfill( } return nil }) - return err } updateJobDetails = func() error { - err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - mu.Lock() - defer mu.Unlock() + updatedTodoSpans := getTodoSpansForUpdate() + return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // No processor has returned completed spans yet. - if mu.updatedTodoSpans == nil { + if updatedTodoSpans == nil { return nil } - log.VEventf(ctx, 2, "writing todo spans to job details: %+v", mu.updatedTodoSpans) - return rowexec.SetResumeSpansInJob(ctx, mu.updatedTodoSpans, mutationIdx, txn, sc.job) + log.VEventf(ctx, 2, "writing todo spans to job details: %+v", updatedTodoSpans) + return rowexec.SetResumeSpansInJob(ctx, updatedTodoSpans, mutationIdx, txn, sc.job) }) - return err } // Setup periodic progress update. @@ -1310,7 +1316,7 @@ func (sc *SchemaChanger) distColumnBackfill( // schema change state machine or from a previous backfill attempt, // we scale that fraction of ranges completed by the remaining fraction // of the job's progress bar. - nRanges, err := sc.nRanges(ctx, txn, todoSpans) + nRanges, err := numRangesInSpans(ctx, sc.db, sc.distSQLPlanner, todoSpans) if err != nil { return err }