From c9c865ced461b01b08095c760b705642c7f6c838 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 29 May 2024 15:58:21 +0800 Subject: [PATCH] test: fix flaky test TestParallelDDL (#53636) close pingcap/tidb#39222 --- pkg/ddl/BUILD.bazel | 1 + pkg/ddl/ddl.go | 1 + pkg/ddl/ddl_worker_test.go | 104 ++++++++++++------------------------- 3 files changed, 36 insertions(+), 70 deletions(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 2cdc6965d747a..d1651a502a240 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -311,6 +311,7 @@ go_test( "//pkg/tablecodec", "//pkg/testkit", "//pkg/testkit/external", + "//pkg/testkit/testfailpoint", "//pkg/testkit/testsetup", "//pkg/testkit/testutil", "//pkg/types", diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index ee9eade3f54cc..b1c6ae786a2ef 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -1201,6 +1201,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { // The transaction of enqueuing job is failed. return errors.Trace(err) } + failpoint.InjectCall("waitJobSubmitted") sessVars := ctx.GetSessionVars() sessVars.StmtCtx.IsDDLJobInQueue = true diff --git a/pkg/ddl/ddl_worker_test.go b/pkg/ddl/ddl_worker_test.go index bfcbdf866d2e8..9ea7e2bb8d6c1 100644 --- a/pkg/ddl/ddl_worker_test.go +++ b/pkg/ddl/ddl_worker_test.go @@ -17,6 +17,7 @@ package ddl_test import ( "strconv" "sync" + "sync/atomic" "testing" "time" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" ) @@ -168,76 +170,38 @@ func TestParallelDDL(t *testing.T) { seqIDs := make([]int, 11) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t1 add index db1_idx1(c1)") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[0], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t1 add column c3 int") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[1], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t1 add index db1_idxx(c1)") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[2], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t2 drop column c3") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[3], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t1 drop index db1_idx2") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[4], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t2 add index db1_idx2(c2)") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[5], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_2.t3 drop column c4") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[6], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_2.t3 auto_id_cache 1024") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[7], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("alter table test_parallel_ddl_1.t1 add index db1_idx3(c2)") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[8], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) - wg.Run(func() { - tk := testkit.NewTestKit(t, store) - tk.MustExec("drop database test_parallel_ddl_2") - rs := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") - seqIDs[9], _ = strconv.Atoi(rs.Rows()[0][0].(string)) - }) - time.Sleep(5 * time.Millisecond) + var enable atomic.Bool + ch := make(chan struct{}) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/waitJobSubmitted", + func() { + if enable.Load() { + <-ch + } + }, + ) + enable.Store(true) + for i, sql := range []string{ + "alter table test_parallel_ddl_1.t1 add index db1_idx1(c1)", + "alter table test_parallel_ddl_1.t1 add column c3 int", + "alter table test_parallel_ddl_1.t1 add index db1_idxx(c1)", + "alter table test_parallel_ddl_1.t2 drop column c3", + "alter table test_parallel_ddl_1.t1 drop index db1_idx2", + "alter table test_parallel_ddl_1.t2 add index db1_idx2(c2)", + "alter table test_parallel_ddl_2.t3 drop column c4", + "alter table test_parallel_ddl_2.t3 auto_id_cache 1024", + "alter table test_parallel_ddl_1.t1 add index db1_idx3(c2)", + "drop database test_parallel_ddl_2", + } { + idx := i + wg.Run(func() { + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(sql) + rs := tk2.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.seq_num')") + seqIDs[idx], _ = strconv.Atoi(rs.Rows()[0][0].(string)) + }) + ch <- struct{}{} + } + enable.Store(false) wg.Run(func() { tk := testkit.NewTestKit(t, store) _ = tk.ExecToErr("alter table test_parallel_ddl_2.t3 add index db3_idx1(c2)")