From 9687c3fb75e72006776fa43113f4bc1b02e0cc1b Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 29 Dec 2022 15:00:09 +0800 Subject: [PATCH 01/11] ddl: persist index info after changing backfill state --- ddl/index.go | 1 + ddl/multi_schema_change_test.go | 11 +++++++++++ executor/batch_checker.go | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index eba86473acba6..a1c329e8c0408 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -857,6 +857,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo return false, ver, err } indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return true, ver, nil default: return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 0f8aeca87802c..fd5106accfa08 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1208,6 +1208,17 @@ func TestMultiSchemaChangeSchemaVersion(t *testing.T) { dom.DDL().SetHook(originHook) } +func TestMultiSchemaChangeAddIndexChangeColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("CREATE TABLE t (a SMALLINT DEFAULT '30219', b TIME NULL DEFAULT '02:45:06', PRIMARY KEY (a));") + tk.MustExec("ALTER TABLE t ADD unique INDEX idx4 (b), change column a e MEDIUMINT DEFAULT '5280454' FIRST;") + tk.MustExec("insert ignore into t (e) values (5586359),(501788),(-5961048),(220083),(-4917129),(-7267211),(7750448);") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + tk.MustExec("admin check table t;") +} + func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/executor/batch_checker.go b/executor/batch_checker.go index 79a6748b2d5c3..70466cbd22cd0 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -181,7 +181,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D continue } // If index is used ingest ways, then we should check key from temp index. - if v.Meta().BackfillState != model.BackfillStateInapplicable { + if v.Meta().State != model.StatePublic && v.Meta().BackfillState != model.BackfillStateInapplicable { _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) } colValStr, err1 := formatDataForDupError(colVals) From 0dc68732b1cfa68b7b4960dbb76693e287e118ec Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 29 Dec 2022 15:13:09 +0800 Subject: [PATCH 02/11] fix linter --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index a1c329e8c0408..b2af49c353fe5 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -858,7 +858,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo } indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) - return true, ver, nil + return true, ver, err default: return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) } From b1a6c382fbe828513f6dd39d1e84d6a58e086b83 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 29 Dec 2022 22:23:48 +0800 Subject: [PATCH 03/11] fix test TestMultiSchemaChangeAddIndexChangeColumn --- ddl/multi_schema_change_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index fd5106accfa08..bf4aef776d291 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1215,7 +1215,7 @@ func TestMultiSchemaChangeAddIndexChangeColumn(t *testing.T) { tk.MustExec("CREATE TABLE t (a SMALLINT DEFAULT '30219', b TIME NULL DEFAULT '02:45:06', PRIMARY KEY (a));") tk.MustExec("ALTER TABLE t ADD unique INDEX idx4 (b), change column a e MEDIUMINT DEFAULT '5280454' FIRST;") tk.MustExec("insert ignore into t (e) values (5586359),(501788),(-5961048),(220083),(-4917129),(-7267211),(7750448);") - tk.MustQuery("select * from t;").Check(testkit.Rows()) + tk.MustQuery("select * from t;").Check(testkit.Rows("5586359 02:45:06")) tk.MustExec("admin check table t;") } From 4a29216c8670e12460bb21743bbbec4cad821a1c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 19 Jan 2023 14:06:57 +0800 Subject: [PATCH 04/11] add debug info --- executor/partition_table_test.go | 10 ++++++++++ testkit/testkit.go | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 5696b56f6f730..806064bbfaf37 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -18,6 +18,7 @@ import ( "fmt" "math/rand" "strings" + "sync" "testing" "time" @@ -3814,6 +3815,15 @@ func TestIssue35181(t *testing.T) { } func TestIssue21732(t *testing.T) { + var wg sync.WaitGroup + wg.Add(1) + dumpChan := make(chan struct{}) + defer func() { + close(dumpChan) + wg.Wait() + }() + go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second) + store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/testkit/testkit.go b/testkit/testkit.go index db86548ee3bfd..63b5d271711a8 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -19,6 +19,9 @@ package testkit import ( "context" "fmt" + "log" + "os" + "runtime/pprof" "strings" "sync" "testing" @@ -532,3 +535,34 @@ func (c *RegionProperityClient) SendRequest(ctx context.Context, addr string, re } return c.Client.SendRequest(ctx, addr, req, timeout) } + +// DebugDumpOnTimeout will dump stack traces and possible blockers after given timeout. +// wg is the WaitGroup to mark as done when finished (to avoid runaway goroutines) +// c is the channel that will signal or close to cancel the timeout. +func DebugDumpOnTimeout(wg *sync.WaitGroup, c chan struct{}, d time.Duration) { + select { + case <-time.After(d): + log.Print("Injected timeout, dumping all goroutines:") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to possible block:") + _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces holding mutexes:") + _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to creation of new OS threads:") + _ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2) + log.Print("Waiting 2 seconds and to see if things changed...") + time.Sleep(2 * time.Second) + log.Print("Injected timeout, dumping all goroutines:") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to possible block:") + _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces holding mutexes:") + _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to creation of new OS threads:") + _ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2) + panic("Injected timeout") + case <-c: + // Test finished + } + wg.Done() +} From 4eeca4ad8390cc92b87e5aa446051f38b12a5416 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 19 Jan 2023 16:22:36 +0800 Subject: [PATCH 05/11] remove switch concurrent ddl test --- ddl/concurrentddltest/BUILD.bazel | 26 ----- ddl/concurrentddltest/main_test.go | 45 -------- ddl/concurrentddltest/switch_test.go | 149 --------------------------- 3 files changed, 220 deletions(-) delete mode 100644 ddl/concurrentddltest/BUILD.bazel delete mode 100644 ddl/concurrentddltest/main_test.go delete mode 100644 ddl/concurrentddltest/switch_test.go diff --git a/ddl/concurrentddltest/BUILD.bazel b/ddl/concurrentddltest/BUILD.bazel deleted file mode 100644 index d5acc141896c5..0000000000000 --- a/ddl/concurrentddltest/BUILD.bazel +++ /dev/null @@ -1,26 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -go_test( - name = "concurrentddltest_test", - timeout = "short", - srcs = [ - "main_test.go", - "switch_test.go", - ], - flaky = True, - race = "on", - shard_count = 2, - deps = [ - "//config", - "//ddl", - "//kv", - "//meta", - "//sessionctx/variable", - "//testkit", - "//testkit/testsetup", - "//util", - "@com_github_stretchr_testify//require", - "@org_uber_go_atomic//:atomic", - "@org_uber_go_goleak//:goleak", - ], -) diff --git a/ddl/concurrentddltest/main_test.go b/ddl/concurrentddltest/main_test.go deleted file mode 100644 index 4ab7e96eab2ae..0000000000000 --- a/ddl/concurrentddltest/main_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package concurrentddltest - -import ( - "testing" - "time" - - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/testkit/testsetup" - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - testsetup.SetupForCommonTest() - - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = 0 - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - - ddl.SetWaitTimeWhenErrorOccurred(time.Microsecond) - - opts := []goleak.Option{ - goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), - goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), - goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - } - - goleak.VerifyTestMain(m, opts...) -} diff --git a/ddl/concurrentddltest/switch_test.go b/ddl/concurrentddltest/switch_test.go deleted file mode 100644 index 6cd26811008e6..0000000000000 --- a/ddl/concurrentddltest/switch_test.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package concurrentddltest - -import ( - "context" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/testkit" - "github.com/pingcap/tidb/util" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" -) - -func TestConcurrentDDLSwitch(t *testing.T) { - store := testkit.CreateMockStore(t) - - type table struct { - columnIdx int - indexIdx int - } - - var tables []*table - tblCount := 20 - for i := 0; i < tblCount; i++ { - tables = append(tables, &table{1, 0}) - } - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt=1") - tk.MustExec("set @@global.tidb_ddl_reorg_batch_size=32") - - for i := range tables { - tk.MustExec(fmt.Sprintf("create table t%d (col0 int)", i)) - for j := 0; j < 1000; j++ { - tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", i, j)) - } - } - - ddls := make([]string, 0, tblCount) - ddlCount := 100 - for i := 0; i < ddlCount; i++ { - tblIdx := rand.Intn(tblCount) - if rand.Intn(2) == 0 { - ddls = append(ddls, fmt.Sprintf("alter table t%d add index idx%d (col0)", tblIdx, tables[tblIdx].indexIdx)) - tables[tblIdx].indexIdx++ - } else { - ddls = append(ddls, fmt.Sprintf("alter table t%d add column col%d int", tblIdx, tables[tblIdx].columnIdx)) - tables[tblIdx].columnIdx++ - } - } - - c := atomic.NewInt32(0) - ch := make(chan struct{}) - go func() { - var wg util.WaitGroupWrapper - for i := range ddls { - wg.Add(1) - go func(idx int) { - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(ddls[idx]) - c.Add(1) - wg.Done() - }(i) - } - wg.Wait() - ch <- struct{}{} - }() - - // sleep 2s to make sure the ddl jobs is into table. - time.Sleep(2 * time.Second) - ticker := time.NewTicker(time.Second) - count := 0 - done := false - for !done { - select { - case <-ch: - done = true - case <-ticker.C: - var b bool - var err error - err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { - b, err = meta.NewMeta(txn).IsConcurrentDDL() - return err - }) - require.NoError(t, err) - rs, err := testkit.NewTestKit(t, store).Exec(fmt.Sprintf("set @@global.tidb_enable_concurrent_ddl=%t", !b)) - if rs != nil { - require.NoError(t, rs.Close()) - } - if err == nil { - count++ - if b { - tk := testkit.NewTestKit(t, store) - tk.Session().GetSessionVars().MemQuotaQuery = -1 - tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0")) - tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0")) - } - } - } - } - - require.Equal(t, int32(ddlCount), c.Load()) - require.Greater(t, count, 0) - - tk = testkit.NewTestKit(t, store) - tk.Session().GetSessionVars().MemQuotaQuery = -1 - tk.MustExec("use test") - for i, tbl := range tables { - tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx))) - tk.MustExec(fmt.Sprintf("admin check table t%d", i)) - for j := 0; j < tbl.indexIdx; j++ { - tk.MustExec(fmt.Sprintf("admin check index t%d idx%d", i, j)) - } - } -} - -func TestConcurrentDDLSwitchWithMDL(t *testing.T) { - if !variable.EnableConcurrentDDL.Load() { - t.Skip("skip test if concurrent DDL is disabled") - } - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustGetErrMsg("set global tidb_enable_concurrent_ddl=off", "can not disable concurrent ddl when metadata lock is enabled") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_enable_concurrent_ddl=off") - tk.MustExec("create table test.t(a int)") -} From 35d1aeab57ce3c2a786ed7ac6da95222318af45c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 19 Jan 2023 17:24:46 +0800 Subject: [PATCH 06/11] enable concurrent ddl explicitly --- executor/partition_table_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 806064bbfaf37..e97c64d18ae33 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -3827,6 +3827,7 @@ func TestIssue21732(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@global.tidb_enable_concurrent_ddl = 1;") for _, mode := range []variable.PartitionPruneMode{variable.StaticOnly, variable.DynamicOnly} { testkit.WithPruneMode(tk, mode, func() { tk.MustExec("create database TestIssue21732") From 98b9820869a205be4628d263462297f576a563ef Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 29 Jan 2023 14:17:41 +0800 Subject: [PATCH 07/11] fix integration test --- ddl/concurrentddltest/BUILD.bazel | 26 +++++ ddl/concurrentddltest/main_test.go | 45 ++++++++ ddl/concurrentddltest/switch_test.go | 149 +++++++++++++++++++++++++++ ddl/index.go | 5 +- executor/partition_table_test.go | 11 -- 5 files changed, 224 insertions(+), 12 deletions(-) create mode 100644 ddl/concurrentddltest/BUILD.bazel create mode 100644 ddl/concurrentddltest/main_test.go create mode 100644 ddl/concurrentddltest/switch_test.go diff --git a/ddl/concurrentddltest/BUILD.bazel b/ddl/concurrentddltest/BUILD.bazel new file mode 100644 index 0000000000000..d5acc141896c5 --- /dev/null +++ b/ddl/concurrentddltest/BUILD.bazel @@ -0,0 +1,26 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "concurrentddltest_test", + timeout = "short", + srcs = [ + "main_test.go", + "switch_test.go", + ], + flaky = True, + race = "on", + shard_count = 2, + deps = [ + "//config", + "//ddl", + "//kv", + "//meta", + "//sessionctx/variable", + "//testkit", + "//testkit/testsetup", + "//util", + "@com_github_stretchr_testify//require", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/ddl/concurrentddltest/main_test.go b/ddl/concurrentddltest/main_test.go new file mode 100644 index 0000000000000..4ab7e96eab2ae --- /dev/null +++ b/ddl/concurrentddltest/main_test.go @@ -0,0 +1,45 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrentddltest + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/testkit/testsetup" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testsetup.SetupForCommonTest() + + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.AsyncCommit.SafeWindow = 0 + conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 + }) + + ddl.SetWaitTimeWhenErrorOccurred(time.Microsecond) + + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + + goleak.VerifyTestMain(m, opts...) +} diff --git a/ddl/concurrentddltest/switch_test.go b/ddl/concurrentddltest/switch_test.go new file mode 100644 index 0000000000000..6cd26811008e6 --- /dev/null +++ b/ddl/concurrentddltest/switch_test.go @@ -0,0 +1,149 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrentddltest + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestConcurrentDDLSwitch(t *testing.T) { + store := testkit.CreateMockStore(t) + + type table struct { + columnIdx int + indexIdx int + } + + var tables []*table + tblCount := 20 + for i := 0; i < tblCount; i++ { + tables = append(tables, &table{1, 0}) + } + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_metadata_lock=0") + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt=1") + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size=32") + + for i := range tables { + tk.MustExec(fmt.Sprintf("create table t%d (col0 int)", i)) + for j := 0; j < 1000; j++ { + tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", i, j)) + } + } + + ddls := make([]string, 0, tblCount) + ddlCount := 100 + for i := 0; i < ddlCount; i++ { + tblIdx := rand.Intn(tblCount) + if rand.Intn(2) == 0 { + ddls = append(ddls, fmt.Sprintf("alter table t%d add index idx%d (col0)", tblIdx, tables[tblIdx].indexIdx)) + tables[tblIdx].indexIdx++ + } else { + ddls = append(ddls, fmt.Sprintf("alter table t%d add column col%d int", tblIdx, tables[tblIdx].columnIdx)) + tables[tblIdx].columnIdx++ + } + } + + c := atomic.NewInt32(0) + ch := make(chan struct{}) + go func() { + var wg util.WaitGroupWrapper + for i := range ddls { + wg.Add(1) + go func(idx int) { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(ddls[idx]) + c.Add(1) + wg.Done() + }(i) + } + wg.Wait() + ch <- struct{}{} + }() + + // sleep 2s to make sure the ddl jobs is into table. + time.Sleep(2 * time.Second) + ticker := time.NewTicker(time.Second) + count := 0 + done := false + for !done { + select { + case <-ch: + done = true + case <-ticker.C: + var b bool + var err error + err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { + b, err = meta.NewMeta(txn).IsConcurrentDDL() + return err + }) + require.NoError(t, err) + rs, err := testkit.NewTestKit(t, store).Exec(fmt.Sprintf("set @@global.tidb_enable_concurrent_ddl=%t", !b)) + if rs != nil { + require.NoError(t, rs.Close()) + } + if err == nil { + count++ + if b { + tk := testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().MemQuotaQuery = -1 + tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0")) + tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0")) + } + } + } + } + + require.Equal(t, int32(ddlCount), c.Load()) + require.Greater(t, count, 0) + + tk = testkit.NewTestKit(t, store) + tk.Session().GetSessionVars().MemQuotaQuery = -1 + tk.MustExec("use test") + for i, tbl := range tables { + tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx))) + tk.MustExec(fmt.Sprintf("admin check table t%d", i)) + for j := 0; j < tbl.indexIdx; j++ { + tk.MustExec(fmt.Sprintf("admin check index t%d idx%d", i, j)) + } + } +} + +func TestConcurrentDDLSwitchWithMDL(t *testing.T) { + if !variable.EnableConcurrentDDL.Load() { + t.Skip("skip test if concurrent DDL is disabled") + } + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustGetErrMsg("set global tidb_enable_concurrent_ddl=off", "can not disable concurrent ddl when metadata lock is enabled") + tk.MustExec("set global tidb_enable_metadata_lock=0") + tk.MustExec("set global tidb_enable_concurrent_ddl=off") + tk.MustExec("create table test.t(a int)") +} diff --git a/ddl/index.go b/ddl/index.go index b2af49c353fe5..aad5f3fce35bd 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -774,6 +774,10 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) if done { job.MarkNonRevertible() + if err == nil { + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) + return true, ver, err + } } // We need another round to wait for all the others sub-jobs to finish. return false, ver, err @@ -857,7 +861,6 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo return false, ver, err } indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. - ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return true, ver, err default: return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index e97c64d18ae33..5696b56f6f730 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -18,7 +18,6 @@ import ( "fmt" "math/rand" "strings" - "sync" "testing" "time" @@ -3815,19 +3814,9 @@ func TestIssue35181(t *testing.T) { } func TestIssue21732(t *testing.T) { - var wg sync.WaitGroup - wg.Add(1) - dumpChan := make(chan struct{}) - defer func() { - close(dumpChan) - wg.Wait() - }() - go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second) - store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set @@global.tidb_enable_concurrent_ddl = 1;") for _, mode := range []variable.PartitionPruneMode{variable.StaticOnly, variable.DynamicOnly} { testkit.WithPruneMode(tk, mode, func() { tk.MustExec("create database TestIssue21732") From 1918f3fb888780b76ba9a408634ee9a1016071c5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 29 Jan 2023 14:23:29 +0800 Subject: [PATCH 08/11] remove debug code --- testkit/testkit.go | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/testkit/testkit.go b/testkit/testkit.go index 63b5d271711a8..db86548ee3bfd 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -19,9 +19,6 @@ package testkit import ( "context" "fmt" - "log" - "os" - "runtime/pprof" "strings" "sync" "testing" @@ -535,34 +532,3 @@ func (c *RegionProperityClient) SendRequest(ctx context.Context, addr string, re } return c.Client.SendRequest(ctx, addr, req, timeout) } - -// DebugDumpOnTimeout will dump stack traces and possible blockers after given timeout. -// wg is the WaitGroup to mark as done when finished (to avoid runaway goroutines) -// c is the channel that will signal or close to cancel the timeout. -func DebugDumpOnTimeout(wg *sync.WaitGroup, c chan struct{}, d time.Duration) { - select { - case <-time.After(d): - log.Print("Injected timeout, dumping all goroutines:") - _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) - log.Print("dumping all stack traces led to possible block:") - _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) - log.Print("dumping all stack traces holding mutexes:") - _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) - log.Print("dumping all stack traces led to creation of new OS threads:") - _ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2) - log.Print("Waiting 2 seconds and to see if things changed...") - time.Sleep(2 * time.Second) - log.Print("Injected timeout, dumping all goroutines:") - _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) - log.Print("dumping all stack traces led to possible block:") - _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) - log.Print("dumping all stack traces holding mutexes:") - _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) - log.Print("dumping all stack traces led to creation of new OS threads:") - _ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2) - panic("Injected timeout") - case <-c: - // Test finished - } - wg.Done() -} From c9ba0e7511f12d6e01bd4a9c148240c936306527 Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 29 Jan 2023 15:22:16 +0800 Subject: [PATCH 09/11] fix integration test --- ddl/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index aad5f3fce35bd..fd13f92861e10 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -776,7 +776,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo job.MarkNonRevertible() if err == nil { ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) - return true, ver, err + return false, ver, err } } // We need another round to wait for all the others sub-jobs to finish. From 1d2a89772aa3921db136f5c6ad08663273d6c110 Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 29 Jan 2023 15:26:08 +0800 Subject: [PATCH 10/11] fix integration test --- ddl/index.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ddl/index.go b/ddl/index.go index fd13f92861e10..478338a78adc5 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -776,7 +776,6 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo job.MarkNonRevertible() if err == nil { ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) - return false, ver, err } } // We need another round to wait for all the others sub-jobs to finish. From 6884be541ed17eb1b22c24106a9d1b70229e7471 Mon Sep 17 00:00:00 2001 From: tangenta Date: Sun, 29 Jan 2023 15:57:45 +0800 Subject: [PATCH 11/11] remove concurrentddl test --- ddl/concurrentddltest/BUILD.bazel | 26 ----- ddl/concurrentddltest/main_test.go | 45 -------- ddl/concurrentddltest/switch_test.go | 149 --------------------------- 3 files changed, 220 deletions(-) delete mode 100644 ddl/concurrentddltest/BUILD.bazel delete mode 100644 ddl/concurrentddltest/main_test.go delete mode 100644 ddl/concurrentddltest/switch_test.go diff --git a/ddl/concurrentddltest/BUILD.bazel b/ddl/concurrentddltest/BUILD.bazel deleted file mode 100644 index d5acc141896c5..0000000000000 --- a/ddl/concurrentddltest/BUILD.bazel +++ /dev/null @@ -1,26 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -go_test( - name = "concurrentddltest_test", - timeout = "short", - srcs = [ - "main_test.go", - "switch_test.go", - ], - flaky = True, - race = "on", - shard_count = 2, - deps = [ - "//config", - "//ddl", - "//kv", - "//meta", - "//sessionctx/variable", - "//testkit", - "//testkit/testsetup", - "//util", - "@com_github_stretchr_testify//require", - "@org_uber_go_atomic//:atomic", - "@org_uber_go_goleak//:goleak", - ], -) diff --git a/ddl/concurrentddltest/main_test.go b/ddl/concurrentddltest/main_test.go deleted file mode 100644 index 4ab7e96eab2ae..0000000000000 --- a/ddl/concurrentddltest/main_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package concurrentddltest - -import ( - "testing" - "time" - - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/testkit/testsetup" - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - testsetup.SetupForCommonTest() - - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = 0 - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - - ddl.SetWaitTimeWhenErrorOccurred(time.Microsecond) - - opts := []goleak.Option{ - goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), - goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), - goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), - goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - } - - goleak.VerifyTestMain(m, opts...) -} diff --git a/ddl/concurrentddltest/switch_test.go b/ddl/concurrentddltest/switch_test.go deleted file mode 100644 index 6cd26811008e6..0000000000000 --- a/ddl/concurrentddltest/switch_test.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package concurrentddltest - -import ( - "context" - "fmt" - "math/rand" - "testing" - "time" - - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/testkit" - "github.com/pingcap/tidb/util" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" -) - -func TestConcurrentDDLSwitch(t *testing.T) { - store := testkit.CreateMockStore(t) - - type table struct { - columnIdx int - indexIdx int - } - - var tables []*table - tblCount := 20 - for i := 0; i < tblCount; i++ { - tables = append(tables, &table{1, 0}) - } - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt=1") - tk.MustExec("set @@global.tidb_ddl_reorg_batch_size=32") - - for i := range tables { - tk.MustExec(fmt.Sprintf("create table t%d (col0 int)", i)) - for j := 0; j < 1000; j++ { - tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", i, j)) - } - } - - ddls := make([]string, 0, tblCount) - ddlCount := 100 - for i := 0; i < ddlCount; i++ { - tblIdx := rand.Intn(tblCount) - if rand.Intn(2) == 0 { - ddls = append(ddls, fmt.Sprintf("alter table t%d add index idx%d (col0)", tblIdx, tables[tblIdx].indexIdx)) - tables[tblIdx].indexIdx++ - } else { - ddls = append(ddls, fmt.Sprintf("alter table t%d add column col%d int", tblIdx, tables[tblIdx].columnIdx)) - tables[tblIdx].columnIdx++ - } - } - - c := atomic.NewInt32(0) - ch := make(chan struct{}) - go func() { - var wg util.WaitGroupWrapper - for i := range ddls { - wg.Add(1) - go func(idx int) { - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec(ddls[idx]) - c.Add(1) - wg.Done() - }(i) - } - wg.Wait() - ch <- struct{}{} - }() - - // sleep 2s to make sure the ddl jobs is into table. - time.Sleep(2 * time.Second) - ticker := time.NewTicker(time.Second) - count := 0 - done := false - for !done { - select { - case <-ch: - done = true - case <-ticker.C: - var b bool - var err error - err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { - b, err = meta.NewMeta(txn).IsConcurrentDDL() - return err - }) - require.NoError(t, err) - rs, err := testkit.NewTestKit(t, store).Exec(fmt.Sprintf("set @@global.tidb_enable_concurrent_ddl=%t", !b)) - if rs != nil { - require.NoError(t, rs.Close()) - } - if err == nil { - count++ - if b { - tk := testkit.NewTestKit(t, store) - tk.Session().GetSessionVars().MemQuotaQuery = -1 - tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0")) - tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0")) - } - } - } - } - - require.Equal(t, int32(ddlCount), c.Load()) - require.Greater(t, count, 0) - - tk = testkit.NewTestKit(t, store) - tk.Session().GetSessionVars().MemQuotaQuery = -1 - tk.MustExec("use test") - for i, tbl := range tables { - tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx))) - tk.MustExec(fmt.Sprintf("admin check table t%d", i)) - for j := 0; j < tbl.indexIdx; j++ { - tk.MustExec(fmt.Sprintf("admin check index t%d idx%d", i, j)) - } - } -} - -func TestConcurrentDDLSwitchWithMDL(t *testing.T) { - if !variable.EnableConcurrentDDL.Load() { - t.Skip("skip test if concurrent DDL is disabled") - } - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustGetErrMsg("set global tidb_enable_concurrent_ddl=off", "can not disable concurrent ddl when metadata lock is enabled") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_enable_concurrent_ddl=off") - tk.MustExec("create table test.t(a int)") -}