Skip to content

Commit

Permalink
test: fix flaky test TestParallelDDL (#53636)
Browse files Browse the repository at this point in the history
close #39222
  • Loading branch information
D3Hunter authored May 29, 2024
1 parent 6efce0f commit c9c865c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 70 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ go_test(
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/external",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
"//pkg/types",
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
// The transaction of enqueuing job is failed.
return errors.Trace(err)
}
failpoint.InjectCall("waitJobSubmitted")

sessVars := ctx.GetSessionVars()
sessVars.StmtCtx.IsDDLJobInQueue = true
Expand Down
104 changes: 34 additions & 70 deletions pkg/ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ddl_test
import (
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -168,76 +170,38 @@ func TestParallelDDL(t *testing.T) {

seqIDs := make([]int, 11)

wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t1 add index db1_idx1(c1)")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[0], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t1 add column c3 int")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[1], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t1 add index db1_idxx(c1)")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[2], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t2 drop column c3")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[3], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t1 drop index db1_idx2")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[4], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t2 add index db1_idx2(c2)")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[5], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_2.t3 drop column c4")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[6], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_2.t3 auto_id_cache 1024")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[7], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("alter table test_parallel_ddl_1.t1 add index db1_idx3(c2)")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[8], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database test_parallel_ddl_2")
rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[9], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
time.Sleep(5 * time.Millisecond)
var enable atomic.Bool
ch := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/waitJobSubmitted",
func() {
if enable.Load() {
<-ch
}
},
)
enable.Store(true)
for i, sql := range []string{
"alter table test_parallel_ddl_1.t1 add index db1_idx1(c1)",
"alter table test_parallel_ddl_1.t1 add column c3 int",
"alter table test_parallel_ddl_1.t1 add index db1_idxx(c1)",
"alter table test_parallel_ddl_1.t2 drop column c3",
"alter table test_parallel_ddl_1.t1 drop index db1_idx2",
"alter table test_parallel_ddl_1.t2 add index db1_idx2(c2)",
"alter table test_parallel_ddl_2.t3 drop column c4",
"alter table test_parallel_ddl_2.t3 auto_id_cache 1024",
"alter table test_parallel_ddl_1.t1 add index db1_idx3(c2)",
"drop database test_parallel_ddl_2",
} {
idx := i
wg.Run(func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(sql)
rs := tk2.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')")
seqIDs[idx], _ = strconv.Atoi(rs.Rows()[0][0].(string))
})
ch <- struct{}{}
}
enable.Store(false)
wg.Run(func() {
tk := testkit.NewTestKit(t, store)
_ = tk.ExecToErr("alter table test_parallel_ddl_2.t3 add index db3_idx1(c2)")
Expand Down

0 comments on commit c9c865c

Please sign in to comment.