From f0ac43e570c546d106d542e9907e32f6f5c1bfb5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 25 Nov 2024 14:10:08 +0800 Subject: [PATCH] ddl: fix reorg handle not resumed after changing DDL owner --- ddl/backfilling.go | 4 +++- ddl/backfilling_scheduler.go | 3 ++- ddl/column_modify_test.go | 42 +++++++++++++++++++++++++++++++++++ ddl/ingest/checkpoint.go | 8 +++---- ddl/ingest/checkpoint_test.go | 38 +++++++++++++++---------------- 5 files changed, 70 insertions(+), 25 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 61edd6857a6c1..2a75a6a670209 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -422,6 +422,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String()) return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg)) } + failpoint.InjectCall("afterLoadTableRanges", len(ranges)) return ranges, nil } @@ -534,6 +535,7 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum logutil.BgLogger().Warn("[ddl] update reorg meta failed", zap.Int64("job ID", reorgInfo.ID), zap.Error(err)) } + failpoint.InjectCall("afterUpdateReorgMeta") } // We try to adjust the worker size regularly to reduce // the overhead of loading the DDL related global variables. @@ -681,7 +683,7 @@ func SetBackfillTaskChanSizeForTest(n int) { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. -func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { +func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) (err error) { job := reorgInfo.Job totalAddedCount := job.GetRowCount() diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index ff60137a258aa..24f62727542e2 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -506,6 +506,7 @@ func newTaskIDAllocator() *taskIDAllocator { } func (a *taskIDAllocator) alloc() int { + ret := a.id a.id++ - return a.id + return ret } diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index d02db5986273c..18b6ee6ed41b1 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" testddlutil "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/ddl/util/callback" "github.com/pingcap/tidb/domain" @@ -1029,3 +1030,44 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) { tk.MustExec("drop table if exists t") } + +func TestModifyColumnReorgCheckpoint(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("create table t (a int primary key, b bigint);") + rowCnt := 10 + for i := 0; i < rowCnt; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000)) + } + splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt) + tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1))) + + retireOwner := false + err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() { + if !retireOwner { + retireOwner = true + dom.DDL().OwnerManager().ResignOwner(context.Background()) + } + }) + require.NoError(t, err) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta")) + }() + + rangeCnts := []int{} + err = failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) { + rangeCnts = append(rangeCnts, rangeCnt) + }) + require.NoError(t, err) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges")) + }() + + tk.MustExec("alter table t modify column b int;") + require.Len(t, rangeCnts, 2) // It should have two rounds for loading table ranges. + require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify if the checkpoint is progressing. +} diff --git a/ddl/ingest/checkpoint.go b/ddl/ingest/checkpoint.go index 33880a78ce4bc..1262e5b0af150 100644 --- a/ddl/ingest/checkpoint.go +++ b/ddl/ingest/checkpoint.go @@ -55,7 +55,7 @@ type CheckpointManager struct { // Live in memory. mu sync.Mutex checkpoints map[int]*taskCheckpoint // task ID -> checkpoint - // we require each task ID to be continuous and start from 1. + // we require each task ID to be continuous and start from 0. minTaskIDFinished int dirty bool // Local meta. @@ -166,7 +166,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) { } // Register registers a new task. taskID MUST be continuous ascending and start -// from 1. +// from 0. // // TODO(lance6716): remove this constraint, use endKey as taskID and use // ordered map type for checkpoints. @@ -219,14 +219,14 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error { // afterFlush should be called after all engine is flushed. func (s *CheckpointManager) afterFlush() { for { - cp := s.checkpoints[s.minTaskIDFinished+1] + cp := s.checkpoints[s.minTaskIDFinished] if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys { break } + delete(s.checkpoints, s.minTaskIDFinished) s.minTaskIDFinished++ s.minFlushedKey = cp.endKey s.flushedKeyCnt += cp.totalKeys - delete(s.checkpoints, s.minTaskIDFinished) s.dirty = true } } diff --git a/ddl/ingest/checkpoint_test.go b/ddl/ingest/checkpoint_test.go index 7847fc9a58a4d..cb2532525235d 100644 --- a/ddl/ingest/checkpoint_test.go +++ b/ddl/ingest/checkpoint_test.go @@ -52,39 +52,39 @@ func TestCheckpointManager(t *testing.T) { require.NoError(t, err) defer mgr.Close() - mgr.Register(1, []byte{'1', '9'}) - mgr.Register(2, []byte{'2', '9'}) - mgr.UpdateTotalKeys(1, 100, false) + mgr.Register(0, []byte{'1', '9'}) + mgr.Register(1, []byte{'2', '9'}) + mgr.UpdateTotalKeys(0, 100, false) require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - require.NoError(t, mgr.UpdateWrittenKeys(1, 100)) + require.NoError(t, mgr.UpdateWrittenKeys(0, 100)) require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) - mgr.UpdateTotalKeys(1, 100, true) - require.NoError(t, mgr.UpdateWrittenKeys(1, 100)) + mgr.UpdateTotalKeys(0, 100, true) + require.NoError(t, mgr.UpdateWrittenKeys(0, 100)) // The data is not imported to the storage yet. require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'})) flushCtrl.imported = true // Mock the data is imported to the storage. - require.NoError(t, mgr.UpdateWrittenKeys(2, 0)) + require.NoError(t, mgr.UpdateWrittenKeys(1, 0)) require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) // Only when the last batch is completed, the job can be completed. - mgr.UpdateTotalKeys(2, 50, false) - mgr.UpdateTotalKeys(2, 50, true) - require.NoError(t, mgr.UpdateWrittenKeys(2, 50)) + mgr.UpdateTotalKeys(1, 50, false) + mgr.UpdateTotalKeys(1, 50, true) + require.NoError(t, mgr.UpdateWrittenKeys(1, 50)) require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'})) - require.NoError(t, mgr.UpdateWrittenKeys(2, 50)) + require.NoError(t, mgr.UpdateWrittenKeys(1, 50)) require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'})) require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'})) // Only when the subsequent job is completed, the previous job can be completed. - mgr.Register(3, []byte{'3', '9'}) - mgr.Register(4, []byte{'4', '9'}) - mgr.Register(5, []byte{'5', '9'}) + mgr.Register(2, []byte{'3', '9'}) + mgr.Register(3, []byte{'4', '9'}) + mgr.Register(4, []byte{'5', '9'}) + mgr.UpdateTotalKeys(2, 100, true) mgr.UpdateTotalKeys(3, 100, true) mgr.UpdateTotalKeys(4, 100, true) - mgr.UpdateTotalKeys(5, 100, true) - require.NoError(t, mgr.UpdateWrittenKeys(5, 100)) require.NoError(t, mgr.UpdateWrittenKeys(4, 100)) + require.NoError(t, mgr.UpdateWrittenKeys(3, 100)) require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'})) require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'})) } @@ -107,9 +107,9 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) { require.NoError(t, err) defer mgr.Close() - mgr.Register(1, []byte{'1', '9'}) - mgr.UpdateTotalKeys(1, 100, true) - require.NoError(t, mgr.UpdateWrittenKeys(1, 100)) + mgr.Register(0, []byte{'1', '9'}) + mgr.UpdateTotalKeys(0, 100, true) + require.NoError(t, mgr.UpdateWrittenKeys(0, 100)) mgr.Flush() // Wait the global checkpoint to be updated to the reorg table. r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;") require.NoError(t, err)