Skip to content

Commit

Permalink
ddl: check context done in isReorgRunnable function (#57813) (#57820)
Browse files Browse the repository at this point in the history
close #57325
  • Loading branch information
ti-chi-bot authored Nov 29, 2024
1 parent 43b21d4 commit e49a151
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 72 deletions.
7 changes: 3 additions & 4 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
// we will never cancel the job once there is panic in bf.BackfillData.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(jobID, false)
err := d.isReorgRunnable(d.ctx, false)
if err != nil {
result.err = err
return result
Expand Down Expand Up @@ -677,8 +677,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
t table.PhysicalTable,
reorgInfo *reorgInfo,
) error {
// TODO(tangenta): support adjust worker count dynamically.
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
if err := dc.isReorgRunnable(ctx, false); err != nil {
return errors.Trace(err)
}
job := reorgInfo.Job
Expand Down Expand Up @@ -921,7 +920,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
) (err error) {
startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
if err := dc.isReorgRunnable(ctx, false); err != nil {
return errors.Trace(err)
}
defer func() {
Expand Down
5 changes: 0 additions & 5 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ type DDLForTest interface {
RemoveReorgCtx(id int64)
}

// IsReorgCanceled exports for testing.
func (rc *reorgCtx) IsReorgCanceled() bool {
return rc.isReorgCanceled()
}

// NewReorgCtx exports for testing.
func (d *ddl) NewReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return d.newReorgCtx(jobID, rowCount)
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2388,7 +2388,7 @@ func (w *worker) addTableIndex(
// TODO: Support typeAddIndexMergeTmpWorker.
if reorgInfo.ReorgMeta.IsDistReorg && !reorgInfo.mergingTmpIdx {
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
err := w.executeDistTask(t, reorgInfo)
err := w.executeDistTask(ctx, t, reorgInfo)
if err != nil {
return err
}
Expand Down Expand Up @@ -2469,7 +2469,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
return nil
}

func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
func (w *worker) executeDistTask(stepCtx context.Context, t table.Table, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
return errors.New("do not support merge index")
}
Expand Down Expand Up @@ -2520,7 +2520,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
return err
}
err = handle.WaitTaskDoneOrPaused(ctx, task.ID)
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if err := w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
logutil.DDLLogger().Warn("job paused by user", zap.Error(err))
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
Expand Down Expand Up @@ -2557,7 +2557,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
defer close(done)
err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, reorgInfo.ReorgMeta.TargetScope, metaData)
failpoint.InjectCall("pauseAfterDistTaskFinished")
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if err := w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
logutil.DDLLogger().Warn("job paused by user", zap.Error(err))
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(reorgInfo.Job.ID)
Expand All @@ -2578,7 +2578,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
w.updateDistTaskRowCount(taskKey, reorgInfo.Job.ID)
return nil
case <-checkFinishTk.C:
if err = w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if err = w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
if err = handle.PauseTask(w.workCtx, taskKey); err != nil {
logutil.DDLLogger().Error("pause task error", zap.String("task_key", taskKey), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (w *worker) runOneJobStep(
logutil.DDLLogger().Info("job is paused",
zap.Int64("job_id", job.ID),
zap.Stringer("state", latestJob.State))
cancelStep(dbterror.ErrPausedDDLJob)
cancelStep(dbterror.ErrPausedDDLJob.FastGenByArgs(job.ID))
return
case model.JobStateDone, model.JobStateSynced:
return
Expand Down
24 changes: 0 additions & 24 deletions pkg/ddl/job_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,27 +252,3 @@ func TestJobNeedGC(t *testing.T) {
}}}
require.True(t, ddl.JobNeedGC(job))
}

func TestUsingReorgCtx(t *testing.T) {
_, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := domain.DDL()

wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Run(func() {
jobID := int64(1)
for i := 0; i < 500; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Wait()
}
35 changes: 3 additions & 32 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ type reorgCtx struct {
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
jobState model.JobState

mu struct {
sync.Mutex
Expand Down Expand Up @@ -275,20 +274,6 @@ func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, erro
return tzLoc.GetLocation()
}

func (rc *reorgCtx) notifyJobState(state model.JobState) {
atomic.StoreInt32((*int32)(&rc.jobState), int32(state))
}

func (rc *reorgCtx) isReorgCanceled() bool {
s := atomic.LoadInt32((*int32)(&rc.jobState))
return int32(model.JobStateCancelled) == s || int32(model.JobStateCancelling) == s
}

func (rc *reorgCtx) isReorgPaused() bool {
s := atomic.LoadInt32((*int32)(&rc.jobState))
return int32(model.JobStatePaused) == s || int32(model.JobStatePausing) == s
}

func (rc *reorgCtx) setRowCount(count int64) {
atomic.StoreInt64(&rc.rowCount, count)
}
Expand Down Expand Up @@ -566,28 +551,14 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
return rows[0].GetInt64(0)
}

func (dc *ddlCtx) isReorgCancelled(jobID int64) bool {
return dc.getReorgCtx(jobID).isReorgCanceled()
}
func (dc *ddlCtx) isReorgPaused(jobID int64) bool {
return dc.getReorgCtx(jobID).isReorgPaused()
}

func (dc *ddlCtx) isReorgRunnable(jobID int64, isDistReorg bool) error {
func (dc *ddlCtx) isReorgRunnable(ctx context.Context, isDistReorg bool) error {
if dc.ctx.Err() != nil {
// Worker is closed. So it can't do the reorganization.
return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
}

// TODO(lance6716): check ctx.Err?
if dc.isReorgCancelled(jobID) {
// Job is cancelled. So it can't be done.
return dbterror.ErrCancelledDDLJob
}

if dc.isReorgPaused(jobID) {
logutil.DDLLogger().Warn("job paused by user", zap.String("ID", dc.uuid))
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(jobID)
if ctx.Err() != nil {
return context.Cause(ctx)
}

// If isDistReorg is true, we needn't check if it is owner.
Expand Down
67 changes: 66 additions & 1 deletion tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package addindextest
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
Expand Down Expand Up @@ -109,7 +111,7 @@ func TestAddIndexDistBasic(t *testing.T) {
tk.MustExec(`set global tidb_enable_dist_task=0;`)
}

func TestAddIndexDistCancel(t *testing.T) {
func TestAddIndexDistCancelWithPartition(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
if store.Name() != "TiKV" {
t.Skip("TiKV store only")
Expand Down Expand Up @@ -150,6 +152,69 @@ func TestAddIndexDistCancel(t *testing.T) {
tk.MustExec(`set global tidb_enable_dist_task=0;`)
}

func TestAddIndexDistCancel(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use addindexlit;")
tk2.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk2.MustExec("create table t2 (a int, b int);")
tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
tk.MustExec("alter table t add index idx(a);")
wg.Done()
}()
go func() {
tk2.MustExec("alter table t2 add index idx_b(b);")
wg.Done()
}()
wg.Wait()
rows := tk.MustQuery("admin show ddl jobs 2;").Rows()
require.Len(t, rows, 2)
require.True(t, strings.Contains(rows[0][12].(string) /* comments */, "ingest"))
require.True(t, strings.Contains(rows[1][12].(string) /* comments */, "ingest"))
require.Equal(t, rows[0][7].(string) /* row_count */, "3")
require.Equal(t, rows[1][7].(string) /* row_count */, "3")

tk.MustExec("set @@global.tidb_enable_dist_task = 1;")

// test cancel is timely
enter := make(chan struct{})
testfailpoint.EnableCall(
t,
"github.com/pingcap/tidb/pkg/lightning/backend/local/beforeExecuteRegionJob",
func(ctx context.Context) {
close(enter)
select {
case <-time.After(time.Second * 30):
case <-ctx.Done():
}
})
wg.Add(1)
go func() {
defer wg.Done()
err := tk2.ExecToErr("alter table t add index idx_ba(b, a);")
require.ErrorContains(t, err, "Cancelled DDL job")
}()
<-enter
jobID := tk.MustQuery("admin show ddl jobs 1;").Rows()[0][0].(string)
now := time.Now()
tk.MustExec("admin cancel ddl jobs " + jobID)
wg.Wait()
// cancel should be timely
require.Less(t, time.Since(now).Seconds(), 20.0)
}

func TestAddIndexDistPauseAndResume(t *testing.T) {
t.Skip("unstable") // TODO(tangenta): fix this unstable test
store := realtikvtest.CreateMockStoreAndSetup(t)
Expand Down

0 comments on commit e49a151

Please sign in to comment.