ddl: fix reorg handle not resumed after changing DDL owner #56507

Merged · 5 commits · Oct 11, 2024
9 changes: 8 additions & 1 deletion pkg/ddl/backfilling.go
@@ -510,6 +510,7 @@ func loadTableRanges(
zap.String("range start", hex.EncodeToString(ranges[0].StartKey)),
zap.String("range end", hex.EncodeToString(ranges[len(ranges)-1].EndKey)),
zap.Int("range count", len(ranges)))
failpoint.InjectCall("afterLoadTableRanges", len(ranges))
return ranges, nil
}

@@ -845,12 +846,17 @@ func (dc *ddlCtx) writePhysicalTableRecord(
t table.PhysicalTable,
bfWorkerType backfillerType,
reorgInfo *reorgInfo,
) error {
) (err error) {
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
return errors.Trace(err)
}
defer func() {
if err != nil && ctx.Err() != nil {
err = context.Cause(ctx)
}
}()

failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
//nolint:forcetypeassert
@@ -931,6 +937,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
}
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_scheduler.go
@@ -375,6 +375,7 @@ func newTaskIDAllocator() *taskIDAllocator {
}

func (a *taskIDAllocator) alloc() int {
ret := a.id
a.id++
return a.id
return ret
}
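The change to `alloc()` above is the heart of the renumbering: the allocator now returns the current counter value before incrementing, so task IDs start at 0 instead of 1 and line up with the checkpoint manager's 0-based bookkeeping below. A minimal standalone sketch of the corrected behavior (a stripped-down model for illustration, not the full TiDB type):

```go
package main

import "fmt"

// taskIDAllocator is a stripped-down model of the allocator patched above:
// it hands out continuous IDs starting from 0.
type taskIDAllocator struct {
	id int
}

func (a *taskIDAllocator) alloc() int {
	ret := a.id // return the current ID first ...
	a.id++      // ... then advance the counter
	return ret
}

func main() {
	a := &taskIDAllocator{}
	fmt.Println(a.alloc(), a.alloc(), a.alloc()) // 0 1 2 (previously 1 2 3)
}
```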
33 changes: 33 additions & 0 deletions pkg/ddl/column_modify_test.go
@@ -635,3 +635,36 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {

tk.MustExec("drop table if exists t")
}

func TestModifyColumnReorgCheckpoint(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b bigint);")
rowCnt := 10
for i := 0; i < rowCnt; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000))
}
splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt)
tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1)))

retireOwner := false
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() {
if !retireOwner {
retireOwner = true
dom.DDL().OwnerManager().ResignOwner(context.Background())
}
})

rangeCnts := []int{}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) {
rangeCnts = append(rangeCnts, rangeCnt)
})

tk.MustExec("alter table t modify column b int;")
require.Len(t, rangeCnts, 2) // It should have two rounds for loading table ranges.
require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify if the checkpoint is progressing.
}
8 changes: 4 additions & 4 deletions pkg/ddl/ingest/checkpoint.go
@@ -59,7 +59,7 @@ type CheckpointManager struct {
// Live in memory.
mu sync.Mutex
checkpoints map[int]*taskCheckpoint // task ID -> checkpoint
// we require each task ID to be continuous and start from 1.
// we require each task ID to be continuous and start from 0.
minTaskIDFinished int
dirty bool

@@ -184,7 +184,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) {
}

// Register registers a new task. taskID MUST be continuous ascending and start
// from 1.
// from 0.
//
// TODO(lance6716): remove this constraint, use endKey as taskID and use
// ordered map type for checkpoints.
@@ -242,14 +242,14 @@ func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool) {
// afterFlush should be called after all engine is flushed.
func (s *CheckpointManager) afterFlush() {
for {
cp := s.checkpoints[s.minTaskIDFinished+1]
cp := s.checkpoints[s.minTaskIDFinished]
if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
break
}
delete(s.checkpoints, s.minTaskIDFinished)
s.minTaskIDFinished++
s.flushedKeyLowWatermark = cp.endKey
s.flushedKeyCnt += cp.totalKeys
delete(s.checkpoints, s.minTaskIDFinished)
s.dirty = true
}
}
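With 0-based task IDs, `afterFlush` looks up `s.checkpoints[s.minTaskIDFinished]` directly and deletes the entry before advancing the counter rather than after. Below is a simplified, self-contained model of that watermark loop; `ckptMgr` and `taskCheckpoint` are reduced stand-ins for the real `CheckpointManager` types, kept only to show the ordering of the lookup, delete, and increment:

```go
package main

import "fmt"

// taskCheckpoint is a reduced stand-in for the real per-task checkpoint.
type taskCheckpoint struct {
	totalKeys, writtenKeys int
	lastBatchRead          bool
	endKey                 []byte
}

// ckptMgr models only the watermark-advancing state of CheckpointManager.
type ckptMgr struct {
	checkpoints            map[int]*taskCheckpoint // task ID -> checkpoint, IDs start from 0
	minTaskIDFinished      int
	flushedKeyLowWatermark []byte
	flushedKeyCnt          int
}

// afterFlush folds every fully written task into the low watermark,
// starting from task 0: read the checkpoint, delete it, then bump
// minTaskIDFinished, in the same order as the patched code.
func (s *ckptMgr) afterFlush() {
	for {
		cp := s.checkpoints[s.minTaskIDFinished]
		if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
			break
		}
		delete(s.checkpoints, s.minTaskIDFinished)
		s.minTaskIDFinished++
		s.flushedKeyLowWatermark = cp.endKey
		s.flushedKeyCnt += cp.totalKeys
	}
}

func main() {
	m := &ckptMgr{checkpoints: map[int]*taskCheckpoint{
		0: {totalKeys: 100, writtenKeys: 100, lastBatchRead: true, endKey: []byte("09")},
		1: {totalKeys: 100, writtenKeys: 50, lastBatchRead: true, endKey: []byte("19")},
	}}
	m.afterFlush()
	// Task 0 is folded into the watermark; task 1 is still pending.
	fmt.Println(m.minTaskIDFinished, string(m.flushedKeyLowWatermark), m.flushedKeyCnt) // 1 09 100
}
```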
50 changes: 25 additions & 25 deletions pkg/ddl/ingest/checkpoint_test.go
@@ -67,47 +67,47 @@ func TestCheckpointManager(t *testing.T) {
require.NoError(t, err)
defer mgr.Close()

mgr.Register(0, []byte{'0', '9'})
mgr.Register(1, []byte{'1', '9'})
mgr.Register(2, []byte{'2', '9'})
mgr.UpdateTotalKeys(1, 100, false)
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateWrittenKeys(1, 100)
mgr.UpdateTotalKeys(0, 100, false)
require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
mgr.UpdateWrittenKeys(0, 100)
mgr.AdvanceWatermark(true, false)
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateTotalKeys(1, 100, true)
mgr.UpdateWrittenKeys(1, 100)
require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
mgr.UpdateTotalKeys(0, 100, true)
mgr.UpdateWrittenKeys(0, 100)
mgr.AdvanceWatermark(true, false)
// The data is not imported to the storage yet.
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateWrittenKeys(2, 0)
require.False(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
mgr.UpdateWrittenKeys(1, 0)
mgr.AdvanceWatermark(true, true) // Mock the data is imported to the storage.
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'}))

// Only when the last batch is completed, the job can be completed.
mgr.UpdateTotalKeys(2, 50, false)
mgr.UpdateTotalKeys(2, 50, true)
mgr.UpdateWrittenKeys(2, 50)
mgr.UpdateTotalKeys(1, 50, false)
mgr.UpdateTotalKeys(1, 50, true)
mgr.UpdateWrittenKeys(1, 50)
mgr.AdvanceWatermark(true, true)
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
mgr.UpdateWrittenKeys(2, 50)
require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateWrittenKeys(1, 50)
mgr.AdvanceWatermark(true, true)
require.True(t, mgr.IsKeyProcessed([]byte{'0', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))

// Only when the subsequent job is completed, the previous job can be completed.
mgr.Register(2, []byte{'2', '9'})
mgr.Register(3, []byte{'3', '9'})
mgr.Register(4, []byte{'4', '9'})
mgr.Register(5, []byte{'5', '9'})
mgr.UpdateTotalKeys(2, 100, true)
mgr.UpdateTotalKeys(3, 100, true)
mgr.UpdateTotalKeys(4, 100, true)
mgr.UpdateTotalKeys(5, 100, true)
mgr.UpdateWrittenKeys(5, 100)
mgr.AdvanceWatermark(true, true)
mgr.UpdateWrittenKeys(4, 100)
mgr.AdvanceWatermark(true, true)
mgr.UpdateWrittenKeys(3, 100)
mgr.AdvanceWatermark(true, true)
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'}))
}

func TestCheckpointManagerUpdateReorg(t *testing.T) {
Expand All @@ -126,9 +126,9 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) {
mgr, err := ingest.NewCheckpointManager(ctx, sessPool, 1, 1, []int64{1}, tmpFolder, mockGetTSClient{})
require.NoError(t, err)

mgr.Register(1, []byte{'1', '9'})
mgr.UpdateTotalKeys(1, 100, true)
mgr.UpdateWrittenKeys(1, 100)
mgr.Register(0, []byte{'1', '9'})
mgr.UpdateTotalKeys(0, 100, true)
mgr.UpdateWrittenKeys(0, 100)
mgr.AdvanceWatermark(true, true)
mgr.Close() // Wait the global checkpoint to be updated to the reorg table.
r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;")
6 changes: 3 additions & 3 deletions pkg/ddl/job_scheduler.go
@@ -98,7 +98,7 @@ type ownerListener struct {
var _ owner.Listener = (*ownerListener)(nil)

func (l *ownerListener) OnBecomeOwner() {
ctx, cancelFunc := context.WithCancel(l.ddl.ddlCtx.ctx)
ctx, cancelFunc := context.WithCancelCause(l.ddl.ddlCtx.ctx)
sysTblMgr := systable.NewManager(l.ddl.sessPool)
l.scheduler = &jobScheduler{
schCtx: ctx,
@@ -133,7 +133,7 @@ func (l *ownerListener) OnRetireOwner() {
type jobScheduler struct {
// *ddlCtx already have context named as "ctx", so we use "schCtx" here to avoid confusion.
schCtx context.Context
cancel context.CancelFunc
cancel context.CancelCauseFunc
wg tidbutil.WaitGroupWrapper
runningJobs *runningJobs
sysTblMgr systable.Manager
@@ -185,7 +185,7 @@ func (s *jobScheduler) start() {
}

func (s *jobScheduler) close() {
s.cancel()
s.cancel(dbterror.ErrNotOwner)
s.wg.Wait()
if s.reorgWorkerPool != nil {
s.reorgWorkerPool.close()
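Taken together with the `defer` added to `writePhysicalTableRecord`, the switch to `context.WithCancelCause` means a reorg interrupted by an owner change now surfaces the cancellation cause (`dbterror.ErrNotOwner`) instead of a bare `context.Canceled`, so the new owner can recognize the situation and resume from the checkpoint. A minimal illustration of the standard-library pattern, using a placeholder error in place of `dbterror.ErrNotOwner`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errNotOwner is a placeholder for dbterror.ErrNotOwner.
var errNotOwner = errors.New("not the DDL owner")

// doBackfill mimics the backfill path: it converts a plain
// "context canceled" error into the cancellation cause, as the
// new defer in writePhysicalTableRecord does.
func doBackfill(ctx context.Context) (err error) {
	defer func() {
		if err != nil && ctx.Err() != nil {
			err = context.Cause(ctx)
		}
	}()
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errNotOwner) // jobScheduler.close() cancels with a cause

	err := doBackfill(ctx)
	fmt.Println(errors.Is(err, errNotOwner)) // true: the cause is propagated
}
```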
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest3/operator_test.go
@@ -74,7 +74,7 @@ func TestBackfillOperators(t *testing.T) {

tasks := sink.Collect()
require.Len(t, tasks, 10)
require.Equal(t, 1, tasks[0].ID)
require.Equal(t, 0, tasks[0].ID)
require.Equal(t, startKey, tasks[0].Start)
require.Equal(t, endKey, tasks[9].End)
