Commit

This is an automated cherry-pick of #56507
Signed-off-by: ti-chi-bot <[email protected]>
tangenta authored and ti-chi-bot committed Oct 31, 2024
1 parent c5dc4ba commit b815156
Showing 7 changed files with 3,119 additions and 0 deletions.
84 changes: 84 additions & 0 deletions ddl/backfilling.go
@@ -418,10 +418,19 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
if err != nil {
return nil, errors.Trace(err)
}
<<<<<<< HEAD:ddl/backfilling.go
if len(ranges) == 0 {
errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
}
=======
logutil.DDLLogger().Info("load table ranges from PD done",
zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("range start", hex.EncodeToString(ranges[0].StartKey)),
zap.String("range end", hex.EncodeToString(ranges[len(ranges)-1].EndKey)),
zap.Int("range count", len(ranges)))
failpoint.InjectCall("afterLoadTableRanges", len(ranges))
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507)):pkg/ddl/backfilling.go
return ranges, nil
}

@@ -681,15 +690,30 @@ func SetBackfillTaskChanSizeForTest(n int) {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
<<<<<<< HEAD:ddl/backfilling.go
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

=======
func (dc *ddlCtx) writePhysicalTableRecord(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
bfWorkerType backfillerType,
reorgInfo *reorgInfo,
) (err error) {
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507)):pkg/ddl/backfilling.go
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
return errors.Trace(err)
}
defer func() {
if err != nil && ctx.Err() != nil {
err = context.Cause(ctx)
}
}()

failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
//nolint:forcetypeassert
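
The named err return together with the deferred block in this hunk is a standard Go idiom: when a worker's failure is only a symptom of context cancellation, report the cancellation's root cause (via context.Cause) rather than the incidental error the worker happened to hit. Below is a minimal standalone sketch of that pattern; the function names and the cancellation cause string are illustrative, not taken from TiDB.

package main

import (
	"context"
	"errors"
	"fmt"
)

// doWork mimics the deferred error mapping added in this hunk: if the work
// failed while its context was already cancelled, surface the cancellation
// cause instead of the transient error returned by the interrupted worker.
func doWork(ctx context.Context) (err error) {
	defer func() {
		if err != nil && ctx.Err() != nil {
			err = context.Cause(ctx)
		}
	}()
	<-ctx.Done() // simulate a worker interrupted mid-task
	return errors.New("worker aborted mid-task")
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("reorg cancelled: ownership changed")) // hypothetical root cause
	fmt.Println(doWork(ctx))                                 // prints the cause, not the worker error
}
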
@@ -714,11 +738,71 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
return errors.Trace(err)
}

<<<<<<< HEAD:ddl/backfilling.go
taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
return errors.Trace(err)
=======
// process result goroutine
eg.Go(func() error {
totalAddedCount := reorgInfo.Job.GetRowCount()
keeper := newDoneTaskKeeper(startKey)
cnt := 0

for {
select {
case <-egCtx.Done():
return egCtx.Err()
case result, ok := <-scheduler.resultChan():
if !ok {
logutil.DDLLogger().Info("backfill workers successfully processed",
zap.Stringer("element", reorgInfo.currElement),
zap.Int64("total added count", totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)))
return nil
}
cnt++

if result.err != nil {
logutil.DDLLogger().Warn("backfill worker failed",
zap.Int64("job ID", reorgInfo.ID),
zap.Int64("total added count", totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("result next key", hex.EncodeToString(result.nextKey)),
zap.Error(result.err))
return result.err
}

if result.totalCount > 0 {
totalAddedCount = int64(result.totalCount)
} else {
totalAddedCount += int64(result.addedCount)
}
dc.getReorgCtx(reorgInfo.Job.ID).setRowCount(totalAddedCount)

keeper.updateNextKey(result.taskID, result.nextKey)

if cnt%(scheduler.currentWorkerSize()*4) == 0 {
err2 := reorgInfo.UpdateReorgMeta(keeper.nextKey, sessPool)
if err2 != nil {
logutil.DDLLogger().Warn("update reorg meta failed",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
// We try to adjust the worker size regularly to reduce
// the overhead of loading the DDL related global variables.
err2 = scheduler.adjustWorkerSize()
if err2 != nil {
logutil.DDLLogger().Warn("cannot adjust backfill worker size",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507)):pkg/ddl/backfilling.go
}
if len(kvRanges) == 0 {
break
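
The heart of the fix is the result-processing goroutine above: task results may arrive out of order, so a keeper only advances the resumable key across a contiguous prefix of finished tasks, and that key is flushed to the reorg meta every few batches (reorgInfo.UpdateReorgMeta) so a newly elected DDL owner resumes from it instead of restarting the backfill. The sketch below is a simplified, self-contained illustration of that bookkeeping; doneKeeper and its fields are stand-ins, not TiDB's actual doneTaskKeeper implementation.

package main

import "fmt"

// taskResult mirrors the fields the result goroutine consumes in the diff:
// a monotonically increasing task ID and the key just past the task's range.
type taskResult struct {
	taskID  int
	nextKey string
}

// doneKeeper is a simplified stand-in for the keeper in the diff: results can
// arrive out of order, but nextKey only advances across a contiguous prefix
// of finished tasks, so persisting it is always a safe point to resume from.
type doneKeeper struct {
	nextKey    string
	doneTaskID int
	pending    map[int]string
}

func newDoneKeeper(start string) *doneKeeper {
	return &doneKeeper{nextKey: start, pending: map[int]string{}}
}

func (k *doneKeeper) updateNextKey(taskID int, nextKey string) {
	if taskID != k.doneTaskID+1 {
		k.pending[taskID] = nextKey // arrived out of order, park it
		return
	}
	k.doneTaskID, k.nextKey = taskID, nextKey
	// Drain any parked results that are now contiguous.
	for {
		nk, ok := k.pending[k.doneTaskID+1]
		if !ok {
			break
		}
		delete(k.pending, k.doneTaskID+1)
		k.doneTaskID, k.nextKey = k.doneTaskID+1, nk
	}
}

func main() {
	k := newDoneKeeper("k0")
	for i, r := range []taskResult{{2, "k2"}, {1, "k1"}, {3, "k3"}} {
		k.updateNextKey(r.taskID, r.nextKey)
		// In the diff this persistence happens every currentWorkerSize()*4
		// results, via reorgInfo.UpdateReorgMeta(keeper.nextKey, sessPool).
		if (i+1)%2 == 0 {
			fmt.Println("checkpoint:", k.nextKey)
		}
	}
	fmt.Println("final resumable key:", k.nextKey)
}
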
