From b3bbd5dfde2e6271614146859df026f8c015f0d9 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 12 Sep 2023 00:58:40 +0800 Subject: [PATCH] ddl: Exchange partition rollback (#45877) (#45980) close pingcap/tidb#45791, close pingcap/tidb#45920 Signed-off-by: Yang Keao --- ddl/db_partition_test.go | 67 +++++++++- ddl/ddl.go | 5 + ddl/ddl_api.go | 1 - ddl/ddl_worker.go | 36 +++--- ddl/failtest/fail_db_test.go | 2 - ddl/metadatalocktest/BUILD.bazel | 3 + ddl/metadatalocktest/mdl_test.go | 209 +++++++++++++++++++++++++++++++ ddl/partition.go | 122 +++++++++--------- ddl/placement_policy_test.go | 73 ++++++----- ddl/rollingback.go | 28 ++++- executor/insert_common.go | 2 +- executor/write.go | 2 +- infoschema/builder.go | 56 +++++++-- parser/model/model.go | 3 +- testkit/testkit.go | 35 ++++++ 15 files changed, 514 insertions(+), 130 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 405e42af5fb41..a37279c13d3e9 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1544,7 +1544,7 @@ func TestAlterTableTruncatePartitionPreSplitRegion(t *testing.T) { tk.MustExec("drop table if exists t1;") tk.MustExec(`CREATE TABLE t1 (id int, c varchar(128), key c(c)) partition by range (id) ( - partition p0 values less than (10), + partition p0 values less than (10), partition p1 values less than MAXVALUE)`) re := tk.MustQuery("show table t1 regions") rows := re.Rows() @@ -2467,6 +2467,71 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { require.NoError(t, err) } +func TestExchangePartitionValidation(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + dbName := "ExchangeValidation" + tk.MustExec(`create schema ` + dbName) + tk.MustExec(`use ` + dbName) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name))`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`insert into t1 values ("2023-08-06","0001")`) +} + +func TestExchangePartitionPlacementPolicy(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create schema ExchangePartWithPolicy`) + tk.MustExec(`use ExchangePartWithPolicy`) + tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`) + tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule1"`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule2" + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + 
PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`, + "[ddl:1736]Tables have different definitions") + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) +} + func TestExchangePartitionHook(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/ddl.go b/ddl/ddl.go index 136c7abf2bf0f..a800bdab7969d 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1229,6 +1229,7 @@ func (d *ddl) SwitchMDL(enable bool) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() + logutil.BgLogger().Info("Switch EnableMDL 1", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) // Disable MDL for test. if enable && !variable.DefTiDBEnableConcurrentDDL { sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %[1]s.%[2]s SET VARIABLE_VALUE = %[4]d WHERE VARIABLE_NAME = '%[3]s'", @@ -1247,6 +1248,7 @@ func (d *ddl) SwitchMDL(enable bool) error { return nil } + logutil.BgLogger().Info("Switch EnableMDL 2", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) isEnableBefore := variable.EnableMDL.Load() if isEnableBefore == enable { return nil @@ -1268,7 +1270,10 @@ func (d *ddl) SwitchMDL(enable bool) error { return errors.New("please wait for all jobs done") } + logutil.BgLogger().Info("Switch EnableMDL 3", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) variable.EnableMDL.Store(enable) + logutil.BgLogger().Info("Switch EnableMDL 4", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) + err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), d.store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) oldEnable, _, err := m.GetMetadataLock() diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index fba7f4d4feabc..bf66fdc5d02be 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4195,7 +4195,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error { return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name)) } - // NOTE: if nt is temporary table, it should be checked return nil } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 55836de141c78..a4e152e7866e0 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1553,24 +1553,28 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... 
diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: - var ( - ptSchemaID int64 - ptTableID int64 - partName string - withValidation bool - ) - err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } diff.OldTableID = job.TableID - affects := make([]*model.AffectedOption, 1) - affects[0] = &model.AffectedOption{ - SchemaID: ptSchemaID, - TableID: ptTableID, - OldTableID: ptTableID, + diff.OldSchemaID = job.SchemaID + if job.SchemaState != model.StatePublic { + diff.TableID = job.TableID + diff.SchemaID = job.SchemaID + } else { + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 // Not needed, will reload the whole table + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + diff.SchemaID = ptSchemaID + diff.TableID = ptTableID } - diff.AffectedOpts = affects case model.ActionTruncateTablePartition: diff.TableID = job.TableID if len(job.CtxVars) > 0 { diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index 4a938e5fd2ad4..6d3280cb481d4 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -135,8 +135,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustExec("insert into pt values(1), (3), (5)") tk.MustExec("create table nt(a int)") tk.MustExec("insert into nt values(7)") - tk.MustExec("set @@tidb_enable_exchange_partition=1") - defer tk.MustExec("set @@tidb_enable_exchange_partition=0") err = tk.ExecToErr("alter table pt exchange partition p1 with table nt") require.Error(t, err) diff --git a/ddl/metadatalocktest/BUILD.bazel b/ddl/metadatalocktest/BUILD.bazel index fb169367a7933..413fd554034f5 100644 --- a/ddl/metadatalocktest/BUILD.bazel +++ b/ddl/metadatalocktest/BUILD.bazel @@ -13,10 +13,13 @@ go_test( "//ddl", "//errno", "//server", + "//sessionctx/variable", "//testkit", "//testkit/testsetup", + "//util/logutil", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//:zap", ], ) diff --git a/ddl/metadatalocktest/mdl_test.go b/ddl/metadatalocktest/mdl_test.go index 2732073251c07..5364809aecebf 100644 --- a/ddl/metadatalocktest/mdl_test.go +++ b/ddl/metadatalocktest/mdl_test.go @@ -18,6 +18,7 @@ package metadatalocktest import ( "fmt" + "strings" "sync" "testing" "time" @@ -25,8 +26,11 @@ import ( "github.com/pingcap/failpoint" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/server" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/logutil" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestMDLBasicSelect(t *testing.T) { @@ -1134,3 +1138,208 @@ func TestMDLUpdateEtcdFail(t *testing.T) { tk.MustExec("alter table test.t add column c int") } + +// Tests that require MDL. 
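+// Exchange partition relies on the metadata lock (MDL) to guarantee that no
+// client keeps writing through a schema version from before a state change.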
+// They are placed here because they must run without the 'featuretag' build flag defined.
+func TestExchangePartitionStates(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	dbName := "partSchemaVer"
+	tk.MustExec("create database " + dbName)
+	tk.MustExec("use " + dbName)
+	tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`)
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use " + dbName)
+	tk3 := testkit.NewTestKit(t, store)
+	tk3.MustExec("use " + dbName)
+	tk4 := testkit.NewTestKit(t, store)
+	tk4.MustExec("use " + dbName)
+	tk.MustExec(`create table t (a int primary key, b varchar(255), key (b))`)
+	tk.MustExec(`create table tp (a int primary key, b varchar(255), key (b)) partition by range (a) (partition p0 values less than (1000000), partition p1M values less than (2000000))`)
+	tk.MustExec(`insert into t values (1, "1")`)
+	tk.MustExec(`insert into tp values (2, "2")`)
+	tk.MustExec(`analyze table t,tp`)
+	tk.MustQuery(`select * from information_schema.global_variables`).Check(testkit.Rows())
+	var wg sync.WaitGroup
+	wg.Add(1)
+	dumpChan := make(chan struct{})
+	defer func() {
+		close(dumpChan)
+		wg.Wait()
+	}()
+	go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second)
+	tk.MustExec("BEGIN")
+	tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1"))
+	tk.MustQuery(`select * from tp`).Check(testkit.Rows("2 2"))
+	//require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`))
+	alterChan := make(chan error)
+	go func() {
+		// WITH VALIDATION is the default
+		err := tk2.ExecToErr(`alter table tp exchange partition p0 with table t`)
+		alterChan <- err
+	}()
+	waitFor := func(tableName, s string, pos int) {
+		for {
+			select {
+			case alterErr := <-alterChan:
+				require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr)
+			default:
+				// Alter is still running
+			}
+			res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows()
+			if len(res) == 1 && res[0][pos] == s {
+				logutil.BgLogger().Info("Got state", zap.String("State", s))
+				break
+			}
+			time.Sleep(50 * time.Millisecond)
+		}
+	}
+	waitFor("t", "write only", 4)
+	tk3.MustExec(`BEGIN`)
+	tk3.MustExec(`insert into t values (4,"4")`)
+	tk3.MustContainErrMsg(`insert into t values (1000004,"1000004")`, "[table:1748]Found a row not matching the given partition set")
+	tk.MustExec(`insert into t values (5,"5")`)
+	// This should fail the alter table!
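+	// (1000005 falls in partition p1M, not in the exchanged p0, so the
+	// default WITH VALIDATION check must find it and roll the job back)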
+	tk.MustExec(`insert into t values (1000005,"1000005")`)
+
+	// MDL blocks the alter from continuing until all clients are on the
+	// StateWriteOnly schema version, which tk blocks until it commits
+	tk.MustExec(`COMMIT`)
+	//require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID"))
+	waitFor("t", "rollback done", 11)
+	// MDL blocks the alter from finishing; tk is on the rolled-back schema version,
+	// but the alter still waits for tk3 to commit before continuing
+	tk.MustExec("BEGIN")
+	tk.MustExec(`insert into t values (1000006,"1000006")`)
+	tk.MustExec(`insert into t values (6,"6")`)
+	tk3.MustExec(`insert into t values (7,"7")`)
+	tk3.MustContainErrMsg(`insert into t values (1000007,"1000007")`,
+		"[table:1748]Found a row not matching the given partition set")
+	tk3.MustExec("COMMIT")
+	require.ErrorContains(t, <-alterChan,
+		"[ddl:1737]Found a row that does not match the partition")
+	tk3.MustExec(`BEGIN`)
+	tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(
+		"1 1", "1000005 1000005", "1000006 1000006", "5 5", "6 6"))
+	tk.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2"))
+	tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows(
+		"1 1", "1000005 1000005", "4 4", "5 5", "7 7"))
+	tk3.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2"))
+	tk.MustContainErrMsg(`insert into t values (7,"7")`,
+		"[kv:1062]Duplicate entry '7' for key 't.PRIMARY'")
+	tk.MustExec(`insert into t values (8,"8")`)
+	tk.MustExec(`insert into t values (1000008,"1000008")`)
+	tk.MustExec(`insert into tp values (9,"9")`)
+	tk.MustExec(`insert into tp values (1000009,"1000009")`)
+	tk3.MustExec(`insert into t values (10,"10")`)
+	tk3.MustExec(`insert into t values (1000010,"1000010")`)
+
+	tk3.MustExec(`COMMIT`)
+	tk.MustQuery(`show create table tp`).Check(testkit.Rows("" +
+		"tp CREATE TABLE `tp` (\n" +
+		"  `a` int(11) NOT NULL,\n" +
+		"  `b` varchar(255) DEFAULT NULL,\n" +
+		"  PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		"  KEY `b` (`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		"PARTITION BY RANGE (`a`)\n" +
+		"(PARTITION `p0` VALUES LESS THAN (1000000),\n" +
+		" PARTITION `p1M` VALUES LESS THAN (2000000))"))
+	tk.MustQuery(`show create table t`).Check(testkit.Rows("" +
+		"t CREATE TABLE `t` (\n" +
+		"  `a` int(11) NOT NULL,\n" +
+		"  `b` varchar(255) DEFAULT NULL,\n" +
+		"  PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" +
+		"  KEY `b` (`b`)\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
+	tk.MustExec(`commit`)
+	tk.MustExec(`insert into t values (11,"11")`)
+	tk.MustExec(`insert into t values (1000011,"1000011")`)
+	tk.MustExec(`insert into tp values (12,"12")`)
+	tk.MustExec(`insert into tp values (1000012,"1000012")`)
+}
+
+func TestExchangePartitionMultiTable(t *testing.T) {
+	logutil.BgLogger().Info("mdl related variable status before bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load()))
+	store := testkit.CreateMockStore(t)
+	tk1 := testkit.NewTestKit(t, store)
+	logutil.BgLogger().Info("mdl related variable status after bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load()))
+
+	dbName := "ExchangeMultiTable"
+	tk1.MustExec(`create schema ` + dbName)
+	tk1.MustExec(`use ` + dbName)
+	tk1.MustExec(`set global tidb_enable_metadata_lock = 'ON'`)
+	tk1.MustExec(`CREATE TABLE t1 (a int)`)
+	tk1.MustExec(`CREATE 
TABLE t2 (a int)`) + tk1.MustExec(`CREATE TABLE tp (a int) partition by hash(a) partitions 3`) + tk1.MustExec(`insert into t1 values (0)`) + tk1.MustExec(`insert into t2 values (3)`) + tk1.MustExec(`insert into tp values (6)`) + logutil.BgLogger().Info("mdl related variable status after inserting rows", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(`use ` + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec(`use ` + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use ` + dbName) + waitFor := func(col int, tableName, s string) { + for { + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use test`) + sql := `admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'` + res := tk4.MustQuery(sql).Rows() + if len(res) == 1 && res[0][col] == s { + break + } + logutil.BgLogger().Info("No match", zap.String("sql", sql), zap.String("s", s)) + sql = `admin show ddl jobs` + res = tk4.MustQuery(sql).Rows() + for _, row := range res { + strs := make([]string, 0, len(row)) + for _, c := range row { + strs = append(strs, c.(string)) + } + logutil.BgLogger().Info("admin show ddl jobs", zap.Strings("row", strs)) + } + time.Sleep(100 * time.Millisecond) + } + } + var wg sync.WaitGroup + wg.Add(1) + dumpChan := make(chan struct{}) + defer func() { + close(dumpChan) + wg.Wait() + }() + go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second) + alterChan1 := make(chan error) + alterChan2 := make(chan error) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into t1 values (1)`) + tk3.MustExec(`insert into t2 values (2)`) + tk3.MustExec(`insert into tp values (3)`) + //require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`)) + go func() { + alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`) + }() + waitFor(11, "t1", "running") + go func() { + alterChan2 <- tk2.ExecToErr(`alter table tp exchange partition p0 with table t2`) + }() + waitFor(11, "t2", "queueing") + tk3.MustExec(`rollback`) + logutil.BgLogger().Info("rollback done") + //require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) + require.NoError(t, <-alterChan1) + logutil.BgLogger().Info("alter1 done") + err := <-alterChan2 + logutil.BgLogger().Info("alter2 done") + tk3.MustQuery(`select * from t1`).Check(testkit.Rows("6")) + logutil.BgLogger().Info("select t1 done") + tk3.MustQuery(`select * from t2`).Check(testkit.Rows("0")) + logutil.BgLogger().Info("select t2 done") + tk3.MustQuery(`select * from tp`).Check(testkit.Rows("3")) + logutil.BgLogger().Info("select tp done") + require.NoError(t, err) +} diff --git a/ddl/partition.go b/ddl/partition.go index ad5769c9c3c7c..05d345baaa377 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2032,6 +2032,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if job.IsRollingback() { + return rollbackExchangeTablePartition(d, t, job, nt) + } pt, err := getTableInfo(t, ptID, ptSchemaID) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { @@ -2040,35 +2043,49 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - if pt.State != model.StatePublic { - job.State = 
model.JobStateCancelled
-		return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State)
-	}
-
-	err = checkExchangePartition(pt, nt)
+	index, partDef, err := getPartitionDef(pt, partName)
 	if err != nil {
-		job.State = model.JobStateCancelled
 		return ver, errors.Trace(err)
 	}
+	if job.SchemaState == model.StateNone {
+		if pt.State != model.StatePublic {
+			job.State = model.JobStateCancelled
+			return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State)
+		}
+		err = checkExchangePartition(pt, nt)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
 
-	err = checkTableDefCompatible(pt, nt)
-	if err != nil {
-		job.State = model.JobStateCancelled
-		return ver, errors.Trace(err)
-	}
+		err = checkTableDefCompatible(pt, nt)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
+
+		err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef)
+		if err != nil {
+			job.State = model.JobStateCancelled
+			return ver, errors.Trace(err)
+		}
 
-	index, _, err := getPartitionDef(pt, partName)
-	if err != nil {
-		return ver, errors.Trace(err)
-	}
-	if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag {
 		nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{
-			ExchangePartitionFlag:  true,
 			ExchangePartitionID:    ptID,
 			ExchangePartitionDefID: defID,
 		}
+		// We need an interim schema version, so that no rows that
+		// do not match the partition can be inserted into the table
+		// using a schema version from before the exchange was started.
+		job.SchemaState = model.StateWriteOnly
 		return updateVersionAndTableInfoWithCheck(d, t, job, nt, true)
 	}
+	// From now on, nt (the non-partitioned table) has
+	// ExchangePartitionInfo set, meaning it is restricted
+	// to only allow writes that would match the
+	// partition to be exchanged with.
+	// So we need to roll back that change, instead of just cancelling.
 
 	if d.lease > 0 {
 		delayForAsyncCommit()
@@ -2077,7 +2094,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
 	if withValidation {
 		err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
 		if err != nil {
-			job.State = model.JobStateCancelled
+			job.State = model.JobStateRollingback
 			return ver, errors.Trace(err)
 		}
 	}
@@ -2085,19 +2102,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
 	// partition table auto IDs.
 	ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get()
 	if err != nil {
-		job.State = model.JobStateCancelled
 		return ver, errors.Trace(err)
 	}
 	// non-partition table auto IDs. 
ntAutoIDs, err := t.GetAutoIDAccessors(job.SchemaID, nt.ID).Get() if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - _, partDef, err := getPartitionDef(pt, partName) - if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2110,35 +2119,32 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } } - // exchange table meta id - partDef.ID, nt.ID = nt.ID, partDef.ID - - err = t.UpdateTable(ptSchemaID, pt) + // Recreate non-partition table meta info, + // by first delete it with the old table id + err = t.DropTableOrView(job.SchemaID, nt.ID) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } - failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { - if val.(bool) { - job.State = model.JobStateCancelled - failpoint.Return(ver, errors.New("occur an error after updating partition id")) - } - }) + // exchange table meta id + partDef.ID, nt.ID = nt.ID, partDef.ID - // recreate non-partition table meta info - err = t.DropTableOrView(job.SchemaID, partDef.ID) + err = t.UpdateTable(ptSchemaID, pt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.CreateTableOrView(job.SchemaID, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(ver, errors.New("occur an error after updating partition id")) + } + }) + // Set both tables to the maximum auto IDs between normal table and partitioned table. newAutoIDs := meta.AutoIDGroup{ RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), @@ -2147,12 +2153,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } err = t.GetAutoIDAccessors(ptSchemaID, pt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.GetAutoIDAccessors(job.SchemaID, nt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2171,23 +2175,15 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } }) - err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt) + bundles, err := bundlesForExchangeTablePartition(t, pt, partDef, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } @@ -2196,7 +2192,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo rules, err := infosync.GetLabelRules(context.TODO(), []string{ntrID, ptrID}) if err != nil { - job.State = model.JobStateCancelled return 0, errors.Wrapf(err, "failed to get PD the label rules") } @@ -2223,10 +2218,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo patch := label.NewRulePatch(setRules, deleteRules) err = infosync.UpdateLabelRules(context.TODO(), patch) if err != nil { - job.State = 
model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the label rules") } + job.SchemaState = model.StatePublic nt.ExchangePartitionInfo = nil ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { @@ -2237,7 +2232,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, nil } -func bundlesForExchangeTablePartition(t *meta.Meta, job *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { +func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { bundles := make([]*placement.Bundle, 0, 3) ptBundle, err := placement.NewTableBundle(t, pt) @@ -2337,16 +2332,21 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { - if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPPRef, ptPPRef, partPPRef *model.PolicyRefInfo) error { + partitionPPRef := partPPRef + if partitionPPRef == nil { + partitionPPRef = ptPPRef + } + + if ntPPRef == nil && partitionPPRef == nil { return nil } - if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { + if ntPPRef == nil || partitionPPRef == nil { return dbterror.ErrTablesDifferentMetadata } - ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) - ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) + ptPlacementPolicyInfo, _ := getPolicyInfo(t, partitionPPRef.ID) + ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPPRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { return nil } diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index 85525e816b54e..06ce0421138a2 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -2068,61 +2068,48 @@ func TestExchangePartitionWithPlacement(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) // clearAllBundles(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set @@tidb_enable_exchange_partition=1") tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2, tp") - tk.MustExec("drop placement policy if exists p1") - tk.MustExec("drop placement policy if exists p2") - tk.MustExec("drop placement policy if exists p3") - - tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") - defer tk.MustExec("drop placement policy p1") - - tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") - defer tk.MustExec("drop placement policy p2") - tk.MustExec("create placement policy p3 primary_region='r3' regions='r3'") - defer tk.MustExec("drop placement policy p3") + tk.MustExec("create placement policy pp1 primary_region='r1' regions='r1'") + tk.MustExec("create placement policy pp2 primary_region='r2' regions='r2'") + tk.MustExec("create placement policy pp3 primary_region='r3' regions='r3'") - policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1")) + policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("pp1")) require.True(t, ok) - tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`) - defer tk.MustExec("drop table t1") - + tk.MustExec(`CREATE TABLE t1 (id INT) placement policy pp1`) tk.MustExec(`CREATE TABLE t2 (id INT)`) - defer 
tk.MustExec("drop table t2") + tk.MustExec(`CREATE TABLE t3 (id INT) placement policy pp3`) t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) t1ID := t1.Meta().ID - tk.MustExec(`CREATE TABLE tp (id INT) placement policy p3 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100) placement policy p1, - PARTITION p1 VALUES LESS THAN (1000) placement policy p2, - PARTITION p2 VALUES LESS THAN (10000) - );`) - defer tk.MustExec("drop table tp") + tk.MustExec(`CREATE TABLE tp (id INT) placement policy pp3 PARTITION BY RANGE (id) ( + PARTITION p1 VALUES LESS THAN (100) placement policy pp1, + PARTITION p2 VALUES LESS THAN (1000) placement policy pp2, + PARTITION p3 VALUES LESS THAN (10000) + )`) tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) tpID := tp.Meta().ID par0ID := tp.Meta().Partition.Definitions[0].ID - // exchange par0, t1 - tk.MustExec("alter table tp exchange partition p0 with table t1") + // exchange par1, t1 + tk.MustExec("alter table tp exchange partition p1 with table t1") tk.MustQuery("show create table t1").Check(testkit.Rows("" + "t1 CREATE TABLE `t1` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */")) tk.MustQuery("show create table tp").Check(testkit.Rows("" + "tp CREATE TABLE `tp` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp3` */\n" + "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + - " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + - " PARTITION `p2` VALUES LESS THAN (10000))")) + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `p2` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p3` VALUES LESS THAN (10000))")) tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) require.Equal(t, tpID, tp.Meta().ID) @@ -2134,11 +2121,31 @@ func TestExchangePartitionWithPlacement(t *testing.T) { require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID) checkExistTableBundlesInPD(t, dom, "test", "tp") - // exchange par0, t2 - tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrTablesDifferentMetadata) + // exchange par2, t1 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t1", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t1 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t1", mysql.ErrTablesDifferentMetadata) // exchange par1, t2 tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t2 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t2 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par1, t3 + tk.MustGetErrCode("alter table tp exchange partition p1 with table t3", mysql.ErrTablesDifferentMetadata) + 
+ // exchange par2, t3 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t3 + tk.MustExec("alter table tp exchange partition p3 with table t3") + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkExistTableBundlesInPD(t, dom, "test", "t3") } func TestPDFail(t *testing.T) { diff --git a/ddl/rollingback.go b/ddl/rollingback.go index c6f75442479b6..cba089031b134 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -253,6 +253,30 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool { return false } +// rollbackExchangeTablePartition will clear the non-partitioned +// table's ExchangePartitionInfo state. +func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) { + tblInfo.ExchangePartitionInfo = nil + job.State = model.JobStateRollbackDone + job.SchemaState = model.StatePublic + return updateVersionAndTableInfo(d, t, job, tblInfo, true) +} + +func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.SchemaState == model.StateNone { + // Nothing is changed + job.State = model.JobStateCancelled + return ver, dbterror.ErrCancelledDDLJob + } + var nt *model.TableInfo + nt, err = GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + ver, err = rollbackExchangeTablePartition(d, t, job, nt) + return ver, errors.Trace(err) +} + func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { addingDefinitions := tblInfo.Partition.AddingDefinitions partNames := make([]string, 0, len(addingDefinitions)) @@ -373,6 +397,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) err = rollingbackDropTableOrView(t, job) case model.ActionDropTablePartition: ver, err = rollingbackDropTablePartition(t, job) + case model.ActionExchangeTablePartition: + ver, err = rollingbackExchangeTablePartition(d, t, job) case model.ActionDropSchema: err = rollingbackDropSchema(t, job) case model.ActionRenameIndex: @@ -388,7 +414,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition, model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, - model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement, + model.ActionModifySchemaDefaultPlacement, model.ActionRecoverSchema: ver, err = cancelOnlyNotHandledJob(job, model.StateNone) case model.ActionMultiSchemaChange: diff --git a/executor/insert_common.go b/executor/insert_common.go index 751f83c071eda..5c52695d3ad4c 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -675,7 +675,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/executor/write.go b/executor/write.go index bc5ec9a292941..5cccfae9e7b3b 100644 --- a/executor/write.go +++ b/executor/write.go @@ -77,7 +77,7 @@ func updateRecord(ctx 
context.Context, sctx sessionctx.Context, h kv.Handle, old // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/infoschema/builder.go b/infoschema/builder.go index 36101062c1735..c8d36558c3800 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -219,6 +219,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyRecoverTable(m, diff) case model.ActionCreateTables: return b.applyCreateTables(m, diff) + case model.ActionExchangeTablePartition: + return b.applyExchangeTablePartition(m, diff) case model.ActionFlashbackCluster: return []int64{-1}, nil default: @@ -285,6 +287,47 @@ func (b *Builder) applyDropTableOrParition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } +func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + // The partitioned table is not affected until the last stage + if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { + return b.applyTableUpdate(m, diff) + } + ntSchemaID := diff.OldSchemaID + ntID := diff.OldTableID + ptSchemaID := diff.SchemaID + ptID := diff.TableID + if len(diff.AffectedOpts) > 0 { + // From old version + ptID = diff.AffectedOpts[0].TableID + ptSchemaID = diff.AffectedOpts[0].SchemaID + } + // The normal table needs to be updated first: + // Just update the tables separately + currDiff := &model.SchemaDiff{ + Version: diff.Version, + TableID: ntID, + SchemaID: ntSchemaID, + } + ntIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markPartitionBundleShouldUpdate(ntID) + // Then the partitioned table + currDiff.TableID = ptID + currDiff.SchemaID = ptSchemaID + ptIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markTableBundleShouldUpdate(ptID) + err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + if err != nil { + return nil, errors.Trace(err) + } + return append(ptIDs, ntIDs...), nil +} + func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -352,14 +395,6 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) 
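+			// Exchange partition is no longer handled here; applyExchangeTablePartition
+			// above applies both table updates and updates the auto IDs.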
- - if diff.Type == model.ActionExchangeTablePartition { - // handle partition table and table AutoID - err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) - if err != nil { - return nil, errors.Trace(err) - } - } } return tblIDs, nil @@ -391,7 +426,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: + case model.ActionTruncateTable, model.ActionCreateView: oldTableID = diff.OldTableID newTableID = diff.TableID default: @@ -409,8 +444,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 b.markTableBundleShouldUpdate(newTableID) case model.ActionRecoverTable: b.markTableBundleShouldUpdate(newTableID) - case model.ActionExchangeTablePartition: - b.markPartitionBundleShouldUpdate(newTableID) case model.ActionAlterTablePlacement: b.markTableBundleShouldUpdate(newTableID) } @@ -421,7 +454,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 var allocs autoid.Allocators if tableIDIsValid(oldTableID) { if oldTableID == newTableID && (diff.Type != model.ActionRenameTable && diff.Type != model.ActionRenameTables) && - diff.Type != model.ActionExchangeTablePartition && // For repairing table in TiDB cluster, given 2 normal node and 1 repair node. // For normal node's information schema, repaired table is existed. // For repair node's information schema, repaired table is filtered (couldn't find it in `is`). diff --git a/parser/model/model.go b/parser/model/model.go index 1a93a21323786..b73f10bf9f11c 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1170,9 +1170,10 @@ func (p PartitionType) String() string { // ExchangePartitionInfo provides exchange partition info. type ExchangePartitionInfo struct { - ExchangePartitionFlag bool `json:"exchange_partition_flag"` ExchangePartitionID int64 `json:"exchange_partition_id"` ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` + // Deprecated, not used + XXXExchangePartitionFlag bool `json:"exchange_partition_flag"` } // PartitionInfo provides table partition info. diff --git a/testkit/testkit.go b/testkit/testkit.go index e4461fee82ac6..4aea07926d04f 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -19,6 +19,9 @@ package testkit import ( "context" "fmt" + "log" + "os" + "runtime/pprof" "strings" "sync" "testing" @@ -532,3 +535,35 @@ func (c *RegionProperityClient) SendRequest(ctx context.Context, addr string, re } return c.Client.SendRequest(ctx, addr, req, timeout) } + +// DebugDumpOnTimeout will dump stack traces and possible blockers after given timeout. +// wg is the WaitGroup to mark as done when finished (to avoid runaway goroutines) +// c is the channel that will signal or close to cancel the timeout. 
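+// Typical usage, mirroring the tests in this patch:
+//
+//	var wg sync.WaitGroup
+//	wg.Add(1)
+//	dumpChan := make(chan struct{})
+//	go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second)
+//	defer func() {
+//		close(dumpChan)
+//		wg.Wait()
+//	}()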
+func DebugDumpOnTimeout(wg *sync.WaitGroup, c chan struct{}, d time.Duration) {
+	select {
+	case <-time.After(d):
+		log.Print("Injected timeout, dumping all goroutines:")
+		_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
+		log.Print("dumping all stack traces that led to a possible block:")
+		_ = pprof.Lookup("block").WriteTo(os.Stdout, 2)
+		log.Print("dumping all stack traces holding mutexes:")
+		_ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2)
+		log.Print("dumping all stack traces that led to creation of new OS threads:")
+		_ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2)
+		log.Print("Waiting 2 seconds to see if things changed...")
+		// Wait 2 seconds and dump everything again, to see if any progress was made
+		time.Sleep(2 * time.Second)
+		log.Print("Injected timeout, dumping all goroutines:")
+		_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
+		log.Print("dumping all stack traces that led to a possible block:")
+		_ = pprof.Lookup("block").WriteTo(os.Stdout, 2)
+		log.Print("dumping all stack traces holding mutexes:")
+		_ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2)
+		log.Print("dumping all stack traces that led to creation of new OS threads:")
+		_ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2)
+		panic("Injected timeout")
+	case <-c:
+		// Test finished
+	}
+	wg.Done()
+}