Skip to content

Commit

Permalink
ddl: skip getting actual end key for each range in ingest mode (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Dec 6, 2024
1 parent 94eb874 commit bd8d83e
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 34 deletions.
120 changes: 87 additions & 33 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,38 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
}
}

func splitAndValidateTableRanges(
ctx context.Context,
t table.PhysicalTable,
store kv.Storage,
startKey, endKey kv.Key,
limit int,
) ([]kv.KeyRange, error) {
ranges, err := splitTableRanges(ctx, t, store, startKey, endKey, limit)
if err != nil {
return nil, err
}
return validateTableRanges(ranges, startKey, endKey)
}

func validateTableRanges(ranges []kv.KeyRange, start, end kv.Key) ([]kv.KeyRange, error) {
for i, r := range ranges {
if len(r.StartKey) == 0 {
if i != 0 {
return nil, errors.Errorf("get empty start key in the middle of ranges")
}
r.StartKey = start
}
if len(r.EndKey) == 0 {
if i != len(ranges)-1 {
return nil, errors.Errorf("get empty end key in the middle of ranges")
}
r.EndKey = end
}
}
return ranges, nil
}

// splitTableRanges uses PD region's key ranges to split the backfilling table key range space,
// to speed up backfilling data in table with disperse handle.
// The `t` should be a non-partitioned table or a partition.
Expand Down Expand Up @@ -434,40 +466,21 @@ func splitTableRanges(
return ranges, nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
taskIDAlloc *taskIDAllocator) []*reorgBackfillTask {
func getBatchTasks(
t table.Table,
reorgInfo *reorgInfo,
kvRanges []kv.KeyRange,
taskIDAlloc *taskIDAllocator,
bfWorkerTp backfillerType,
) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges))
var prefix kv.Key
if reorgInfo.mergingTmpIdx {
prefix = t.IndexPrefix()
} else {
prefix = t.RecordPrefix()
}
// Build reorg tasks.
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)
jobCtx := reorgInfo.NewJobContext()
for _, keyRange := range kvRanges {
for _, r := range kvRanges {
taskID := taskIDAlloc.alloc()
startKey := keyRange.StartKey
if len(startKey) == 0 {
startKey = prefix
}
endKey := keyRange.EndKey
if len(endKey) == 0 {
endKey = prefix.PrefixNext()
}
endK, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, prefix, startKey, endKey)
if err != nil {
logutil.BgLogger().Info("get backfill range task, get reverse key failed", zap.String("category", "ddl"), zap.Error(err))
} else {
logutil.BgLogger().Info("get backfill range task, change end key", zap.String("category", "ddl"),
zap.Int("id", taskID), zap.Int64("pTbl", phyTbl.GetPhysicalID()),
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}

startKey := r.StartKey
endKey := r.EndKey
endKey = getActualEndKey(t, reorgInfo, bfWorkerTp, startKey, endKey, taskID)
task := &reorgBackfillTask{
id: taskID,
jobID: reorgInfo.Job.ID,
Expand All @@ -481,15 +494,56 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
return batchTasks
}

func getActualEndKey(
t table.Table,
reorgInfo *reorgInfo,
bfTp backfillerType,
rangeStart, rangeEnd kv.Key,
taskID int,
) kv.Key {
job := reorgInfo.Job
//nolint:forcetypeassert
phyTbl := t.(table.PhysicalTable)

if bfTp == typeAddIndexMergeTmpWorker {
// Temp Index data does not grow infinitely, we can return the whole range
// and IndexMergeTmpWorker should still be finished in a bounded time.
return rangeEnd
}
if bfTp == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
// Ingest worker uses coprocessor to read table data. It is fast enough,
// we don't need to get the actual end key of this range.
return rangeEnd
}

// Otherwise to avoid the future data written to key range of [backfillChunkEndKey, rangeEnd) and
// backfill worker can't catch up, we shrink the end key to the actual written key for now.
jobCtx := reorgInfo.NewJobContext()

actualEndKey, err := GetRangeEndKey(jobCtx, reorgInfo.d.store, job.Priority, t.RecordPrefix(), rangeStart, rangeEnd)
if err != nil {
logutil.BgLogger().Info("get backfill range task, get reverse key failed", zap.String("category", "ddl"), zap.Error(err))
return rangeEnd
}
logutil.BgLogger().Info("get backfill range task, change end key",
zap.String("category", "ddl"),
zap.Int("id", taskID),
zap.Int64("pTbl", phyTbl.GetPhysicalID()),
zap.String("end key", hex.EncodeToString(rangeEnd)),
zap.String("current end key", hex.EncodeToString(actualEndKey)))
return actualEndKey
}

// sendTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func sendTasks(
scheduler backfillScheduler,
t table.PhysicalTable,
kvRanges []kv.KeyRange,
reorgInfo *reorgInfo,
taskIDAlloc *taskIDAllocator,
bfWorkerTp backfillerType,
) error {
batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc)
batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc, bfWorkerTp)
for _, task := range batchTasks {
if err := scheduler.sendTask(task); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -672,7 +726,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
start, end := startKey, endKey
taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err2 := splitTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize)
kvRanges, err2 := splitAndValidateTableRanges(egCtx, t, reorgInfo.d.store, start, end, backfillTaskChanSize)
if err2 != nil {
return errors.Trace(err2)
}
Expand All @@ -686,7 +740,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
zap.String("startKey", hex.EncodeToString(start)),
zap.String("endKey", hex.EncodeToString(end)))

err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc)
err2 = sendTasks(scheduler, t, kvRanges, reorgInfo, taskIDAlloc, bfWorkerType)
if err2 != nil {
return errors.Trace(err2)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (src *TableScanTaskSource) generateTasks() error {
startKey := src.startKey
endKey := src.endKey
for {
kvRanges, err := splitTableRanges(
kvRanges, err := splitAndValidateTableRanges(
src.ctx,
src.tbl,
src.store,
Expand Down

0 comments on commit bd8d83e

Please sign in to comment.