diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index 7bad21458d0d1..bcb80bea944de 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -510,6 +510,7 @@ func loadTableRanges( zap.String("range start", hex.EncodeToString(ranges[0].StartKey)), zap.String("range end", hex.EncodeToString(ranges[len(ranges)-1].EndKey)), zap.Int("range count", len(ranges))) + failpoint.InjectCall("afterLoadTableRanges", len(ranges)) return ranges, nil } @@ -845,12 +846,17 @@ func (dc *ddlCtx) writePhysicalTableRecord( t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo, -) error { +) (err error) { startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil { return errors.Trace(err) } + defer func() { + if err != nil && ctx.Err() != nil { + err = context.Cause(ctx) + } + }() failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) { //nolint:forcetypeassert @@ -931,6 +937,7 @@ func (dc *ddlCtx) writePhysicalTableRecord( zap.Int64("job ID", reorgInfo.ID), zap.Error(err2)) } + failpoint.InjectCall("afterUpdateReorgMeta") } } } diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 45f5ba1f99151..ddf78334e2200 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -375,6 +375,7 @@ func newTaskIDAllocator() *taskIDAllocator { } func (a *taskIDAllocator) alloc() int { + ret := a.id a.id++ - return a.id + return ret } diff --git a/pkg/ddl/column_modify_test.go b/pkg/ddl/column_modify_test.go index db172fc47d15f..dc1924d0744a3 100644 --- a/pkg/ddl/column_modify_test.go +++ b/pkg/ddl/column_modify_test.go @@ -635,3 +635,36 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) { tk.MustExec("drop table if exists t") } + +func TestModifyColumnReorgCheckpoint(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("create table t (a int primary key, b bigint);") + rowCnt := 10 + for i := 0; i < rowCnt; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000)) + } + splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt) + tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1))) + + retireOwner := false + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() { + if !retireOwner { + retireOwner = true + dom.DDL().OwnerManager().ResignOwner(context.Background()) + } + }) + + rangeCnts := []int{} + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) { + rangeCnts = append(rangeCnts, rangeCnt) + }) + + tk.MustExec("alter table t modify column b int;") + require.Len(t, rangeCnts, 2) // It should have two rounds for loading table ranges. + require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify if the checkpoint is progressing. +} diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 9faf8bb1d8e7b..0d89a859ca3a7 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -59,7 +59,7 @@ type CheckpointManager struct { // Live in memory. mu sync.Mutex checkpoints map[int]*taskCheckpoint // task ID -> checkpoint - // we require each task ID to be continuous and start from 1. + // we require each task ID to be continuous and start from 0. minTaskIDFinished int dirty bool @@ -184,7 +184,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) { } // Register registers a new task. taskID MUST be continuous ascending and start -// from 1. +// from 0. // // TODO(lance6716): remove this constraint, use endKey as taskID and use // ordered map type for checkpoints. @@ -242,14 +242,14 @@ func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool) { // afterFlush should be called after all engine is flushed. func (s *CheckpointManager) afterFlush() { for { - cp := s.checkpoints[s.minTaskIDFinished+1] + cp := s.checkpoints[s.minTaskIDFinished] if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys { break } + delete(s.checkpoints, s.minTaskIDFinished) s.minTaskIDFinished++ s.flushedKeyLowWatermark = cp.endKey s.flushedKeyCnt += cp.totalKeys - delete(s.checkpoints, s.minTaskIDFinished) s.dirty = true } } diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go index cec86f301feff..7a39d3e319246 100644 --- a/pkg/ddl/ingest/checkpoint_test.go +++ b/pkg/ddl/ingest/checkpoint_test.go @@ -67,47 +67,47 @@ func TestCheckpointManager(t *testing.T) { require.NoError(t, err) defer mgr.Close() + mgr.Register(0, []byte{'0', '9'}) mgr.Register(1, []byte{'1', '9'}) - mgr.Register(2, []byte{'2', '9'}) - mgr.UpdateTotalKeys(1, 100, false) - require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - mgr.UpdateWrittenKeys(1, 100) + mgr.UpdateTotalKeys(0, 100, false) + require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'})) + mgr.UpdateWrittenKeys(0, 100) mgr.AdvanceWatermark(true, false) - require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - mgr.UpdateTotalKeys(1, 100, true) - mgr.UpdateWrittenKeys(1, 100) + require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'})) + mgr.UpdateTotalKeys(0, 100, true) + mgr.UpdateWrittenKeys(0, 100) mgr.AdvanceWatermark(true, false) // The data is not imported to the storage yet. - require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - mgr.UpdateWrittenKeys(2, 0) + require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'})) + mgr.UpdateWrittenKeys(1, 0) mgr.AdvanceWatermark(true, true) // Mock the data is imported to the storage. - require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'})) // Only when the last batch is completed, the job can be completed. - mgr.UpdateTotalKeys(2, 50, false) - mgr.UpdateTotalKeys(2, 50, true) - mgr.UpdateWrittenKeys(2, 50) + mgr.UpdateTotalKeys(1, 50, false) + mgr.UpdateTotalKeys(1, 50, true) + mgr.UpdateWrittenKeys(1, 50) mgr.AdvanceWatermark(true, true) - require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) - mgr.UpdateWrittenKeys(2, 50) + require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'})) + require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) + mgr.UpdateWrittenKeys(1, 50) mgr.AdvanceWatermark(true, true) + require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'})) require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'})) // Only when the subsequent job is completed, the previous job can be completed. + mgr.Register(2, []byte{'2', '9'}) mgr.Register(3, []byte{'3', '9'}) mgr.Register(4, []byte{'4', '9'}) - mgr.Register(5, []byte{'5', '9'}) + mgr.UpdateTotalKeys(2, 100, true) mgr.UpdateTotalKeys(3, 100, true) mgr.UpdateTotalKeys(4, 100, true) - mgr.UpdateTotalKeys(5, 100, true) - mgr.UpdateWrittenKeys(5, 100) - mgr.AdvanceWatermark(true, true) mgr.UpdateWrittenKeys(4, 100) mgr.AdvanceWatermark(true, true) + mgr.UpdateWrittenKeys(3, 100) + mgr.AdvanceWatermark(true, true) + require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'})) - require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'})) } func TestCheckpointManagerUpdateReorg(t *testing.T) { @@ -126,9 +126,9 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) { mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, []int64{1}, tmpFolder, mockGetTSClient{}) require.NoError(t, err) - mgr.Register(1, []byte{'1', '9'}) - mgr.UpdateTotalKeys(1, 100, true) - mgr.UpdateWrittenKeys(1, 100) + mgr.Register(0, []byte{'1', '9'}) + mgr.UpdateTotalKeys(0, 100, true) + mgr.UpdateWrittenKeys(0, 100) mgr.AdvanceWatermark(true, true) mgr.Close() // Wait the global checkpoint to be updated to the reorg table. r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;") diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go index 53306abf9dfc5..b884cc2263f1d 100644 --- a/pkg/ddl/job_scheduler.go +++ b/pkg/ddl/job_scheduler.go @@ -98,7 +98,7 @@ type ownerListener struct { var _ owner.Listener = (*ownerListener)(nil) func (l *ownerListener) OnBecomeOwner() { - ctx, cancelFunc := context.WithCancel(l.ddl.ddlCtx.ctx) + ctx, cancelFunc := context.WithCancelCause(l.ddl.ddlCtx.ctx) sysTblMgr := systable.NewManager(l.ddl.sessPool) l.scheduler = &jobScheduler{ schCtx: ctx, @@ -133,7 +133,7 @@ func (l *ownerListener) OnRetireOwner() { type jobScheduler struct { // *ddlCtx already have context named as "ctx", so we use "schCtx" here to avoid confusion. schCtx context.Context - cancel context.CancelFunc + cancel context.CancelCauseFunc wg tidbutil.WaitGroupWrapper runningJobs *runningJobs sysTblMgr systable.Manager @@ -185,7 +185,7 @@ func (s *jobScheduler) start() { } func (s *jobScheduler) close() { - s.cancel() + s.cancel(dbterror.ErrNotOwner) s.wg.Wait() if s.reorgWorkerPool != nil { s.reorgWorkerPool.close() diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 8c3b22322b2fe..5fac1ec6063ae 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -74,7 +74,7 @@ func TestBackfillOperators(t *testing.T) { tasks := sink.Collect() require.Len(t, tasks, 10) - require.Equal(t, 1, tasks[0].ID) + require.Equal(t, 0, tasks[0].ID) require.Equal(t, startKey, tasks[0].Start) require.Equal(t, endKey, tasks[9].End)