ddl: fix reorg handle not resumed after changing DDL owner (#56507) #57021

Open · wants to merge 1 commit into base: release-7.1

ddl/backfilling.go (4 changes: 3 additions & 1 deletion)
@@ -422,6 +422,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 		errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
 		return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
 	}
+	failpoint.InjectCall("afterLoadTableRanges", len(ranges))
 	return ranges, nil
 }

@@ -534,6 +535,7 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum
 			logutil.BgLogger().Warn("[ddl] update reorg meta failed",
 				zap.Int64("job ID", reorgInfo.ID), zap.Error(err))
 		}
+		failpoint.InjectCall("afterUpdateReorgMeta")
 	}
 	// We try to adjust the worker size regularly to reduce
 	// the overhead of loading the DDL related global variables.
@@ -681,7 +683,7 @@ func SetBackfillTaskChanSizeForTest(n int) {
 //
 // The above operations are completed in a transaction.
 // Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
-func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
+func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) (err error) {
 	job := reorgInfo.Job
 	totalAddedCount := job.GetRowCount()

ddl/backfilling_scheduler.go (3 changes: 2 additions & 1 deletion)
@@ -506,6 +506,7 @@ func newTaskIDAllocator() *taskIDAllocator {
 }
 
 func (a *taskIDAllocator) alloc() int {
+	ret := a.id
 	a.id++
-	return a.id
+	return ret
 }
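
Note: this one-line allocator change and the ddl/ingest/checkpoint.go change below are two halves of the same convention switch: task IDs handed out by the backfill scheduler are now zero-based, and the checkpoint manager expects them to start from 0. A minimal standalone sketch (a copy of the allocator type, not an import of the ddl package) of the fixed behavior:

package main

import "fmt"

// taskIDAllocator mirrors the fixed allocator in ddl/backfilling_scheduler.go:
// alloc returns the current value before incrementing, so IDs are 0, 1, 2, ...
type taskIDAllocator struct {
	id int
}

func (a *taskIDAllocator) alloc() int {
	ret := a.id
	a.id++
	return ret
}

func main() {
	a := &taskIDAllocator{}
	fmt.Println(a.alloc(), a.alloc(), a.alloc()) // prints: 0 1 2 (the old code yielded 1 2 3)
}
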
ddl/column_modify_test.go (42 changes: 42 additions & 0 deletions)
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	testddlutil "github.com/pingcap/tidb/ddl/testutil"
 	"github.com/pingcap/tidb/ddl/util/callback"
 	"github.com/pingcap/tidb/domain"
@@ -1029,3 +1030,44 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
 
 	tk.MustExec("drop table if exists t")
 }
+
+func TestModifyColumnReorgCheckpoint(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use test")
+	tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
+	tk.MustExec("create table t (a int primary key, b bigint);")
+	rowCnt := 10
+	for i := 0; i < rowCnt; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000))
+	}
+	splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt)
+	tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1)))
+
+	retireOwner := false
+	err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() {
+		if !retireOwner {
+			retireOwner = true
+			dom.DDL().OwnerManager().ResignOwner(context.Background())
+		}
+	})
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta"))
+	}()
+
+	rangeCnts := []int{}
+	err = failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) {
+		rangeCnts = append(rangeCnts, rangeCnt)
+	})
+	require.NoError(t, err)
+	defer func() {
+		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges"))
+	}()
+
+	tk.MustExec("alter table t modify column b int;")
+	require.Len(t, rangeCnts, 2)                // It should have two rounds for loading table ranges.
+	require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify if the checkpoint is progressing.
+}

ddl/ingest/checkpoint.go (8 changes: 4 additions & 4 deletions)
@@ -55,7 +55,7 @@ type CheckpointManager struct {
 	// Live in memory.
 	mu sync.Mutex
 	checkpoints map[int]*taskCheckpoint // task ID -> checkpoint
-	// we require each task ID to be continuous and start from 1.
+	// we require each task ID to be continuous and start from 0.
 	minTaskIDFinished int
 	dirty bool
 	// Local meta.
@@ -166,7 +166,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) {
 }
 
 // Register registers a new task. taskID MUST be continuous ascending and start
-// from 1.
+// from 0.
 //
 // TODO(lance6716): remove this constraint, use endKey as taskID and use
 // ordered map type for checkpoints.
@@ -219,14 +219,14 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
 // afterFlush should be called after all engine is flushed.
 func (s *CheckpointManager) afterFlush() {
 	for {
-		cp := s.checkpoints[s.minTaskIDFinished+1]
+		cp := s.checkpoints[s.minTaskIDFinished]
 		if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
 			break
 		}
+		delete(s.checkpoints, s.minTaskIDFinished)
 		s.minTaskIDFinished++
 		s.minFlushedKey = cp.endKey
 		s.flushedKeyCnt += cp.totalKeys
-		delete(s.checkpoints, s.minTaskIDFinished)
 		s.dirty = true
 	}
 }
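
For reference, a standalone sketch of the advancement loop after this change, with toy types standing in for the real CheckpointManager: the loop now looks up the checkpoint for minTaskIDFinished itself (zero-based) and deletes it before advancing the counter, so minFlushedKey ends up at the end key of the lowest fully written task. This only illustrates the new control flow; it is not the actual ddl/ingest code:

package main

import "fmt"

type taskCheckpoint struct {
	totalKeys     int
	writtenKeys   int
	lastBatchRead bool
	endKey        []byte
}

type checkpointManager struct {
	checkpoints       map[int]*taskCheckpoint // task ID -> checkpoint, IDs start from 0
	minTaskIDFinished int
	minFlushedKey     []byte
	flushedKeyCnt     int
}

// afterFlush mirrors the post-change loop shown in the hunk above.
func (s *checkpointManager) afterFlush() {
	for {
		cp := s.checkpoints[s.minTaskIDFinished]
		if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
			break
		}
		delete(s.checkpoints, s.minTaskIDFinished)
		s.minTaskIDFinished++
		s.minFlushedKey = cp.endKey
		s.flushedKeyCnt += cp.totalKeys
	}
}

func main() {
	s := &checkpointManager{checkpoints: map[int]*taskCheckpoint{
		0: {totalKeys: 100, writtenKeys: 100, lastBatchRead: true, endKey: []byte("19")},
		1: {totalKeys: 100, writtenKeys: 50, lastBatchRead: false, endKey: []byte("29")},
	}}
	s.afterFlush()
	// Task 0 is fully written, task 1 is not, so the watermark stops at task 0's end key.
	fmt.Println(s.minTaskIDFinished, string(s.minFlushedKey), s.flushedKeyCnt) // prints: 1 19 100
}
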
ddl/ingest/checkpoint_test.go (38 changes: 19 additions & 19 deletions)
@@ -52,39 +52,39 @@ func TestCheckpointManager(t *testing.T) {
 	require.NoError(t, err)
 	defer mgr.Close()
 
-	mgr.Register(1, []byte{'1', '9'})
-	mgr.Register(2, []byte{'2', '9'})
-	mgr.UpdateTotalKeys(1, 100, false)
+	mgr.Register(0, []byte{'1', '9'})
+	mgr.Register(1, []byte{'2', '9'})
+	mgr.UpdateTotalKeys(0, 100, false)
 	require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
-	require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
+	require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
 	require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
-	mgr.UpdateTotalKeys(1, 100, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
+	mgr.UpdateTotalKeys(0, 100, true)
+	require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
 	// The data is not imported to the storage yet.
 	require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 	flushCtrl.imported = true // Mock the data is imported to the storage.
-	require.NoError(t, mgr.UpdateWrittenKeys(2, 0))
+	require.NoError(t, mgr.UpdateWrittenKeys(1, 0))
 	require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 
 	// Only when the last batch is completed, the job can be completed.
-	mgr.UpdateTotalKeys(2, 50, false)
-	mgr.UpdateTotalKeys(2, 50, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
+	mgr.UpdateTotalKeys(1, 50, false)
+	mgr.UpdateTotalKeys(1, 50, true)
+	require.NoError(t, mgr.UpdateWrittenKeys(1, 50))
 	require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 	require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
-	require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
+	require.NoError(t, mgr.UpdateWrittenKeys(1, 50))
 	require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 	require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
 
 	// Only when the subsequent job is completed, the previous job can be completed.
-	mgr.Register(3, []byte{'3', '9'})
-	mgr.Register(4, []byte{'4', '9'})
-	mgr.Register(5, []byte{'5', '9'})
+	mgr.Register(2, []byte{'3', '9'})
+	mgr.Register(3, []byte{'4', '9'})
+	mgr.Register(4, []byte{'5', '9'})
+	mgr.UpdateTotalKeys(2, 100, true)
 	mgr.UpdateTotalKeys(3, 100, true)
 	mgr.UpdateTotalKeys(4, 100, true)
-	mgr.UpdateTotalKeys(5, 100, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(5, 100))
 	require.NoError(t, mgr.UpdateWrittenKeys(4, 100))
+	require.NoError(t, mgr.UpdateWrittenKeys(3, 100))
 	require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
 	require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'}))
 }
@@ -107,9 +107,9 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) {
 	require.NoError(t, err)
 	defer mgr.Close()
 
-	mgr.Register(1, []byte{'1', '9'})
-	mgr.UpdateTotalKeys(1, 100, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
+	mgr.Register(0, []byte{'1', '9'})
+	mgr.UpdateTotalKeys(0, 100, true)
+	require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
 	mgr.Flush() // Wait the global checkpoint to be updated to the reorg table.
 	r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;")
 	require.NoError(t, err)