diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go
index 394b1e77436ed..abf10591f1cec 100644
--- a/pkg/ddl/backfilling.go
+++ b/pkg/ddl/backfilling.go
@@ -327,7 +327,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 	// we will never cancel the job once there is panic in bf.BackfillData.
 	// Because reorgRecordTask may run a long time,
 	// we should check whether this ddl job is still runnable.
-	err := d.isReorgRunnable(jobID, false)
+	err := d.isReorgRunnable(d.ctx, false)
 	if err != nil {
 		result.err = err
 		return result
@@ -677,8 +677,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
 	t table.PhysicalTable,
 	reorgInfo *reorgInfo,
 ) error {
-	// TODO(tangenta): support adjust worker count dynamically.
-	if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
+	if err := dc.isReorgRunnable(ctx, false); err != nil {
 		return errors.Trace(err)
 	}
 	job := reorgInfo.Job
@@ -921,7 +920,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 ) (err error) {
 	startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
 
-	if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
+	if err := dc.isReorgRunnable(ctx, false); err != nil {
 		return errors.Trace(err)
 	}
 	defer func() {
diff --git a/pkg/ddl/ddl_test.go b/pkg/ddl/ddl_test.go
index 4fd597a1bb099..8ac57e2da507d 100644
--- a/pkg/ddl/ddl_test.go
+++ b/pkg/ddl/ddl_test.go
@@ -52,11 +52,6 @@ type DDLForTest interface {
 	RemoveReorgCtx(id int64)
 }
 
-// IsReorgCanceled exports for testing.
-func (rc *reorgCtx) IsReorgCanceled() bool {
-	return rc.isReorgCanceled()
-}
-
 // NewReorgCtx exports for testing.
 func (d *ddl) NewReorgCtx(jobID int64, rowCount int64) *reorgCtx {
 	return d.newReorgCtx(jobID, rowCount)
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index 83f3a1ebc272c..c51888b17dfd5 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -2388,7 +2388,7 @@ func (w *worker) addTableIndex(
 	// TODO: Support typeAddIndexMergeTmpWorker.
 	if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
 		if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-			err := w.executeDistTask(t, reorgInfo)
+			err := w.executeDistTask(ctx, t, reorgInfo)
 			if err != nil {
 				return err
 			}
@@ -2469,7 +2469,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
 	return nil
 }
 
-func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
+func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgInfo *reorgInfo) error {
 	if reorgInfo.mergingTmpIdx {
 		return errors.New("do not support merge index")
 	}
@@ -2520,7 +2520,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
 			return err
 		}
 		err = handle.WaitTaskDoneOrPaused(ctx, task.ID)
-		if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
+		if err := w.isReorgRunnable(stepCtx, true); err != nil {
 			if dbterror.ErrPausedDDLJob.Equal(err) {
 				logutil.DDLLogger().Warn("job paused by user", zap.Error(err))
 				return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
@@ -2557,7 +2557,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
 		defer close(done)
 		err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, reorgInfo.ReorgMeta.TargetScope, metaData)
 		failpoint.InjectCall("pauseAfterDistTaskFinished")
-		if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
+		if err := w.isReorgRunnable(stepCtx, true); err != nil {
 			if dbterror.ErrPausedDDLJob.Equal(err) {
 				logutil.DDLLogger().Warn("job paused by user", zap.Error(err))
 				return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
@@ -2578,7 +2578,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
 			w.updateDistTaskRowCount(taskKey, reorgInfo.Job.ID)
 			return nil
 		case <-checkFinishTk.C:
			if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
-			if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
+			if err = w.isReorgRunnable(stepCtx, true); err != nil {
 				if dbterror.ErrPausedDDLJob.Equal(err) {
 					if err = handle.PauseTask(w.workCtx, taskKey); err != nil {
 						logutil.DDLLogger().Error("pause task error", zap.String("task_key", taskKey), zap.Error(err))
diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go
index 5a5c45f127d36..95ac00cd87889 100644
--- a/pkg/ddl/job_worker.go
+++ b/pkg/ddl/job_worker.go
@@ -859,7 +859,7 @@ func (w *worker) runOneJobStep(
 				logutil.DDLLogger().Info("job is paused",
 					zap.Int64("job_id", job.ID),
 					zap.Stringer("state", latestJob.State))
-				cancelStep(dbterror.ErrPausedDDLJob)
+				cancelStep(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
 				return
 			case model.JobStateDone, model.JobStateSynced:
 				return
diff --git a/pkg/ddl/job_worker_test.go b/pkg/ddl/job_worker_test.go
index 5d64cd3577206..d99e3f44b3fa0 100644
--- a/pkg/ddl/job_worker_test.go
+++ b/pkg/ddl/job_worker_test.go
@@ -252,27 +252,3 @@ func TestJobNeedGC(t *testing.T) {
 	}}}
 	require.True(t, ddl.JobNeedGC(job))
 }
-
-func TestUsingReorgCtx(t *testing.T) {
-	_, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
-	d := domain.DDL()
-
-	wg := util.WaitGroupWrapper{}
-	wg.Run(func() {
-		jobID := int64(1)
-		for i := 0; i < 500; i++ {
-			d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
-			d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
-			d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
-		}
-	})
-	wg.Run(func() {
-		jobID := int64(1)
-		for i := 0; i < 500; i++ {
-			d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
-			d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
-			d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
-		}
-	})
-	wg.Wait()
-}
diff --git a/pkg/ddl/reorg.go b/pkg/ddl/reorg.go
index e4f388cd9d28f..ed7e28852c52d 100644
--- a/pkg/ddl/reorg.go
+++ b/pkg/ddl/reorg.go
@@ -74,7 +74,6 @@ type reorgCtx struct {
 	doneCh chan reorgFnResult
 	// rowCount is used to simulate a job's row count.
 	rowCount int64
-	jobState model.JobState
 
 	mu struct {
 		sync.Mutex
@@ -275,20 +274,6 @@ func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, erro
 	return tzLoc.GetLocation()
 }
 
-func (rc *reorgCtx) notifyJobState(state model.JobState) {
-	atomic.StoreInt32((*int32)(&rc.jobState), int32(state))
-}
-
-func (rc *reorgCtx) isReorgCanceled() bool {
-	s := atomic.LoadInt32((*int32)(&rc.jobState))
-	return int32(model.JobStateCancelled) == s || int32(model.JobStateCancelling) == s
-}
-
-func (rc *reorgCtx) isReorgPaused() bool {
-	s := atomic.LoadInt32((*int32)(&rc.jobState))
-	return int32(model.JobStatePaused) == s || int32(model.JobStatePausing) == s
-}
-
 func (rc *reorgCtx) setRowCount(count int64) {
 	atomic.StoreInt64(&rc.rowCount, count)
 }
@@ -566,28 +551,14 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
 	return rows[0].GetInt64(0)
 }
 
-func (dc *ddlCtx) isReorgCancelled(jobID int64) bool {
-	return dc.getReorgCtx(jobID).isReorgCanceled()
-}
-func (dc *ddlCtx) isReorgPaused(jobID int64) bool {
-	return dc.getReorgCtx(jobID).isReorgPaused()
-}
-
-func (dc *ddlCtx) isReorgRunnable(jobID int64, isDistReorg bool) error {
+func (dc *ddlCtx) isReorgRunnable(ctx context.Context, isDistReorg bool) error {
 	if dc.ctx.Err() != nil {
 		// Worker is closed. So it can't do the reorganization.
 		return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
 	}
 
-	// TODO(lance6716): check ctx.Err?
-	if dc.isReorgCancelled(jobID) {
-		// Job is cancelled. So it can't be done.
-		return dbterror.ErrCancelledDDLJob
-	}
-
-	if dc.isReorgPaused(jobID) {
-		logutil.DDLLogger().Warn("job paused by user", zap.String("ID", dc.uuid))
-		return dbterror.ErrPausedDDLJob.GenWithStackByArgs(jobID)
+	if ctx.Err() != nil {
+		return context.Cause(ctx)
 	}
 
 	// If isDistReorg is true, we needn't check if it is owner.
diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go
index 8e5b008c4f7eb..5d024ff04b26b 100644
--- a/tests/realtikvtest/addindextest1/disttask_test.go
+++ b/tests/realtikvtest/addindextest1/disttask_test.go
@@ -17,9 +17,11 @@ package addindextest
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/config"
@@ -109,7 +111,7 @@ func TestAddIndexDistBasic(t *testing.T) {
 	tk.MustExec(`set global tidb_enable_dist_task=0;`)
 }
 
-func TestAddIndexDistCancel(t *testing.T) {
+func TestAddIndexDistCancelWithPartition(t *testing.T) {
 	store := realtikvtest.CreateMockStoreAndSetup(t)
 	if store.Name() != "TiKV" {
 		t.Skip("TiKV store only")
 	}
@@ -150,6 +152,69 @@
 	tk.MustExec(`set global tidb_enable_dist_task=0;`)
 }
 
+func TestAddIndexDistCancel(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	tk.MustExec("create table t (a int, b int);")
+	tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
+
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use addindexlit;")
+	tk2.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	tk2.MustExec("create table t2 (a int, b int);")
+	tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")
+
+	wg := &sync.WaitGroup{}
+	wg.Add(2)
+	go func() {
+		tk.MustExec("alter table t add index idx(a);")
+		wg.Done()
+	}()
+	go func() {
+		tk2.MustExec("alter table t2 add index idx_b(b);")
+		wg.Done()
+	}()
+	wg.Wait()
+	rows := tk.MustQuery("admin show ddl jobs 2;").Rows()
+	require.Len(t, rows, 2)
+	require.True(t, strings.Contains(rows[0][12].(string) /* comments */, "ingest"))
+	require.True(t, strings.Contains(rows[1][12].(string) /* comments */, "ingest"))
+	require.Equal(t, rows[0][7].(string) /* row_count */, "3")
+	require.Equal(t, rows[1][7].(string) /* row_count */, "3")
+
+	tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
+
+	// test cancel is timely
+	enter := make(chan struct{})
+	testfailpoint.EnableCall(
+		t,
+		"github.com/pingcap/tidb/pkg/lightning/backend/local/beforeExecuteRegionJob",
+		func(ctx context.Context) {
+			close(enter)
+			select {
+			case <-time.After(time.Second * 30):
+			case <-ctx.Done():
+			}
+		})
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := tk2.ExecToErr("alter table t add index idx_ba(b, a);")
+		require.ErrorContains(t, err, "Cancelled DDL job")
+	}()
+	<-enter
+	jobID := tk.MustQuery("admin show ddl jobs 1;").Rows()[0][0].(string)
+	now := time.Now()
+	tk.MustExec("admin cancel ddl jobs " + jobID)
+	wg.Wait()
+	// cancel should be timely
+	require.Less(t, time.Since(now).Seconds(), 20.0)
+}
+
 func TestAddIndexDistPauseAndResume(t *testing.T) {
 	t.Skip("unstable") // TODO(tangenta): fix this unstable test
 	store := realtikvtest.CreateMockStoreAndSetup(t)
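
The functional core of the patch is in pkg/ddl/reorg.go and pkg/ddl/job_worker.go: isReorgRunnable stops polling a shared reorgCtx.jobState field and instead inspects the step context, returning context.Cause(ctx), while runOneJobStep cancels that context with a concrete error (ErrPausedDDLJob) as the cause. Below is a minimal, self-contained sketch of that cancellation pattern (Go 1.20+); the names errPaused, checkRunnable, and cancelStep here are illustrative stand-ins, not the real TiDB identifiers.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errPaused stands in for dbterror.ErrPausedDDLJob.
var errPaused = errors.New("paused DDL job")

// checkRunnable mirrors the shape of the new isReorgRunnable: rather than
// polling a shared job-state variable, it asks the step context whether it
// has been cancelled and, if so, returns the recorded cause.
func checkRunnable(ctx context.Context) error {
	if ctx.Err() != nil {
		return context.Cause(ctx)
	}
	return nil
}

func main() {
	// The job step owns a cancellable context that carries a cause.
	stepCtx, cancelStep := context.WithCancelCause(context.Background())

	// The job worker, on observing a paused job, cancels the step with a
	// typed error, cf. cancelStep(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID)).
	cancelStep(fmt.Errorf("%w: job 1", errPaused))

	// Every worker checking the context now sees the precise cause at once.
	if err := checkRunnable(stepCtx); err != nil {
		fmt.Println("reorg not runnable:", err)
		// Output: reorg not runnable: paused DDL job: job 1
	}
}

Because backfill and dist-task workers all hold the same step context, cancellation propagates as soon as the context is cancelled instead of waiting for the next jobState poll; that is the behavior the new TestAddIndexDistCancel pins down with its 20-second bound.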