From 2c83950445428f34db22b04cd058626572b5876c Mon Sep 17 00:00:00 2001 From: YangKeao Date: Tue, 12 Sep 2023 13:09:11 +0800 Subject: [PATCH] *: Exchange partition, fix LIST COLUMNs validation as well as NULL validation (#46533) (#46766) close pingcap/tidb#46492 --- ddl/db_partition_test.go | 183 +++++++++++++++++- ddl/ddl.go | 5 + ddl/ddl_api.go | 3 +- ddl/ddl_worker.go | 36 ++-- ddl/failtest/fail_db_test.go | 2 - ddl/metadatalocktest/BUILD.bazel | 3 + ddl/metadatalocktest/mdl_test.go | 209 +++++++++++++++++++++ ddl/partition.go | 205 +++++++++++--------- ddl/placement_policy_test.go | 73 +++---- ddl/rollingback.go | 28 ++- executor/insert_common.go | 2 +- executor/write.go | 2 +- infoschema/builder.go | 56 ++++-- parser/model/model.go | 3 +- planner/core/integration_partition_test.go | 2 +- testkit/testkit.go | 35 ++++ types/parser_driver/value_expr.go | 4 +- 17 files changed, 687 insertions(+), 164 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index e66dd360bfd3e..1d2fc71d57895 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -1590,7 +1590,7 @@ func TestAlterTableTruncatePartitionPreSplitRegion(t *testing.T) { tk.MustExec("drop table if exists t1;") tk.MustExec(`CREATE TABLE t1 (id int, c varchar(128), key c(c)) partition by range (id) ( - partition p0 values less than (10), + partition p0 values less than (10), partition p1 values less than MAXVALUE)`) re := tk.MustQuery("show table t1 regions") rows := re.Rows() @@ -2513,6 +2513,71 @@ func TestExchangePartitionTableCompatiable(t *testing.T) { require.NoError(t, err) } +func TestExchangePartitionValidation(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + dbName := "ExchangeValidation" + tk.MustExec(`create schema ` + dbName) + tk.MustExec(`use ` + dbName) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name))`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1 with validation`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`insert into t1 values ("2023-08-06","0001")`) +} + +func TestExchangePartitionPlacementPolicy(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec(`create schema ExchangePartWithPolicy`) + tk.MustExec(`use ExchangePartWithPolicy`) + tk.MustExec(`CREATE PLACEMENT POLICY rule1 FOLLOWERS=1`) + tk.MustExec(`CREATE PLACEMENT POLICY rule2 FOLLOWERS=2`) + tk.MustExec(`CREATE TABLE t1 ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule1"`) + + tk.MustExec(`CREATE TABLE t1p ( + d date NOT NULL , + name varchar(10) NOT NULL, + UNIQUE KEY (d,name) + ) PLACEMENT POLICY="rule2" + PARTITION BY RANGE COLUMNS(d) + (PARTITION p202307 VALUES LESS THAN ('2023-08-01'), + PARTITION p202308 VALUES LESS THAN ('2023-09-01'), + PARTITION p202309 VALUES LESS THAN ('2023-10-01'), + PARTITION p202310 VALUES LESS THAN ('2023-11-01'), + PARTITION p202311 VALUES LESS THAN ('2023-12-01'), + PARTITION p202312 VALUES LESS THAN ('2024-01-01'), + PARTITION pfuture VALUES LESS THAN (MAXVALUE))`) + + tk.MustContainErrMsg(`alter table t1p exchange partition p202307 with table t1`, + "[ddl:1736]Tables have different definitions") + tk.MustExec(`insert into t1 values ("2023-08-06","0000")`) +} + func TestExchangePartitionHook(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -4693,3 +4758,119 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) { tk.MustExec(`create table t (a int, b char) partition by hash (a) partitions 3`) tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'") } + +func TestListExchangeValidate(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create database ListExchange") + tk.MustExec("use ListExchange") + tk.MustExec("create table lcp (id int, create_ts datetime, name varchar(10))\n" + + "partition by list columns (create_ts)\n" + + "(partition p20230829 values in ('2023-08-29'),partition p20230830 values in ('2023-08-30'))") + tk.MustExec(`insert into lcp values (1,'2023-08-29','a')`) + tk.MustExec(`insert into lcp values (2,'2023-08-30','b')`) + tk.MustContainErrMsg(`insert into lcp values (3,'2023-08-31','c')`, + "[table:1526]Table has no partition for value from column_list") + + tk.MustExec(`create table t (id int, create_ts datetime, name varchar(10))`) + tk.MustExec(`insert into t values (3,'2023-08-31','c')`) + + tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p20230829 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lcp add partition + (partition p202302 values in ('2023-02-01','2023-02-28',null), + partition p202303 values in ('2023-03-01','2023-03-02','2023-03-31'))`) + tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`truncate table t`) + tk.MustExec(`insert into t values (4,'2023-02-01','d'), (5,'2023-02-28','e'), (6, null, 'f')`) + tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`) + tk.MustExec(`insert into t values (4,'2023-03-01','d'), (5,'2023-03-02','e'), (6,'2023-03-31','f')`) + tk.MustContainErrMsg(`alter table lcp EXCHANGE PARTITION p202302 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lcp EXCHANGE PARTITION p202303 WITH TABLE t`) + + tk.MustExec(`drop table t`) + tk.MustExec(`CREATE TABLE lmcp (d date, name varchar(10), data varchar(255)) + PARTITION BY LIST COLUMNS(d,name) + (partition p3 values IN (('2021-01-01','a'),('2021-01-02','b'),('2021-01-03','c')), + partition p4 values IN (('2021-01-01','b'),(null,'a'),('2021-01-01',null),(null,null)), + partition p2 values IN (('2021-01-01','c'),('2021-01-02','a')), + partition p1 values IN (('2021-01-02','c')))`) + tk.MustExec(`CREATE TABLE t (d date, name varchar(10), data varchar(255))`) + + tk.MustExec(`insert into t values ('2021-01-02', 'c', "OK")`) + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`) + + tk.MustExec(`insert into t values ('2021-01-01', 'c', "OK"), ('2021-01-02', 'a', "OK")`) + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`) + + tk.MustExec(`insert into t values ('2021-01-01', 'a', "OK"), ('2021-01-02','b', "OK"), ('2021-01-03','c', "OK")`) + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`) + + tk.MustExec(`insert into t values ('2021-01-01', 'b', "OK"), ('2021-01-01',null, "OK"), (null,'a', "OK"), (null,null,"OK")`) + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p1 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p2 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustContainErrMsg(`alter table lmcp EXCHANGE PARTITION p3 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lmcp EXCHANGE PARTITION p4 WITH TABLE t`) + + tk.MustExec(`create table lp (a int, data varchar(255)) partition by list (a) (partition p0 values in (0,4), partition pNull values in (null))`) + tk.MustExec(`create table np (a int, data varchar(255))`) + tk.MustExec(`insert into np values (0,"OK"), (4,"OK")`) + tk.MustContainErrMsg(`alter table lp EXCHANGE PARTITION pNull WITH TABLE np`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lp EXCHANGE PARTITION p0 WITH TABLE np`) + tk.MustExec(`insert into np values (null,"OK")`) + tk.MustContainErrMsg(`alter table lp EXCHANGE PARTITION p0 WITH TABLE np`, + "[ddl:1737]Found a row that does not match the partition") + tk.MustExec(`alter table lp EXCHANGE PARTITION pNull WITH TABLE np`) + // TODO: Check EXCHANGE with DEFAULT partition!! +} + +func TestRangeExchangeValidate(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create database RangeExchange") + tk.MustExec("use RangeExchange") + tk.MustExec(`CREATE TABLE t (d date, name varchar(10), data varchar(255))`) + tk.MustExec("create table rcp (d date, name varchar(10), data varchar(255))\n" + + "partition by range columns (d)\n" + + "(partition p20230829 values less than ('2023-08-30'),partition p20230830 values less than ('2023-08-31'))") + tk.MustExec(`insert into rcp values ('2023-08-29', 'a', "OK")`) + tk.MustExec(`insert into rcp values ('2023-08-30', 'b', "OK")`) + tk.MustContainErrMsg(`insert into rcp values ('2023-08-31', 'c', "FAIL")`, + "[table:1526]Table has no partition for value from column_list") + tk.MustExec(`insert into t values ('2023-08-31', 'c', "FAIL")`) + tk.MustContainErrMsg(`alter table rcp EXCHANGE PARTITION p20230829 WITH TABLE t`, + "[ddl:1737]Found a row that does not match the partition") + // TODO: Add test with a RANGE single partition (both normal AND maxvalue!) + // TODO: add test with maxvalue (1, 2, and more partitions) + // TODO: add test not in first partition (both last without maxvalue and also not last with/without maxvalue) +} + +// TODO: check EXCHANGE how it handles null (for all types of partitioning!!!) diff --git a/ddl/ddl.go b/ddl/ddl.go index 8e95c642c41d5..a7412cb047412 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1229,6 +1229,7 @@ func (d *ddl) SwitchMDL(enable bool) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() + logutil.BgLogger().Info("Switch EnableMDL 1", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) // Disable MDL for test. if enable && !variable.DefTiDBEnableConcurrentDDL { sql := fmt.Sprintf("UPDATE HIGH_PRIORITY %[1]s.%[2]s SET VARIABLE_VALUE = %[4]d WHERE VARIABLE_NAME = '%[3]s'", @@ -1247,6 +1248,7 @@ func (d *ddl) SwitchMDL(enable bool) error { return nil } + logutil.BgLogger().Info("Switch EnableMDL 2", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) isEnableBefore := variable.EnableMDL.Load() if isEnableBefore == enable { return nil @@ -1268,7 +1270,10 @@ func (d *ddl) SwitchMDL(enable bool) error { return errors.New("please wait for all jobs done") } + logutil.BgLogger().Info("Switch EnableMDL 3", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) variable.EnableMDL.Store(enable) + logutil.BgLogger().Info("Switch EnableMDL 4", zap.Bool("arg", enable), zap.Bool("value", variable.EnableMDL.Load())) + err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), d.store, true, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) oldEnable, _, err := m.GetMetadataLock() diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 1df222bd09712..76842cada9a5e 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4204,7 +4204,6 @@ func checkExchangePartition(pt *model.TableInfo, nt *model.TableInfo) error { return errors.Trace(dbterror.ErrPartitionExchangeForeignKey.GenWithStackByArgs(nt.Name)) } - // NOTE: if nt is temporary table, it should be checked return nil } @@ -6887,7 +6886,7 @@ func checkAndGetColumnsTypeAndValuesMatch(ctx sessionctx.Context, colTypes []typ switch colType.GetType() { case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeDuration: switch vkind { - case types.KindString, types.KindBytes: + case types.KindString, types.KindBytes, types.KindNull: default: return nil, dbterror.ErrWrongTypeColumnValue.GenWithStackByArgs() } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 751da33f5803c..e744e467f9650 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1556,24 +1556,28 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... diff.OldSchemaID = oldSchemaIDs[0] diff.AffectedOpts = affects case model.ActionExchangeTablePartition: - var ( - ptSchemaID int64 - ptTableID int64 - partName string - withValidation bool - ) - err = job.DecodeArgs(&diff.TableID, &ptSchemaID, &ptTableID, &partName, &withValidation) - if err != nil { - return 0, errors.Trace(err) - } diff.OldTableID = job.TableID - affects := make([]*model.AffectedOption, 1) - affects[0] = &model.AffectedOption{ - SchemaID: ptSchemaID, - TableID: ptTableID, - OldTableID: ptTableID, + diff.OldSchemaID = job.SchemaID + if job.SchemaState != model.StatePublic { + diff.TableID = job.TableID + diff.SchemaID = job.SchemaID + } else { + // Update the partitioned table (it is only done in the last state) + var ( + ptSchemaID int64 + ptTableID int64 + ptDefID int64 // Not needed, will reload the whole table + partName string // Not used + withValidation bool // Not used + ) + // See ddl.ExchangeTablePartition + err = job.DecodeArgs(&ptDefID, &ptSchemaID, &ptTableID, &partName, &withValidation) + if err != nil { + return 0, errors.Trace(err) + } + diff.SchemaID = ptSchemaID + diff.TableID = ptTableID } - diff.AffectedOpts = affects case model.ActionTruncateTablePartition: diff.TableID = job.TableID if len(job.CtxVars) > 0 { diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index 4a938e5fd2ad4..6d3280cb481d4 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -135,8 +135,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustExec("insert into pt values(1), (3), (5)") tk.MustExec("create table nt(a int)") tk.MustExec("insert into nt values(7)") - tk.MustExec("set @@tidb_enable_exchange_partition=1") - defer tk.MustExec("set @@tidb_enable_exchange_partition=0") err = tk.ExecToErr("alter table pt exchange partition p1 with table nt") require.Error(t, err) diff --git a/ddl/metadatalocktest/BUILD.bazel b/ddl/metadatalocktest/BUILD.bazel index fb169367a7933..413fd554034f5 100644 --- a/ddl/metadatalocktest/BUILD.bazel +++ b/ddl/metadatalocktest/BUILD.bazel @@ -13,10 +13,13 @@ go_test( "//ddl", "//errno", "//server", + "//sessionctx/variable", "//testkit", "//testkit/testsetup", + "//util/logutil", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", + "@org_uber_go_zap//:zap", ], ) diff --git a/ddl/metadatalocktest/mdl_test.go b/ddl/metadatalocktest/mdl_test.go index 2732073251c07..5364809aecebf 100644 --- a/ddl/metadatalocktest/mdl_test.go +++ b/ddl/metadatalocktest/mdl_test.go @@ -18,6 +18,7 @@ package metadatalocktest import ( "fmt" + "strings" "sync" "testing" "time" @@ -25,8 +26,11 @@ import ( "github.com/pingcap/failpoint" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/server" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/logutil" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestMDLBasicSelect(t *testing.T) { @@ -1134,3 +1138,208 @@ func TestMDLUpdateEtcdFail(t *testing.T) { tk.MustExec("alter table test.t add column c int") } + +// Tests that require MDL. +// They are here, since they must run without 'featuretag' defined +func TestExchangePartitionStates(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + dbName := "partSchemaVer" + tk.MustExec("create database " + dbName) + tk.MustExec("use " + dbName) + tk.MustExec(`set @@global.tidb_enable_metadata_lock = ON`) + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use " + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use " + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec("use " + dbName) + tk.MustExec(`create table t (a int primary key, b varchar(255), key (b))`) + tk.MustExec(`create table tp (a int primary key, b varchar(255), key (b)) partition by range (a) (partition p0 values less than (1000000), partition p1M values less than (2000000))`) + tk.MustExec(`insert into t values (1, "1")`) + tk.MustExec(`insert into tp values (2, "2")`) + tk.MustExec(`analyze table t,tp`) + tk.MustQuery(`select * from information_schema.global_variables`).Check(testkit.Rows()) + var wg sync.WaitGroup + wg.Add(1) + dumpChan := make(chan struct{}) + defer func() { + close(dumpChan) + wg.Wait() + }() + go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second) + tk.MustExec("BEGIN") + tk.MustQuery(`select * from t`).Check(testkit.Rows("1 1")) + tk.MustQuery(`select * from tp`).Check(testkit.Rows("2 2")) + //require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`)) + alterChan := make(chan error) + go func() { + // WITH VALIDATION is the default + err := tk2.ExecToErr(`alter table tp exchange partition p0 with table t`) + alterChan <- err + }() + waitFor := func(tableName, s string, pos int) { + for { + select { + case alterErr := <-alterChan: + require.Fail(t, "Alter completed unexpectedly", "With error %v", alterErr) + default: + // Alter still running + } + res := tk4.MustQuery(`admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'`).Rows() + if len(res) == 1 && res[0][pos] == s { + logutil.BgLogger().Info("Got state", zap.String("State", s)) + break + } + time.Sleep(50 * time.Millisecond) + } + } + waitFor("t", "write only", 4) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into t values (4,"4")`) + tk3.MustContainErrMsg(`insert into t values (1000004,"1000004")`, "[table:1748]Found a row not matching the given partition set") + tk.MustExec(`insert into t values (5,"5")`) + // This should fail the alter table! + tk.MustExec(`insert into t values (1000005,"1000005")`) + + // MDL will block the alter to not continue until all clients + // are in StateWriteOnly, which tk is blocking until it commits + tk.MustExec(`COMMIT`) + //require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) + waitFor("t", "rollback done", 11) + // MDL will block the alter from finish, tk is in 'rollbacked' schema version + // but the alter is still waiting for tk3 to commit, before continuing + tk.MustExec("BEGIN") + tk.MustExec(`insert into t values (1000006,"1000006")`) + tk.MustExec(`insert into t values (6,"6")`) + tk3.MustExec(`insert into t values (7,"7")`) + tk3.MustContainErrMsg(`insert into t values (1000007,"1000007")`, + "[table:1748]Found a row not matching the given partition set") + tk3.MustExec("COMMIT") + require.ErrorContains(t, <-alterChan, + "[ddl:1737]Found a row that does not match the partition") + tk3.MustExec(`BEGIN`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows( + "1 1", "1000005 1000005", "1000006 1000006", "5 5", "6 6")) + tk.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2")) + tk3.MustQuery(`select * from t`).Sort().Check(testkit.Rows( + "1 1", "1000005 1000005", "4 4", "5 5", "7 7")) + tk3.MustQuery(`select * from tp`).Sort().Check(testkit.Rows("2 2")) + tk.MustContainErrMsg(`insert into t values (7,"7")`, + "[kv:1062]Duplicate entry '7' for key 't.PRIMARY'") + tk.MustExec(`insert into t values (8,"8")`) + tk.MustExec(`insert into t values (1000008,"1000008")`) + tk.MustExec(`insert into tp values (9,"9")`) + tk.MustExec(`insert into tp values (1000009,"1000009")`) + tk3.MustExec(`insert into t values (10,"10")`) + tk3.MustExec(`insert into t values (1000010,"1000010")`) + + tk3.MustExec(`COMMIT`) + tk.MustQuery(`show create table tp`).Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (1000000),\n" + + " PARTITION `p1M` VALUES LESS THAN (2000000))")) + tk.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustExec(`commit`) + tk.MustExec(`insert into t values (11,"11")`) + tk.MustExec(`insert into t values (1000011,"1000011")`) + tk.MustExec(`insert into tp values (12,"12")`) + tk.MustExec(`insert into tp values (1000012,"1000012")`) +} + +func TestExchangePartitionMultiTable(t *testing.T) { + logutil.BgLogger().Info("mdl related variable status before bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) + store := testkit.CreateMockStore(t) + tk1 := testkit.NewTestKit(t, store) + logutil.BgLogger().Info("mdl related variable status after bootstrap", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) + + dbName := "ExchangeMultiTable" + tk1.MustExec(`create schema ` + dbName) + tk1.MustExec(`use ` + dbName) + tk1.MustExec(`set global tidb_enable_metadata_lock = 'ON'`) + tk1.MustExec(`CREATE TABLE t1 (a int)`) + tk1.MustExec(`CREATE TABLE t2 (a int)`) + tk1.MustExec(`CREATE TABLE tp (a int) partition by hash(a) partitions 3`) + tk1.MustExec(`insert into t1 values (0)`) + tk1.MustExec(`insert into t2 values (3)`) + tk1.MustExec(`insert into tp values (6)`) + logutil.BgLogger().Info("mdl related variable status after inserting rows", zap.Bool("EnableMDL", variable.EnableMDL.Load()), zap.Bool("EnableConcurrentDDL", variable.EnableConcurrentDDL.Load())) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec(`use ` + dbName) + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec(`use ` + dbName) + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use ` + dbName) + waitFor := func(col int, tableName, s string) { + for { + tk4 := testkit.NewTestKit(t, store) + tk4.MustExec(`use test`) + sql := `admin show ddl jobs where db_name = '` + strings.ToLower(dbName) + `' and table_name = '` + tableName + `' and job_type = 'exchange partition'` + res := tk4.MustQuery(sql).Rows() + if len(res) == 1 && res[0][col] == s { + break + } + logutil.BgLogger().Info("No match", zap.String("sql", sql), zap.String("s", s)) + sql = `admin show ddl jobs` + res = tk4.MustQuery(sql).Rows() + for _, row := range res { + strs := make([]string, 0, len(row)) + for _, c := range row { + strs = append(strs, c.(string)) + } + logutil.BgLogger().Info("admin show ddl jobs", zap.Strings("row", strs)) + } + time.Sleep(100 * time.Millisecond) + } + } + var wg sync.WaitGroup + wg.Add(1) + dumpChan := make(chan struct{}) + defer func() { + close(dumpChan) + wg.Wait() + }() + go testkit.DebugDumpOnTimeout(&wg, dumpChan, 20*time.Second) + alterChan1 := make(chan error) + alterChan2 := make(chan error) + tk3.MustExec(`BEGIN`) + tk3.MustExec(`insert into t1 values (1)`) + tk3.MustExec(`insert into t2 values (2)`) + tk3.MustExec(`insert into tp values (3)`) + //require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID", `pause`)) + go func() { + alterChan1 <- tk1.ExecToErr(`alter table tp exchange partition p0 with table t1`) + }() + waitFor(11, "t1", "running") + go func() { + alterChan2 <- tk2.ExecToErr(`alter table tp exchange partition p0 with table t2`) + }() + waitFor(11, "t2", "queueing") + tk3.MustExec(`rollback`) + logutil.BgLogger().Info("rollback done") + //require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/exchangePartitionAutoID")) + require.NoError(t, <-alterChan1) + logutil.BgLogger().Info("alter1 done") + err := <-alterChan2 + logutil.BgLogger().Info("alter2 done") + tk3.MustQuery(`select * from t1`).Check(testkit.Rows("6")) + logutil.BgLogger().Info("select t1 done") + tk3.MustQuery(`select * from t2`).Check(testkit.Rows("0")) + logutil.BgLogger().Info("select t2 done") + tk3.MustQuery(`select * from tp`).Check(testkit.Rows("3")) + logutil.BgLogger().Info("select tp done") + require.NoError(t, err) +} diff --git a/ddl/partition.go b/ddl/partition.go index 8834b888629de..a8d0ae023848c 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -2035,6 +2035,9 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } + if job.IsRollingback() { + return rollbackExchangeTablePartition(d, t, job, nt) + } pt, err := getTableInfo(t, ptID, ptSchemaID) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { @@ -2043,35 +2046,49 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - if pt.State != model.StatePublic { - job.State = model.JobStateCancelled - return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) - } - - err = checkExchangePartition(pt, nt) + index, partDef, err := getPartitionDef(pt, partName) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + if job.SchemaState == model.StateNone { + if pt.State != model.StatePublic { + job.State = model.JobStateCancelled + return ver, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) + } + err = checkExchangePartition(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } - err = checkTableDefCompatible(pt, nt) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } + err = checkTableDefCompatible(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkExchangePartitionPlacementPolicy(t, nt.PlacementPolicyRef, pt.PlacementPolicyRef, partDef.PlacementPolicyRef) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } - index, _, err := getPartitionDef(pt, partName) - if err != nil { - return ver, errors.Trace(err) - } - if nt.ExchangePartitionInfo == nil || !nt.ExchangePartitionInfo.ExchangePartitionFlag { nt.ExchangePartitionInfo = &model.ExchangePartitionInfo{ - ExchangePartitionFlag: true, ExchangePartitionID: ptID, ExchangePartitionDefID: defID, } + // We need an interim schema version, + // so there are no non-matching rows inserted + // into the table using the schema version + // before the exchange is made. + job.SchemaState = model.StateWriteOnly return updateVersionAndTableInfoWithCheck(d, t, job, nt, true) } + // From now on, nt (the non-partitioned table) has + // ExchangePartitionInfo set, meaning it is restricted + // to only allow writes that would match the + // partition to be exchange with. + // So we need to rollback that change, instead of just cancelling. if d.lease > 0 { delayForAsyncCommit() @@ -2080,7 +2097,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo if withValidation { err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) if err != nil { - job.State = model.JobStateCancelled + job.State = model.JobStateRollingback return ver, errors.Trace(err) } } @@ -2088,19 +2105,11 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo // partition table auto IDs. ptAutoIDs, err := t.GetAutoIDAccessors(ptSchemaID, ptID).Get() if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } // non-partition table auto IDs. ntAutoIDs, err := t.GetAutoIDAccessors(job.SchemaID, nt.ID).Get() if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - _, partDef, err := getPartitionDef(pt, partName) - if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2113,35 +2122,32 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } } - // exchange table meta id - partDef.ID, nt.ID = nt.ID, partDef.ID - - err = t.UpdateTable(ptSchemaID, pt) + // Recreate non-partition table meta info, + // by first delete it with the old table id + err = t.DropTableOrView(job.SchemaID, nt.ID) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } - failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { - if val.(bool) { - job.State = model.JobStateCancelled - failpoint.Return(ver, errors.New("occur an error after updating partition id")) - } - }) + // exchange table meta id + partDef.ID, nt.ID = nt.ID, partDef.ID - // recreate non-partition table meta info - err = t.DropTableOrView(job.SchemaID, partDef.ID) + err = t.UpdateTable(ptSchemaID, pt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.CreateTableOrView(job.SchemaID, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } + failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(ver, errors.New("occur an error after updating partition id")) + } + }) + // Set both tables to the maximum auto IDs between normal table and partitioned table. newAutoIDs := meta.AutoIDGroup{ RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), @@ -2150,12 +2156,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } err = t.GetAutoIDAccessors(ptSchemaID, pt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.GetAutoIDAccessors(job.SchemaID, nt.ID).Put(newAutoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -2174,23 +2178,15 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo } }) - err = checkExchangePartitionPlacementPolicy(t, partDef.PlacementPolicyRef, nt.PlacementPolicyRef) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - // the follow code is a swap function for rules of two partitions // though partitions has exchanged their ID, swap still take effect - bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt) + bundles, err := bundlesForExchangeTablePartition(t, pt, partDef, nt) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } @@ -2199,7 +2195,6 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo rules, err := infosync.GetLabelRules(context.TODO(), []string{ntrID, ptrID}) if err != nil { - job.State = model.JobStateCancelled return 0, errors.Wrapf(err, "failed to get PD the label rules") } @@ -2226,10 +2221,10 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo patch := label.NewRulePatch(setRules, deleteRules) err = infosync.UpdateLabelRules(context.TODO(), patch) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the label rules") } + job.SchemaState = model.StatePublic nt.ExchangePartitionInfo = nil ver, err = updateVersionAndTableInfoWithCheck(d, t, job, nt, true) if err != nil { @@ -2240,7 +2235,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, nil } -func bundlesForExchangeTablePartition(t *meta.Meta, job *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { +func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { bundles := make([]*placement.Bundle, 0, 3) ptBundle, err := placement.NewTableBundle(t, pt) @@ -2335,19 +2330,26 @@ func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, inde if rowCount != 0 { return errors.Trace(dbterror.ErrRowDoesNotMatchPartition) } + // Check warnings! + // Is it possible to check how many rows where checked as well? return nil } -func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPlacementPolicyRef *model.PolicyRefInfo, ptPlacementPolicyRef *model.PolicyRefInfo) error { - if ntPlacementPolicyRef == nil && ptPlacementPolicyRef == nil { +func checkExchangePartitionPlacementPolicy(t *meta.Meta, ntPPRef, ptPPRef, partPPRef *model.PolicyRefInfo) error { + partitionPPRef := partPPRef + if partitionPPRef == nil { + partitionPPRef = ptPPRef + } + + if ntPPRef == nil && partitionPPRef == nil { return nil } - if ntPlacementPolicyRef == nil || ptPlacementPolicyRef == nil { + if ntPPRef == nil || partitionPPRef == nil { return dbterror.ErrTablesDifferentMetadata } - ptPlacementPolicyInfo, _ := getPolicyInfo(t, ptPlacementPolicyRef.ID) - ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPlacementPolicyRef.ID) + ptPlacementPolicyInfo, _ := getPolicyInfo(t, partitionPPRef.ID) + ntPlacementPolicyInfo, _ := getPolicyInfo(t, ntPPRef.ID) if ntPlacementPolicyInfo == nil && ptPlacementPolicyInfo == nil { return nil } @@ -2370,13 +2372,13 @@ func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, sche buf.WriteString("select 1 from %n.%n where ") buf.WriteString(pi.Expr) buf.WriteString(" >= %? limit 1") - paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index].LessThan[0])) + paramList = append(paramList, schemaName.L, tableName.L, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0])) return buf.String(), paramList } else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { buf.WriteString("select 1 from %n.%n where ") buf.WriteString(pi.Expr) buf.WriteString(" < %? limit 1") - paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0])) + paramList = append(paramList, schemaName.L, tableName.L, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0])) return buf.String(), paramList } else { buf.WriteString("select 1 from %n.%n where ") @@ -2384,26 +2386,22 @@ func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, sche buf.WriteString(" < %? or ") buf.WriteString(pi.Expr) buf.WriteString(" >= %? limit 1") - paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]), trimQuotation(pi.Definitions[index].LessThan[0])) + paramList = append(paramList, schemaName.L, tableName.L, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]), driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0])) return buf.String(), paramList } } -func trimQuotation(str string) string { - return strings.Trim(str, "'") -} - func buildCheckSQLForRangeColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { paramList := make([]interface{}, 0, 6) colName := pi.Columns[0].L if index == 0 { - paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index].LessThan[0])) + paramList = append(paramList, schemaName.L, tableName.L, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0])) return "select 1 from %n.%n where %n >= %? limit 1", paramList } else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { - paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0])) + paramList = append(paramList, schemaName.L, tableName.L, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0])) return "select 1 from %n.%n where %n < %? limit 1", paramList } else { - paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]), colName, trimQuotation(pi.Definitions[index].LessThan[0])) + paramList = append(paramList, schemaName.L, tableName.L, colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index-1].LessThan[0]), colName, driver.UnwrapFromSingleQuotes(pi.Definitions[index].LessThan[0])) return "select 1 from %n.%n where %n < %? or %n >= %? limit 1", paramList } } @@ -2411,32 +2409,57 @@ func buildCheckSQLForRangeColumnsPartition(pi *model.PartitionInfo, index int, s func buildCheckSQLForListPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { var buf strings.Builder buf.WriteString("select 1 from %n.%n where ") - buf.WriteString(pi.Expr) - buf.WriteString(" not in (%?) limit 1") - inValues := getInValues(pi, index) - - paramList := make([]interface{}, 0, 3) - paramList = append(paramList, schemaName.L, tableName.L, inValues) + buf.WriteString(" not (") + for i, inValue := range pi.Definitions[index].InValues { + if i != 0 { + buf.WriteString(" OR ") + } + // AND has higher priority than OR, so no need for parentheses + for j, val := range inValue { + if j != 0 { + // Should never happen, since there should be no multi-columns, only a single expression :) + buf.WriteString(" AND ") + } + // null-safe compare '<=>' + buf.WriteString(fmt.Sprintf("(%s) <=> %s", pi.Expr, val)) + } + } + buf.WriteString(") limit 1") + paramList := make([]interface{}, 0, 2) + paramList = append(paramList, schemaName.L, tableName.L) return buf.String(), paramList } func buildCheckSQLForListColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { - colName := pi.Columns[0].L var buf strings.Builder - buf.WriteString("select 1 from %n.%n where %n not in (%?) limit 1") - inValues := getInValues(pi, index) - - paramList := make([]interface{}, 0, 4) - paramList = append(paramList, schemaName.L, tableName.L, colName, inValues) - return buf.String(), paramList -} - -func getInValues(pi *model.PartitionInfo, index int) []string { - inValues := make([]string, 0, len(pi.Definitions[index].InValues)) - for _, inValue := range pi.Definitions[index].InValues { - inValues = append(inValues, inValue...) + // How to find a match? + // (row <=> vals1) OR (row <=> vals2) + // How to find a non-matching row: + // NOT ( (row <=> vals1) OR (row <=> vals2) ... ) + buf.WriteString("select 1 from %n.%n where not (") + colNames := make([]string, 0, len(pi.Columns)) + for i := range pi.Columns { + // TODO: check if there are no proper quoting function for this? + n := "`" + strings.ReplaceAll(pi.Columns[i].O, "`", "``") + "`" + colNames = append(colNames, n) + } + for i, colValues := range pi.Definitions[index].InValues { + if i != 0 { + buf.WriteString(" OR ") + } + // AND has higher priority than OR, so no need for parentheses + for j, val := range colValues { + if j != 0 { + buf.WriteString(" AND ") + } + // null-safe compare '<=>' + buf.WriteString(fmt.Sprintf("%s <=> %s", colNames[j], val)) + } } - return inValues + buf.WriteString(") limit 1") + paramList := make([]interface{}, 0, 2) + paramList = append(paramList, schemaName.L, tableName.L) + return buf.String(), paramList } func checkAddPartitionTooManyPartitions(piDefs uint64) error { diff --git a/ddl/placement_policy_test.go b/ddl/placement_policy_test.go index eeb1e5f5dc0e6..39130d3ef925f 100644 --- a/ddl/placement_policy_test.go +++ b/ddl/placement_policy_test.go @@ -2086,61 +2086,48 @@ func TestExchangePartitionWithPlacement(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) // clearAllBundles(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("set @@tidb_enable_exchange_partition=1") tk.MustExec("use test") - tk.MustExec("drop table if exists t1, t2, tp") - tk.MustExec("drop placement policy if exists p1") - tk.MustExec("drop placement policy if exists p2") - tk.MustExec("drop placement policy if exists p3") - - tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'") - defer tk.MustExec("drop placement policy p1") - - tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'") - defer tk.MustExec("drop placement policy p2") - tk.MustExec("create placement policy p3 primary_region='r3' regions='r3'") - defer tk.MustExec("drop placement policy p3") + tk.MustExec("create placement policy pp1 primary_region='r1' regions='r1'") + tk.MustExec("create placement policy pp2 primary_region='r2' regions='r2'") + tk.MustExec("create placement policy pp3 primary_region='r3' regions='r3'") - policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("p1")) + policy1, ok := dom.InfoSchema().PolicyByName(model.NewCIStr("pp1")) require.True(t, ok) - tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`) - defer tk.MustExec("drop table t1") - + tk.MustExec(`CREATE TABLE t1 (id INT) placement policy pp1`) tk.MustExec(`CREATE TABLE t2 (id INT)`) - defer tk.MustExec("drop table t2") + tk.MustExec(`CREATE TABLE t3 (id INT) placement policy pp3`) t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) t1ID := t1.Meta().ID - tk.MustExec(`CREATE TABLE tp (id INT) placement policy p3 PARTITION BY RANGE (id) ( - PARTITION p0 VALUES LESS THAN (100) placement policy p1, - PARTITION p1 VALUES LESS THAN (1000) placement policy p2, - PARTITION p2 VALUES LESS THAN (10000) - );`) - defer tk.MustExec("drop table tp") + tk.MustExec(`CREATE TABLE tp (id INT) placement policy pp3 PARTITION BY RANGE (id) ( + PARTITION p1 VALUES LESS THAN (100) placement policy pp1, + PARTITION p2 VALUES LESS THAN (1000) placement policy pp2, + PARTITION p3 VALUES LESS THAN (10000) + )`) tp, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) tpID := tp.Meta().ID par0ID := tp.Meta().Partition.Definitions[0].ID - // exchange par0, t1 - tk.MustExec("alter table tp exchange partition p0 with table t1") + // exchange par1, t1 + tk.MustExec("alter table tp exchange partition p1 with table t1") tk.MustQuery("show create table t1").Check(testkit.Rows("" + "t1 CREATE TABLE `t1` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */")) + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */")) tk.MustQuery("show create table tp").Check(testkit.Rows("" + "tp CREATE TABLE `tp` (\n" + " `id` int(11) DEFAULT NULL\n" + - ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p3` */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp3` */\n" + "PARTITION BY RANGE (`id`)\n" + - "(PARTITION `p0` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + - " PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" + - " PARTITION `p2` VALUES LESS THAN (10000))")) + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `p2` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p3` VALUES LESS THAN (10000))")) tp, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp")) require.NoError(t, err) require.Equal(t, tpID, tp.Meta().ID) @@ -2152,11 +2139,31 @@ func TestExchangePartitionWithPlacement(t *testing.T) { require.Equal(t, policy1.ID, t1.Meta().PlacementPolicyRef.ID) checkExistTableBundlesInPD(t, dom, "test", "tp") - // exchange par0, t2 - tk.MustGetErrCode("alter table tp exchange partition p0 with table t2", mysql.ErrTablesDifferentMetadata) + // exchange par2, t1 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t1", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t1 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t1", mysql.ErrTablesDifferentMetadata) // exchange par1, t2 tk.MustGetErrCode("alter table tp exchange partition p1 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t2 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t2 + tk.MustGetErrCode("alter table tp exchange partition p3 with table t2", mysql.ErrTablesDifferentMetadata) + + // exchange par1, t3 + tk.MustGetErrCode("alter table tp exchange partition p1 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par2, t3 + tk.MustGetErrCode("alter table tp exchange partition p2 with table t3", mysql.ErrTablesDifferentMetadata) + + // exchange par3, t3 + tk.MustExec("alter table tp exchange partition p3 with table t3") + checkExistTableBundlesInPD(t, dom, "test", "tp") + checkExistTableBundlesInPD(t, dom, "test", "t3") } func TestPDFail(t *testing.T) { diff --git a/ddl/rollingback.go b/ddl/rollingback.go index c6f75442479b6..cba089031b134 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -253,6 +253,30 @@ func needNotifyAndStopReorgWorker(job *model.Job) bool { return false } +// rollbackExchangeTablePartition will clear the non-partitioned +// table's ExchangePartitionInfo state. +func rollbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo) (int64, error) { + tblInfo.ExchangePartitionInfo = nil + job.State = model.JobStateRollbackDone + job.SchemaState = model.StatePublic + return updateVersionAndTableInfo(d, t, job, tblInfo, true) +} + +func rollingbackExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { + if job.SchemaState == model.StateNone { + // Nothing is changed + job.State = model.JobStateCancelled + return ver, dbterror.ErrCancelledDDLJob + } + var nt *model.TableInfo + nt, err = GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + ver, err = rollbackExchangeTablePartition(d, t, job, nt) + return ver, errors.Trace(err) +} + func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { addingDefinitions := tblInfo.Partition.AddingDefinitions partNames := make([]string, 0, len(addingDefinitions)) @@ -373,6 +397,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) err = rollingbackDropTableOrView(t, job) case model.ActionDropTablePartition: ver, err = rollingbackDropTablePartition(t, job) + case model.ActionExchangeTablePartition: + ver, err = rollingbackExchangeTablePartition(d, t, job) case model.ActionDropSchema: err = rollingbackDropSchema(t, job) case model.ActionRenameIndex: @@ -388,7 +414,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition, model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility, - model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement, + model.ActionModifySchemaDefaultPlacement, model.ActionRecoverSchema: ver, err = cancelOnlyNotHandledJob(job, model.StateNone) case model.ActionMultiSchemaChange: diff --git a/executor/insert_common.go b/executor/insert_common.go index 751f83c071eda..5c52695d3ad4c 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -675,7 +675,7 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue } tbl := e.Table.Meta() // Handle exchange partition - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := e.ctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/executor/write.go b/executor/write.go index bc5ec9a292941..5cccfae9e7b3b 100644 --- a/executor/write.go +++ b/executor/write.go @@ -77,7 +77,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old // Handle exchange partition tbl := t.Meta() - if tbl.ExchangePartitionInfo != nil && tbl.ExchangePartitionInfo.ExchangePartitionFlag { + if tbl.ExchangePartitionInfo != nil { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) pt, tableFound := is.TableByID(tbl.ExchangePartitionInfo.ExchangePartitionID) if !tableFound { diff --git a/infoschema/builder.go b/infoschema/builder.go index 36101062c1735..c8d36558c3800 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -219,6 +219,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyRecoverTable(m, diff) case model.ActionCreateTables: return b.applyCreateTables(m, diff) + case model.ActionExchangeTablePartition: + return b.applyExchangeTablePartition(m, diff) case model.ActionFlashbackCluster: return []int64{-1}, nil default: @@ -285,6 +287,47 @@ func (b *Builder) applyDropTableOrParition(m *meta.Meta, diff *model.SchemaDiff) return tblIDs, nil } +func (b *Builder) applyExchangeTablePartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + // The partitioned table is not affected until the last stage + if diff.OldTableID == diff.TableID && diff.OldSchemaID == diff.SchemaID { + return b.applyTableUpdate(m, diff) + } + ntSchemaID := diff.OldSchemaID + ntID := diff.OldTableID + ptSchemaID := diff.SchemaID + ptID := diff.TableID + if len(diff.AffectedOpts) > 0 { + // From old version + ptID = diff.AffectedOpts[0].TableID + ptSchemaID = diff.AffectedOpts[0].SchemaID + } + // The normal table needs to be updated first: + // Just update the tables separately + currDiff := &model.SchemaDiff{ + Version: diff.Version, + TableID: ntID, + SchemaID: ntSchemaID, + } + ntIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markPartitionBundleShouldUpdate(ntID) + // Then the partitioned table + currDiff.TableID = ptID + currDiff.SchemaID = ptSchemaID + ptIDs, err := b.applyTableUpdate(m, currDiff) + if err != nil { + return nil, errors.Trace(err) + } + b.markTableBundleShouldUpdate(ptID) + err = updateAutoIDForExchangePartition(b.store, ptSchemaID, ptID, ntSchemaID, ntID) + if err != nil { + return nil, errors.Trace(err) + } + return append(ptIDs, ntIDs...), nil +} + func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { tblIDs, err := b.applyTableUpdate(m, diff) if err != nil { @@ -352,14 +395,6 @@ func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]in return nil, errors.Trace(err) } tblIDs = append(tblIDs, affectedIDs...) - - if diff.Type == model.ActionExchangeTablePartition { - // handle partition table and table AutoID - err = updateAutoIDForExchangePartition(b.store, affectedDiff.SchemaID, affectedDiff.TableID, diff.SchemaID, diff.TableID) - if err != nil { - return nil, errors.Trace(err) - } - } } return tblIDs, nil @@ -391,7 +426,7 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 newTableID = diff.TableID case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: oldTableID = diff.TableID - case model.ActionTruncateTable, model.ActionCreateView, model.ActionExchangeTablePartition: + case model.ActionTruncateTable, model.ActionCreateView: oldTableID = diff.OldTableID newTableID = diff.TableID default: @@ -409,8 +444,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 b.markTableBundleShouldUpdate(newTableID) case model.ActionRecoverTable: b.markTableBundleShouldUpdate(newTableID) - case model.ActionExchangeTablePartition: - b.markPartitionBundleShouldUpdate(newTableID) case model.ActionAlterTablePlacement: b.markTableBundleShouldUpdate(newTableID) } @@ -421,7 +454,6 @@ func (b *Builder) applyTableUpdate(m *meta.Meta, diff *model.SchemaDiff) ([]int6 var allocs autoid.Allocators if tableIDIsValid(oldTableID) { if oldTableID == newTableID && (diff.Type != model.ActionRenameTable && diff.Type != model.ActionRenameTables) && - diff.Type != model.ActionExchangeTablePartition && // For repairing table in TiDB cluster, given 2 normal node and 1 repair node. // For normal node's information schema, repaired table is existed. // For repair node's information schema, repaired table is filtered (couldn't find it in `is`). diff --git a/parser/model/model.go b/parser/model/model.go index 8a4b611eb8ca5..89db1169ebc9e 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -1185,9 +1185,10 @@ func (p PartitionType) String() string { // ExchangePartitionInfo provides exchange partition info. type ExchangePartitionInfo struct { - ExchangePartitionFlag bool `json:"exchange_partition_flag"` ExchangePartitionID int64 `json:"exchange_partition_id"` ExchangePartitionDefID int64 `json:"exchange_partition_def_id"` + // Deprecated, not used + XXXExchangePartitionFlag bool `json:"exchange_partition_flag"` } // PartitionInfo provides table partition info. diff --git a/planner/core/integration_partition_test.go b/planner/core/integration_partition_test.go index 9c397cf9fb3f3..1b9785d3a45b6 100644 --- a/planner/core/integration_partition_test.go +++ b/planner/core/integration_partition_test.go @@ -1149,7 +1149,7 @@ func TestRangeColumnsMultiColumn(t *testing.T) { tk.MustGetErrCode(`create table t (a int, b datetime, c varchar(255)) partition by range columns (a,b,c)`+ `(partition p0 values less than (NULL,NULL,NULL))`, - errno.ErrWrongTypeColumnValue) + errno.ErrParse) tk.MustGetErrCode(`create table t (a int, b datetime, c varchar(255)) partition by range columns (a,b,c)`+ `(partition p1 values less than (`+strconv.FormatInt(math.MinInt32-1, 10)+`,'0000-00-00',""))`, errno.ErrWrongTypeColumnValue) diff --git a/testkit/testkit.go b/testkit/testkit.go index e4461fee82ac6..4aea07926d04f 100644 --- a/testkit/testkit.go +++ b/testkit/testkit.go @@ -19,6 +19,9 @@ package testkit import ( "context" "fmt" + "log" + "os" + "runtime/pprof" "strings" "sync" "testing" @@ -532,3 +535,35 @@ func (c *RegionProperityClient) SendRequest(ctx context.Context, addr string, re } return c.Client.SendRequest(ctx, addr, req, timeout) } + +// DebugDumpOnTimeout will dump stack traces and possible blockers after given timeout. +// wg is the WaitGroup to mark as done when finished (to avoid runaway goroutines) +// c is the channel that will signal or close to cancel the timeout. +func DebugDumpOnTimeout(wg *sync.WaitGroup, c chan struct{}, d time.Duration) { + select { + case <-time.After(d): + log.Print("Injected timeout, dumping all goroutines:") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to possible block:") + _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces holding mutexes:") + _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to creation of new OS threads:") + _ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2) + log.Print("Waiting 2 seconds and to see if things changed...") + // Wait 2 seconds and print it again, to see if any progress is made + time.Sleep(2 * time.Second) + log.Print("Injected timeout, dumping all goroutines:") + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to possible block:") + _ = pprof.Lookup("block").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces holding mutexes:") + _ = pprof.Lookup("mutex").WriteTo(os.Stdout, 2) + log.Print("dumping all stack traces led to creation of new OS threads:") + _ = pprof.Lookup("threadcreate").WriteTo(os.Stdout, 2) + panic("Injected timeout") + case <-c: + // Test finished + } + wg.Done() +} diff --git a/types/parser_driver/value_expr.go b/types/parser_driver/value_expr.go index 5a3fcf1b8ef8a..b100f1302e3a6 100644 --- a/types/parser_driver/value_expr.go +++ b/types/parser_driver/value_expr.go @@ -183,7 +183,7 @@ func (n *ValueExpr) Format(w io.Writer) { } // WrapInSingleQuotes escapes single quotes and backslashs -// and adds single quotes arond the string +// and adds single quotes around the string func WrapInSingleQuotes(inStr string) string { s := strings.ReplaceAll(inStr, "\\", "\\\\") s = strings.ReplaceAll(s, `'`, `''`) @@ -191,7 +191,7 @@ func WrapInSingleQuotes(inStr string) string { } // UnwrapFromSingleQuotes the reverse of WrapInSingleQuotes -// but also allows non single quoted strings +// but also allows non-single quoted strings func UnwrapFromSingleQuotes(inStr string) string { if len(inStr) < 2 || inStr[:1] != "'" || inStr[len(inStr)-1:] != "'" { return inStr