Skip to content

Commit

Permalink
ddl: make ddl scheduler prefers pick up already processing job (#38898)
Browse files Browse the repository at this point in the history
close #38900
  • Loading branch information
xiongjiwei authored Nov 16, 2022
1 parent ba35a37 commit 154f027
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (dc *ddlCtx) excludeJobIDs() string {
}

const (
getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids) and %s reorg %s order by processing desc, job_id"
getJobSQL = "select job_meta, processing from mysql.tidb_ddl_job where job_id in (select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing order by processing desc limit 1) and %s reorg %s order by processing desc, job_id"
)

type jobType int
Expand Down
65 changes: 65 additions & 0 deletions ddl/job_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,68 @@ func check(t *testing.T, record []int64, ids ...int64) {
}
}
}

func TestAlwaysChoiceProcessingJob(t *testing.T) {
if !variable.EnableConcurrentDDL.Load() {
t.Skipf("test requires concurrent ddl")
}
store, dom := testkit.CreateMockStoreAndDomain(t)

d := dom.DDL()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int)")

ddlJobs := []string{
"alter table t add index idx(a)",
"alter table t add index idx(b)",
}

hook := &ddl.TestDDLCallback{}
var wg util.WaitGroupWrapper
wg.Add(1)
var once sync.Once
var idxa, idxb int64
hook.OnGetJobBeforeExported = func(jobType string) {
once.Do(func() {
var jobs []*model.Job
for i, job := range ddlJobs {
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
recordSet, _ := tk.Exec(job)
if recordSet != nil {
require.NoError(t, recordSet.Close())
}
})
for {
time.Sleep(time.Millisecond * 100)
var err error
jobs, err = ddl.GetAllDDLJobs(testkit.NewTestKit(t, store).Session(), nil)
require.NoError(t, err)
if len(jobs) == i+1 {
break
}
}
}
idxa = jobs[0].ID
idxb = jobs[1].ID
require.Greater(t, idxb, idxa)
tk := testkit.NewTestKit(t, store)
tk.MustExec("update mysql.tidb_ddl_job set processing = 1 where job_id = ?", idxb)
wg.Done()
})
}

record := make([]int64, 0, 16)
hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
// record the job schedule order
record = append(record, job.ID)
}

d.SetHook(hook)
wg.Wait()

check(t, record, idxb, idxa)
}

0 comments on commit 154f027

Please sign in to comment.