Skip to content

Commit

Permalink
This is an automated cherry-pick of #56507
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
tangenta authored and ti-chi-bot committed Dec 3, 2024
1 parent bf0766b commit 8b7743d
Show file tree
Hide file tree
Showing 7 changed files with 1,076 additions and 9 deletions.
84 changes: 84 additions & 0 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,19 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
if err != nil {
return nil, errors.Trace(err)
}
<<<<<<< HEAD
if len(ranges) == 0 {
errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
}
=======
logutil.DDLLogger().Info("load table ranges from PD done",
zap.Int64("physicalTableID", t.GetPhysicalID()),
zap.String("range start", hex.EncodeToString(ranges[0].StartKey)),
zap.String("range end", hex.EncodeToString(ranges[len(ranges)-1].EndKey)),
zap.Int("range count", len(ranges)))
failpoint.InjectCall("afterLoadTableRanges", len(ranges))
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507))
return ranges, nil
}

Expand Down Expand Up @@ -669,15 +678,30 @@ func SetBackfillTaskChanSizeForTest(n int) {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
<<<<<<< HEAD
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

=======
func (dc *ddlCtx) writePhysicalTableRecord(
ctx context.Context,
sessPool *sess.Pool,
t table.PhysicalTable,
bfWorkerType backfillerType,
reorgInfo *reorgInfo,
) (err error) {
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507))
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
return errors.Trace(err)
}
defer func() {
if err != nil && ctx.Err() != nil {
err = context.Cause(ctx)
}
}()

failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
//nolint:forcetypeassert
Expand All @@ -702,11 +726,71 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
return errors.Trace(err)
}

<<<<<<< HEAD
taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
return errors.Trace(err)
=======
// process result goroutine
eg.Go(func() error {
totalAddedCount := reorgInfo.Job.GetRowCount()
keeper := newDoneTaskKeeper(startKey)
cnt := 0

for {
select {
case <-egCtx.Done():
return egCtx.Err()
case result, ok := <-scheduler.resultChan():
if !ok {
logutil.DDLLogger().Info("backfill workers successfully processed",
zap.Stringer("element", reorgInfo.currElement),
zap.Int64("total added count", totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)))
return nil
}
cnt++

if result.err != nil {
logutil.DDLLogger().Warn("backfill worker failed",
zap.Int64("job ID", reorgInfo.ID),
zap.Int64("total added count", totalAddedCount),
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("result next key", hex.EncodeToString(result.nextKey)),
zap.Error(result.err))
return result.err
}

if result.totalCount > 0 {
totalAddedCount = int64(result.totalCount)
} else {
totalAddedCount += int64(result.addedCount)
}
dc.getReorgCtx(reorgInfo.Job.ID).setRowCount(totalAddedCount)

keeper.updateNextKey(result.taskID, result.nextKey)

if cnt%(scheduler.currentWorkerSize()*4) == 0 {
err2 := reorgInfo.UpdateReorgMeta(keeper.nextKey, sessPool)
if err2 != nil {
logutil.DDLLogger().Warn("update reorg meta failed",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
// We try to adjust the worker size regularly to reduce
// the overhead of loading the DDL related global variables.
err2 = scheduler.adjustWorkerSize()
if err2 != nil {
logutil.DDLLogger().Warn("cannot adjust backfill worker size",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507))
}
if len(kvRanges) == 0 {
break
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ func newTaskIDAllocator() *taskIDAllocator {
}

func (a *taskIDAllocator) alloc() int {
ret := a.id
a.id++
return a.id
return ret
}
33 changes: 33 additions & 0 deletions pkg/ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,36 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {

tk.MustExec("drop table if exists t")
}

func TestModifyColumnReorgCheckpoint(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b bigint);")
rowCnt := 10
for i := 0; i < rowCnt; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000))
}
splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt)
tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1)))

retireOwner := false
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() {
if !retireOwner {
retireOwner = true
dom.DDL().OwnerManager().ResignOwner(context.Background())
}
})

rangeCnts := []int{}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) {
rangeCnts = append(rangeCnts, rangeCnt)
})

tk.MustExec("alter table t modify column b int;")
require.Len(t, rangeCnts, 2) // It should have two rounds for loading table ranges.
require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify if the checkpoint is progressing.
}
8 changes: 4 additions & 4 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type CheckpointManager struct {
// Live in memory.
mu sync.Mutex
checkpoints map[int]*taskCheckpoint // task ID -> checkpoint
// we require each task ID to be continuous and start from 1.
// we require each task ID to be continuous and start from 0.
minTaskIDFinished int
dirty bool
// Local meta.
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) {
}

// Register registers a new task. taskID MUST be continuous ascending and start
// from 1.
// from 0.
//
// TODO(lance6716): remove this constraint, use endKey as taskID and use
// ordered map type for checkpoints.
Expand Down Expand Up @@ -221,14 +221,14 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
// afterFlush should be called after all engine is flushed.
func (s *CheckpointManager) afterFlush() {
for {
cp := s.checkpoints[s.minTaskIDFinished+1]
cp := s.checkpoints[s.minTaskIDFinished]
if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
break
}
delete(s.checkpoints, s.minTaskIDFinished)
s.minTaskIDFinished++
s.flushedKeyLowWatermark = cp.endKey
s.flushedKeyCnt += cp.totalKeys
delete(s.checkpoints, s.minTaskIDFinished)
s.dirty = true
}
}
Expand Down
49 changes: 46 additions & 3 deletions pkg/ddl/ingest/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestCheckpointManager(t *testing.T) {
require.NoError(t, err)
defer mgr.Close()

mgr.Register(0, []byte{'0', '9'})
mgr.Register(1, []byte{'1', '9'})
<<<<<<< HEAD
mgr.Register(2, []byte{'2', '9'})
mgr.UpdateTotalKeys(1, 100, false)
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
Expand All @@ -73,20 +75,53 @@ func TestCheckpointManager(t *testing.T) {
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
=======
mgr.UpdateTotalKeys(0, 100, false)
require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
mgr.UpdateWrittenKeys(0, 100)
mgr.AdvanceWatermark(true, false)
require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
mgr.UpdateTotalKeys(0, 100, true)
mgr.UpdateWrittenKeys(0, 100)
mgr.AdvanceWatermark(true, false)
// The data is not imported to the storage yet.
require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
mgr.UpdateWrittenKeys(1, 0)
mgr.AdvanceWatermark(true, true) // Mock the data is imported to the storage.
require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'}))

// Only when the last batch is completed, the job can be completed.
mgr.UpdateTotalKeys(1, 50, false)
mgr.UpdateTotalKeys(1, 50, true)
mgr.UpdateWrittenKeys(1, 50)
mgr.AdvanceWatermark(true, true)
require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateWrittenKeys(1, 50)
mgr.AdvanceWatermark(true, true)
require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507))
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))

// Only when the subsequent job is completed, the previous job can be completed.
mgr.Register(2, []byte{'2', '9'})
mgr.Register(3, []byte{'3', '9'})
mgr.Register(4, []byte{'4', '9'})
mgr.Register(5, []byte{'5', '9'})
mgr.UpdateTotalKeys(2, 100, true)
mgr.UpdateTotalKeys(3, 100, true)
mgr.UpdateTotalKeys(4, 100, true)
<<<<<<< HEAD
mgr.UpdateTotalKeys(5, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(5, 100))
require.NoError(t, mgr.UpdateWrittenKeys(4, 100))
=======
mgr.UpdateWrittenKeys(4, 100)
mgr.AdvanceWatermark(true, true)
mgr.UpdateWrittenKeys(3, 100)
mgr.AdvanceWatermark(true, true)
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507))
require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'}))
}

func TestCheckpointManagerUpdateReorg(t *testing.T) {
Expand All @@ -107,10 +142,18 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) {
require.NoError(t, err)
defer mgr.Close()

<<<<<<< HEAD
mgr.Register(1, []byte{'1', '9'})
mgr.UpdateTotalKeys(1, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
mgr.Flush() // Wait the global checkpoint to be updated to the reorg table.
=======
mgr.Register(0, []byte{'1', '9'})
mgr.UpdateTotalKeys(0, 100, true)
mgr.UpdateWrittenKeys(0, 100)
mgr.AdvanceWatermark(true, true)
mgr.Close() // Wait the global checkpoint to be updated to the reorg table.
>>>>>>> 3c4b230ae89 (ddl: fix reorg handle not resumed after changing DDL owner (#56507))
r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;")
require.NoError(t, err)
req := r.NewChunk(nil)
Expand Down
Loading

0 comments on commit 8b7743d

Please sign in to comment.