diff --git a/executor/executor_test.go b/executor/executor_test.go index ba3b9f934e7a8..6ef3a2ba6fa86 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8218,3 +8218,79 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) { id2+"/2022-06-11 02:03:04.987654/1/203////201", )) } + +func (s testSerialSuite) TestExprBlackListForEnum(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a enum('a','b','c'), b enum('a','b','c'), c int, index idx(b,a));") + tk.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3);") + + checkFuncPushDown := func(rows [][]interface{}, keyWord string) bool { + for _, line := range rows { + // Agg/Expr push down + if line[2].(string) == "cop[tikv]" && strings.Contains(line[4].(string), keyWord) { + return true + } + // access index + if line[2].(string) == "cop[tikv]" && strings.Contains(line[3].(string), keyWord) { + return true + } + } + return false + } + + // Test agg(enum) push down + tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('enum');") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows := tk.MustQuery("desc format='brief' select /*+ HASH_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsFalse) + rows = tk.MustQuery("desc format='brief' select /*+ STREAM_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsFalse) + + tk.MustExec("delete from mysql.expr_pushdown_blacklist;") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select /*+ HASH_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsTrue) + rows = tk.MustQuery("desc format='brief' select /*+ STREAM_AGG() */ max(a) from t;").Rows() + c.Assert(checkFuncPushDown(rows, "max"), IsTrue) + + // Test expr(enum) push down + tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('enum');") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsFalse) + + tk.MustExec("delete from mysql.expr_pushdown_blacklist;") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where a + b;").Rows() + c.Assert(checkFuncPushDown(rows, "plus"), IsTrue) + + // Test enum index + tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('enum');") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where b = 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where b = 'a';").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where b > 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + rows = tk.MustQuery("desc format='brief' select * from t where b > 'a';").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b)"), IsFalse) + + tk.MustExec("delete from mysql.expr_pushdown_blacklist;") + tk.MustExec("admin reload expr_pushdown_blacklist;") + rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a = 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where b = 'a' and a = 'a';").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 1;").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) + rows = tk.MustQuery("desc format='brief' select * from t where b = 1 and a > 'a'").Rows() + c.Assert(checkFuncPushDown(rows, "index:idx(b, a)"), IsTrue) +} diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 8337be660ec0c..cb5414d91815f 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -1328,6 +1328,101 @@ func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) { } } +func (s *partitionTableSuite) TestIdexMerge(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_idx_merge") + tk.MustExec("use test_idx_merge") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + // list partition table + tk.MustExec(`create table tlist(a int, b int, primary key(a) clustered, index idx_b(b)) partition by list(a)( + partition p0 values in (1, 2, 3, 4), + partition p1 values in (5, 6, 7, 8), + partition p2 values in (9, 10, 11, 12));`) + + // range partition table + tk.MustExec(`create table trange(a int, b int, primary key(a) clustered, index idx_b(b)) partition by range(a) ( + partition p0 values less than(300), + partition p1 values less than (500), + partition p2 values less than(1100));`) + + // hash partition table + tk.MustExec(`create table thash(a int, b int, primary key(a) clustered, index idx_b(b)) partition by hash(a) partitions 4;`) + + // regular table + tk.MustExec("create table tregular1(a int, b int, primary key(a) clustered)") + tk.MustExec("create table tregular2(a int, b int, primary key(a) clustered)") + + // generate some random data to be inserted + vals := make([]string, 0, 2000) + for i := 0; i < 2000; i++ { + vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(1100), rand.Intn(2000))) + } + + tk.MustExec("insert ignore into trange values " + strings.Join(vals, ",")) + tk.MustExec("insert ignore into thash values " + strings.Join(vals, ",")) + tk.MustExec("insert ignore into tregular1 values " + strings.Join(vals, ",")) + + vals = make([]string, 0, 2000) + for i := 0; i < 2000; i++ { + vals = append(vals, fmt.Sprintf("(%v, %v)", rand.Intn(12)+1, rand.Intn(20))) + } + + tk.MustExec("insert ignore into tlist values " + strings.Join(vals, ",")) + tk.MustExec("insert ignore into tregular2 values " + strings.Join(vals, ",")) + + // test range partition + for i := 0; i < 100; i++ { + x1 := rand.Intn(1099) + x2 := rand.Intn(1099) + + queryPartition1 := fmt.Sprintf("select /*+ use_index_merge(trange) */ * from trange where a > %v or b < %v;", x1, x2) + queryRegular1 := fmt.Sprintf("select /*+ use_index_merge(tregular1) */ * from tregular1 where a > %v or b < %v;", x1, x2) + c.Assert(tk.HasPlan(queryPartition1, "IndexMerge"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition1).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows()) + + queryPartition2 := fmt.Sprintf("select /*+ use_index_merge(trange) */ * from trange where a > %v or b > %v;", x1, x2) + queryRegular2 := fmt.Sprintf("select /*+ use_index_merge(tregular1) */ * from tregular1 where a > %v or b > %v;", x1, x2) + c.Assert(tk.HasPlan(queryPartition2, "IndexMerge"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition2).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows()) + } + + // test hash partition + for i := 0; i < 100; i++ { + x1 := rand.Intn(1099) + x2 := rand.Intn(1099) + + queryPartition1 := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > %v or b < %v;", x1, x2) + queryRegular1 := fmt.Sprintf("select /*+ use_index_merge(tregualr1) */ * from tregular1 where a > %v or b < %v;", x1, x2) + c.Assert(tk.HasPlan(queryPartition1, "IndexMerge"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition1).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows()) + + queryPartition2 := fmt.Sprintf("select /*+ use_index_merge(thash) */ * from thash where a > %v or b > %v;", x1, x2) + queryRegular2 := fmt.Sprintf("select /*+ use_index_merge(tregular1) */ * from tregular1 where a > %v or b > %v;", x1, x2) + c.Assert(tk.HasPlan(queryPartition2, "IndexMerge"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition2).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows()) + } + + // test list partition + for i := 0; i < 100; i++ { + x1 := rand.Intn(12) + 1 + x2 := rand.Intn(12) + 1 + queryPartition1 := fmt.Sprintf("select /*+ use_index_merge(tlist) */ * from tlist where a > %v or b < %v;", x1, x2) + queryRegular1 := fmt.Sprintf("select /*+ use_index_merge(tregular2) */ * from tregular2 where a > %v or b < %v;", x1, x2) + c.Assert(tk.HasPlan(queryPartition1, "IndexMerge"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition1).Sort().Check(tk.MustQuery(queryRegular1).Sort().Rows()) + + queryPartition2 := fmt.Sprintf("select /*+ use_index_merge(tlist) */ * from tlist where a > %v or b > %v;", x1, x2) + queryRegular2 := fmt.Sprintf("select /*+ use_index_merge(tregular2) */ * from tregular2 where a > %v or b > %v;", x1, x2) + c.Assert(tk.HasPlan(queryPartition2, "IndexMerge"), IsTrue) // check if IndexLookUp is used + tk.MustQuery(queryPartition2).Sort().Check(tk.MustQuery(queryRegular2).Sort().Rows()) + } +} + func (s *globalIndexSuite) TestGlobalIndexScan(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists p") @@ -1357,3 +1452,80 @@ func (s *globalIndexSuite) TestIssue21731(c *C) { tk.MustExec("drop table if exists p, t") tk.MustExec("create table t (a int, b int, unique index idx(a)) partition by list columns(b) (partition p0 values in (1), partition p1 values in (2));") } + +func (s *testSuiteWithData) TestRangePartitionBoundariesEq(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("SET @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("CREATE DATABASE TestRangePartitionBoundaries") + defer tk.MustExec("DROP DATABASE TestRangePartitionBoundaries") + tk.MustExec("USE TestRangePartitionBoundaries") + tk.MustExec("DROP TABLE IF EXISTS t") + tk.MustExec(`CREATE TABLE t +(a INT, b varchar(255)) +PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (1000000), + PARTITION p1 VALUES LESS THAN (2000000), + PARTITION p2 VALUES LESS THAN (3000000)); +`) + + var input []string + var output []testOutput + s.testData.GetTestCases(c, &input, &output) + s.verifyPartitionResult(tk, input, output) +} + +type testOutput struct { + SQL string + Plan []string + Res []string +} + +func (s *testSuiteWithData) TestRangePartitionBoundariesNe(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("SET @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("CREATE DATABASE TestRangePartitionBoundariesNe") + defer tk.MustExec("DROP DATABASE TestRangePartitionBoundariesNe") + tk.MustExec("USE TestRangePartitionBoundariesNe") + tk.MustExec("DROP TABLE IF EXISTS t") + tk.MustExec(`CREATE TABLE t +(a INT, b varchar(255)) +PARTITION BY RANGE (a) ( + PARTITION p0 VALUES LESS THAN (1), + PARTITION p1 VALUES LESS THAN (2), + PARTITION p2 VALUES LESS THAN (3), + PARTITION p3 VALUES LESS THAN (4), + PARTITION p4 VALUES LESS THAN (5), + PARTITION p5 VALUES LESS THAN (6), + PARTITION p6 VALUES LESS THAN (7))`) + + var input []string + var output []testOutput + s.testData.GetTestCases(c, &input, &output) + s.verifyPartitionResult(tk, input, output) +} + +func (s *testSuiteWithData) verifyPartitionResult(tk *testkit.TestKit, input []string, output []testOutput) { + for i, tt := range input { + var isSelect bool = false + if strings.HasPrefix(strings.ToLower(tt), "select ") { + isSelect = true + } + s.testData.OnRecord(func() { + output[i].SQL = tt + if isSelect { + output[i].Plan = s.testData.ConvertRowsToStrings(tk.UsedPartitions(tt).Rows()) + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows()) + } else { + // to avoid double execution of INSERT (and INSERT does not return anything) + output[i].Res = nil + output[i].Plan = nil + } + }) + if isSelect { + tk.UsedPartitions(tt).Check(testkit.Rows(output[i].Plan...)) + } + tk.MayQuery(tt).Sort().Check(testkit.Rows(output[i].Res...)) + } +} diff --git a/executor/testdata/executor_suite_in.json b/executor/testdata/executor_suite_in.json index 6abd20c740a80..fff3187717f0a 100644 --- a/executor/testdata/executor_suite_in.json +++ b/executor/testdata/executor_suite_in.json @@ -51,5 +51,96 @@ "select count(*) from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL", "select * from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 is not NULL" ] + }, + { + "name": "TestRangePartitionBoundariesEq", + "cases": [ + "INSERT INTO t VALUES (999998, '999998 Filler ...'), (999999, '999999 Filler ...'), (1000000, '1000000 Filler ...'), (1000001, '1000001 Filler ...'), (1000002, '1000002 Filler ...')", + "INSERT INTO t VALUES (1999998, '1999998 Filler ...'), (1999999, '1999999 Filler ...'), (2000000, '2000000 Filler ...'), (2000001, '2000001 Filler ...'), (2000002, '2000002 Filler ...')", + "INSERT INTO t VALUES (2999998, '2999998 Filler ...'), (2999999, '2999999 Filler ...')", + "INSERT INTO t VALUES (-2147483648, 'MIN_INT filler...'), (0, '0 Filler...')", + "ANALYZE TABLE t", + "SELECT * FROM t WHERE a = -2147483648", + "SELECT * FROM t WHERE a IN (-2147483648)", + "SELECT * FROM t WHERE a = 0", + "SELECT * FROM t WHERE a IN (0)", + "SELECT * FROM t WHERE a = 999998", + "SELECT * FROM t WHERE a IN (999998)", + "SELECT * FROM t WHERE a = 999999", + "SELECT * FROM t WHERE a IN (999999)", + "SELECT * FROM t WHERE a = 1000000", + "SELECT * FROM t WHERE a IN (1000000)", + "SELECT * FROM t WHERE a = 1000001", + "SELECT * FROM t WHERE a IN (1000001)", + "SELECT * FROM t WHERE a = 1000002", + "SELECT * FROM t WHERE a IN (1000002)", + "SELECT * FROM t WHERE a = 3000000", + "SELECT * FROM t WHERE a IN (3000000)", + "SELECT * FROM t WHERE a = 3000001", + "SELECT * FROM t WHERE a IN (3000001)", + "SELECT * FROM t WHERE a IN (-2147483648, -2147483647)", + "SELECT * FROM t WHERE a IN (-2147483647, -2147483646)", + "SELECT * FROM t WHERE a IN (999997, 999998, 999999)", + "SELECT * FROM t WHERE a IN (999998, 999999, 1000000)", + "SELECT * FROM t WHERE a IN (999999, 1000000, 1000001)", + "SELECT * FROM t WHERE a IN (1000000, 1000001, 1000002)", + "SELECT * FROM t WHERE a IN (1999997, 1999998, 1999999)", + "SELECT * FROM t WHERE a IN (1999998, 1999999, 2000000)", + "SELECT * FROM t WHERE a IN (1999999, 2000000, 2000001)", + "SELECT * FROM t WHERE a IN (2000000, 2000001, 2000002)", + "SELECT * FROM t WHERE a IN (2999997, 2999998, 2999999)", + "SELECT * FROM t WHERE a IN (2999998, 2999999, 3000000)", + "SELECT * FROM t WHERE a IN (2999999, 3000000, 3000001)", + "SELECT * FROM t WHERE a IN (3000000, 3000001, 3000002)" + ] + }, + { + "name": "TestRangePartitionBoundariesNe", + "cases": [ + "INSERT INTO t VALUES (0, '0 Filler...')", + "INSERT INTO t VALUES (1, '1 Filler...')", + "INSERT INTO t VALUES (2, '2 Filler...')", + "INSERT INTO t VALUES (3, '3 Filler...')", + "INSERT INTO t VALUES (4, '4 Filler...')", + "INSERT INTO t VALUES (5, '5 Filler...')", + "INSERT INTO t VALUES (6, '6 Filler...')", + "ANALYZE TABLE t", + "SELECT * FROM t WHERE a != -1", + "SELECT * FROM t WHERE 1 = 1 AND a != -1", + "SELECT * FROM t WHERE a NOT IN (-2, -1)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1", + "SELECT * FROM t WHERE a != 0", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0", + "SELECT * FROM t WHERE a != 1", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1", + "SELECT * FROM t WHERE a != 2", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2", + "SELECT * FROM t WHERE a != 3", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3", + "SELECT * FROM t WHERE a != 4", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4", + "SELECT * FROM t WHERE a != 5", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5", + "SELECT * FROM t WHERE a != 6", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6", + "SELECT * FROM t WHERE a != 7", + "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6 AND a != 7", + "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6, 7)", + "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6 OR a = 7" + ] } ] diff --git a/executor/testdata/executor_suite_out.json b/executor/testdata/executor_suite_out.json index 2be3c8ea4894f..caa5c4f948966 100644 --- a/executor/testdata/executor_suite_out.json +++ b/executor/testdata/executor_suite_out.json @@ -598,5 +598,802 @@ ] } ] + }, + { + "Name": "TestRangePartitionBoundariesEq", + "Cases": [ + { + "SQL": "INSERT INTO t VALUES (999998, '999998 Filler ...'), (999999, '999999 Filler ...'), (1000000, '1000000 Filler ...'), (1000001, '1000001 Filler ...'), (1000002, '1000002 Filler ...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (1999998, '1999998 Filler ...'), (1999999, '1999999 Filler ...'), (2000000, '2000000 Filler ...'), (2000001, '2000001 Filler ...'), (2000002, '2000002 Filler ...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (2999998, '2999998 Filler ...'), (2999999, '2999999 Filler ...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (-2147483648, 'MIN_INT filler...'), (0, '0 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "ANALYZE TABLE t", + "Plan": null, + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a = -2147483648", + "Plan": [ + "p0" + ], + "Res": [ + "-2147483648 MIN_INT filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (-2147483648)", + "Plan": [ + "p0" + ], + "Res": [ + "-2147483648 MIN_INT filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 0", + "Plan": [ + "p0" + ], + "Res": [ + "0 0 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (0)", + "Plan": [ + "p0" + ], + "Res": [ + "0 0 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 999998", + "Plan": [ + "p0" + ], + "Res": [ + "999998 999998 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999998)", + "Plan": [ + "p0" + ], + "Res": [ + "999998 999998 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 999999", + "Plan": [ + "p0" + ], + "Res": [ + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999999)", + "Plan": [ + "p0" + ], + "Res": [ + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 1000000", + "Plan": [ + "p1" + ], + "Res": [ + "1000000 1000000 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000000)", + "Plan": [ + "p1" + ], + "Res": [ + "1000000 1000000 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 1000001", + "Plan": [ + "p1" + ], + "Res": [ + "1000001 1000001 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000001)", + "Plan": [ + "p1" + ], + "Res": [ + "1000001 1000001 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 1000002", + "Plan": [ + "p1" + ], + "Res": [ + "1000002 1000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000002)", + "Plan": [ + "p1" + ], + "Res": [ + "1000002 1000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a = 3000000", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (3000000)", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a = 3000001", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (3000001)", + "Plan": [ + "dual" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (-2147483648, -2147483647)", + "Plan": [ + "p0" + ], + "Res": [ + "-2147483648 MIN_INT filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (-2147483647, -2147483646)", + "Plan": [ + "p0" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999997, 999998, 999999)", + "Plan": [ + "p0" + ], + "Res": [ + "999998 999998 Filler ...", + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999998, 999999, 1000000)", + "Plan": [ + "p0 p1" + ], + "Res": [ + "1000000 1000000 Filler ...", + "999998 999998 Filler ...", + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (999999, 1000000, 1000001)", + "Plan": [ + "p0 p1" + ], + "Res": [ + "1000000 1000000 Filler ...", + "1000001 1000001 Filler ...", + "999999 999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1000000, 1000001, 1000002)", + "Plan": [ + "p1" + ], + "Res": [ + "1000000 1000000 Filler ...", + "1000001 1000001 Filler ...", + "1000002 1000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1999997, 1999998, 1999999)", + "Plan": [ + "p1" + ], + "Res": [ + "1999998 1999998 Filler ...", + "1999999 1999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1999998, 1999999, 2000000)", + "Plan": [ + "p1 p2" + ], + "Res": [ + "1999998 1999998 Filler ...", + "1999999 1999999 Filler ...", + "2000000 2000000 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (1999999, 2000000, 2000001)", + "Plan": [ + "p1 p2" + ], + "Res": [ + "1999999 1999999 Filler ...", + "2000000 2000000 Filler ...", + "2000001 2000001 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2000000, 2000001, 2000002)", + "Plan": [ + "p2" + ], + "Res": [ + "2000000 2000000 Filler ...", + "2000001 2000001 Filler ...", + "2000002 2000002 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2999997, 2999998, 2999999)", + "Plan": [ + "p2" + ], + "Res": [ + "2999998 2999998 Filler ...", + "2999999 2999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2999998, 2999999, 3000000)", + "Plan": [ + "p2" + ], + "Res": [ + "2999998 2999998 Filler ...", + "2999999 2999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (2999999, 3000000, 3000001)", + "Plan": [ + "p2" + ], + "Res": [ + "2999999 2999999 Filler ..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a IN (3000000, 3000001, 3000002)", + "Plan": [ + "dual" + ], + "Res": null + } + ] + }, + { + "Name": "TestRangePartitionBoundariesNe", + "Cases": [ + { + "SQL": "INSERT INTO t VALUES (0, '0 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (1, '1 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (2, '2 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (3, '3 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (4, '4 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (5, '5 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "INSERT INTO t VALUES (6, '6 Filler...')", + "Plan": null, + "Res": null + }, + { + "SQL": "ANALYZE TABLE t", + "Plan": null, + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a != -1", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1)", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1", + "Plan": [ + "p0" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a != 0", + "Plan": [ + "all" + ], + "Res": [ + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0", + "Plan": [ + "all" + ], + "Res": [ + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0)", + "Plan": [ + "all" + ], + "Res": [ + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0", + "Plan": [ + "p0" + ], + "Res": [ + "0 0 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 1", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1", + "Plan": [ + "all" + ], + "Res": [ + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1)", + "Plan": [ + "all" + ], + "Res": [ + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1", + "Plan": [ + "p0 p1" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 2", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2", + "Plan": [ + "all" + ], + "Res": [ + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2)", + "Plan": [ + "all" + ], + "Res": [ + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2", + "Plan": [ + "p0 p1 p2" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 3", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3", + "Plan": [ + "all" + ], + "Res": [ + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3)", + "Plan": [ + "all" + ], + "Res": [ + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3", + "Plan": [ + "p0 p1 p2 p3" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 4", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4", + "Plan": [ + "all" + ], + "Res": [ + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4)", + "Plan": [ + "all" + ], + "Res": [ + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4", + "Plan": [ + "p0 p1 p2 p3 p4" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 5", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5", + "Plan": [ + "all" + ], + "Res": [ + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5)", + "Plan": [ + "all" + ], + "Res": [ + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5", + "Plan": [ + "p0 p1 p2 p3 p4 p5" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 6", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6)", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE a != 7", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 1 AND a != -1 AND a != 0 AND a != 1 AND a != 2 AND a != 3 AND a != 4 AND a != 5 AND a != 6 AND a != 7", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE a NOT IN (-2, -1, 0, 1, 2, 3, 4, 5, 6, 7)", + "Plan": [ + "all" + ], + "Res": null + }, + { + "SQL": "SELECT * FROM t WHERE 1 = 0 OR a = -1 OR a = 0 OR a = 1 OR a = 2 OR a = 3 OR a = 4 OR a = 5 OR a = 6 OR a = 7", + "Plan": [ + "all" + ], + "Res": [ + "0 0 Filler...", + "1 1 Filler...", + "2 2 Filler...", + "3 3 Filler...", + "4 4 Filler...", + "5 5 Filler...", + "6 6 Filler..." + ] + } + ] } ] diff --git a/expression/builtin_cast.go b/expression/builtin_cast.go index 0f2d5827c91d9..9d6becab44cbe 100644 --- a/expression/builtin_cast.go +++ b/expression/builtin_cast.go @@ -1804,6 +1804,7 @@ func BuildCastFunction4Union(ctx sessionctx.Context, expr Expression, tp *types. // BuildCastFunction builds a CAST ScalarFunction from the Expression. func BuildCastFunction(ctx sessionctx.Context, expr Expression, tp *types.FieldType) (res Expression) { + expr = TryPushCastIntoControlFunctionForHybridType(ctx, expr, tp) var fc functionClass switch tp.EvalType() { case types.ETInt: @@ -1983,3 +1984,92 @@ func WrapWithCastAsJSON(ctx sessionctx.Context, expr Expression) Expression { } return BuildCastFunction(ctx, expr, tp) } + +// TryPushCastIntoControlFunctionForHybridType try to push cast into control function for Hybrid Type. +// If necessary, it will rebuild control function using changed args. +// When a hybrid type is the output of a control function, the result may be as a numeric type to subsequent calculation +// We should perform the `Cast` operation early to avoid using the wrong type for calculation +// For example, the condition `if(1, e, 'a') = 1`, `if` function will output `e` and compare with `1`. +// If the evaltype is ETString, it will get wrong result. So we can rewrite the condition to +// `IfInt(1, cast(e as int), cast('a' as int)) = 1` to get the correct result. +func TryPushCastIntoControlFunctionForHybridType(ctx sessionctx.Context, expr Expression, tp *types.FieldType) (res Expression) { + sf, ok := expr.(*ScalarFunction) + if !ok { + return expr + } + + var wrapCastFunc func(ctx sessionctx.Context, expr Expression) Expression + switch tp.EvalType() { + case types.ETInt: + wrapCastFunc = WrapWithCastAsInt + case types.ETReal: + wrapCastFunc = WrapWithCastAsReal + default: + return expr + } + + isHybrid := func(ft *types.FieldType) bool { + // todo: compatible with mysql control function using bit type. issue 24725 + return ft.Hybrid() && ft.Tp != mysql.TypeBit + } + + args := sf.GetArgs() + switch sf.FuncName.L { + case ast.If: + if isHybrid(args[1].GetType()) || isHybrid(args[2].GetType()) { + args[1] = wrapCastFunc(ctx, args[1]) + args[2] = wrapCastFunc(ctx, args[2]) + f, err := funcs[ast.If].getFunction(ctx, args) + if err != nil { + return expr + } + sf.RetType, sf.Function = f.getRetTp(), f + return sf + } + case ast.Case: + hasHybrid := false + for i := 0; i < len(args)-1; i += 2 { + hasHybrid = hasHybrid || isHybrid(args[i+1].GetType()) + } + if len(args)%2 == 1 { + hasHybrid = hasHybrid || isHybrid(args[len(args)-1].GetType()) + } + if !hasHybrid { + return expr + } + + for i := 0; i < len(args)-1; i += 2 { + args[i+1] = wrapCastFunc(ctx, args[i+1]) + } + if len(args)%2 == 1 { + args[len(args)-1] = wrapCastFunc(ctx, args[len(args)-1]) + } + f, err := funcs[ast.Case].getFunction(ctx, args) + if err != nil { + return expr + } + sf.RetType, sf.Function = f.getRetTp(), f + return sf + case ast.Elt: + hasHybrid := false + for i := 1; i < len(args); i++ { + hasHybrid = hasHybrid || isHybrid(args[i].GetType()) + } + if !hasHybrid { + return expr + } + + for i := 1; i < len(args); i++ { + args[i] = wrapCastFunc(ctx, args[i]) + } + f, err := funcs[ast.Elt].getFunction(ctx, args) + if err != nil { + return expr + } + sf.RetType, sf.Function = f.getRetTp(), f + return sf + default: + return expr + } + return expr +} diff --git a/expression/expr_to_pb.go b/expression/expr_to_pb.go index 59ff01b73b67c..dc031145d0d95 100644 --- a/expression/expr_to_pb.go +++ b/expression/expr_to_pb.go @@ -211,6 +211,10 @@ func (pc PbConverter) columnToPBExpr(column *Column) *tipb.Expr { switch column.GetType().Tp { case mysql.TypeBit, mysql.TypeSet, mysql.TypeGeometry, mysql.TypeUnspecified: return nil + case mysql.TypeEnum: + if !IsPushDownEnabled("enum", kv.UnSpecified) { + return nil + } } if pc.client.IsRequestTypeSupported(kv.ReqTypeDAG, kv.ReqSubTypeBasic) { diff --git a/expression/integration_test.go b/expression/integration_test.go index 69142c01c3f35..a0fb6fd8b5499 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -9435,3 +9435,76 @@ func (s *testIntegrationSuite) TestEnumIndex(c *C) { tk.MustQuery("select /*+ use_index(t,idx) */ col3 from t where col2 = 'b' and col1 is not null;").Check( testkit.Rows("2")) } + +func (s *testIntegrationSuite) TestControlFunctionWithEnumOrSet(c *C) { + defer s.cleanEnv(c) + + // issue 23114 + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists e;") + tk.MustExec("create table e(e enum('c', 'b', 'a'));") + tk.MustExec("insert into e values ('a'),('b'),('a'),('b');") + tk.MustQuery("select e from e where if(e>1, e, e);").Sort().Check( + testkit.Rows("a", "a", "b", "b")) + tk.MustQuery("select e from e where case e when 1 then e else e end;").Sort().Check( + testkit.Rows("a", "a", "b", "b")) + tk.MustQuery("select e from e where case 1 when e then e end;").Check(testkit.Rows()) + + tk.MustQuery("select if(e>1,e,e)='a' from e").Sort().Check( + testkit.Rows("0", "0", "1", "1")) + tk.MustQuery("select if(e>1,e,e)=1 from e").Sort().Check( + testkit.Rows("0", "0", "0", "0")) + // if and if + tk.MustQuery("select if(e>2,e,e) and if(e<=2,e,e) from e;").Sort().Check( + testkit.Rows("1", "1", "1", "1")) + tk.MustQuery("select if(e>2,e,e) and (if(e<3,0,e) or if(e>=2,0,e)) from e;").Sort().Check( + testkit.Rows("0", "0", "1", "1")) + tk.MustQuery("select * from e where if(e>2,e,e) and if(e<=2,e,e);").Sort().Check( + testkit.Rows("a", "a", "b", "b")) + tk.MustQuery("select * from e where if(e>2,e,e) and (if(e<3,0,e) or if(e>=2,0,e));").Sort().Check( + testkit.Rows("a", "a")) + + // issue 24494 + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int,b enum(\"b\",\"y\",\"1\"));") + tk.MustExec("insert into t values(0,\"y\"),(1,\"b\"),(null,null),(2,\"1\");") + tk.MustQuery("SELECT count(*) FROM t where if(a,b ,null);").Check(testkit.Rows("2")) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int,b enum(\"b\"),c enum(\"c\"));") + tk.MustExec("insert into t values(1,1,1),(2,1,1),(1,1,1),(2,1,1);") + tk.MustQuery("select a from t where if(a=1,b,c)=\"b\";").Check(testkit.Rows("1", "1")) + tk.MustQuery("select a from t where if(a=1,b,c)=\"c\";").Check(testkit.Rows("2", "2")) + tk.MustQuery("select a from t where if(a=1,b,c)=1;").Sort().Check(testkit.Rows("1", "1", "2", "2")) + tk.MustQuery("select a from t where if(a=1,b,c);").Sort().Check(testkit.Rows("1", "1", "2", "2")) + + tk.MustExec("drop table if exists e;") + tk.MustExec("create table e(e enum('c', 'b', 'a'));") + tk.MustExec("insert into e values(3)") + tk.MustQuery("select elt(1,e) = 'a' from e").Check(testkit.Rows("1")) + tk.MustQuery("select elt(1,e) = 3 from e").Check(testkit.Rows("1")) + tk.MustQuery("select e from e where elt(1,e)").Check(testkit.Rows("a")) + + // test set type + tk.MustExec("drop table if exists s;") + tk.MustExec("create table s(s set('c', 'b', 'a'));") + tk.MustExec("insert into s values ('a'),('b'),('a'),('b');") + tk.MustQuery("select s from s where if(s>1, s, s);").Sort().Check( + testkit.Rows("a", "a", "b", "b")) + tk.MustQuery("select s from s where case s when 1 then s else s end;").Sort().Check( + testkit.Rows("a", "a", "b", "b")) + tk.MustQuery("select s from s where case 1 when s then s end;").Check(testkit.Rows()) + + tk.MustQuery("select if(s>1,s,s)='a' from s").Sort().Check( + testkit.Rows("0", "0", "1", "1")) + tk.MustQuery("select if(s>1,s,s)=4 from s").Sort().Check( + testkit.Rows("0", "0", "1", "1")) + + tk.MustExec("drop table if exists s;") + tk.MustExec("create table s(s set('c', 'b', 'a'));") + tk.MustExec("insert into s values('a')") + tk.MustQuery("select elt(1,s) = 'a' from s").Check(testkit.Rows("1")) + tk.MustQuery("select elt(1,s) = 4 from s").Check(testkit.Rows("1")) + tk.MustQuery("select s from s where elt(1,s)").Check(testkit.Rows("a")) +} diff --git a/kv/fault_injection.go b/kv/fault_injection.go index 218ca9cbd6966..d61685a7f8a71 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -16,6 +16,8 @@ package kv import ( "context" "sync" + + "github.com/pingcap/tidb/store/tikv" ) // InjectionConfig is used for fault injections for KV components. @@ -64,7 +66,7 @@ func (s *InjectedStore) Begin() (Transaction, error) { } // BeginWithOption creates an injected Transaction with given option. -func (s *InjectedStore) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *InjectedStore) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { txn, err := s.Storage.BeginWithOption(option) return &InjectedTransaction{ Transaction: txn, diff --git a/kv/fault_injection_test.go b/kv/fault_injection_test.go index 33b6535214b2c..4979dbf4268cd 100644 --- a/kv/fault_injection_test.go +++ b/kv/fault_injection_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -35,7 +36,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) { storage := NewInjectedStore(newMockStorage(), &cfg) txn, err := storage.Begin() c.Assert(err, IsNil) - _, err = storage.BeginWithOption(TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) + _, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0)) c.Assert(err, IsNil) ver := Version{Ver: 1} snap := storage.GetSnapshot(ver) diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 5d85261bc2111..9e41832678294 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -154,7 +155,7 @@ func (s *mockStorage) Begin() (Transaction, error) { return newMockTxn(), nil } -func (s *mockStorage) BeginWithOption(option TransactionOption) (Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (Transaction, error) { return newMockTxn(), nil } diff --git a/kv/kv.go b/kv/kv.go index 20b0fc84b7144..471dfe09a110b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/memory" @@ -339,59 +340,13 @@ type Driver interface { Open(path string) (Storage, error) } -// TransactionOption indicates the option when beginning a transaction -// `TxnScope` must be set for each object -// Every other fields are optional, but currently at most one of them can be set -type TransactionOption struct { - TxnScope string - StartTS *uint64 - PrevSec *uint64 - MinStartTS *uint64 - MaxPrevSec *uint64 -} - -// DefaultTransactionOption creates a default TransactionOption, ie. Work in GlobalTxnScope and get start ts when got used -func DefaultTransactionOption() TransactionOption { - return TransactionOption{TxnScope: oracle.GlobalTxnScope} -} - -// SetMaxPrevSec set maxPrevSec -func (to TransactionOption) SetMaxPrevSec(maxPrevSec uint64) TransactionOption { - to.MaxPrevSec = &maxPrevSec - return to -} - -// SetMinStartTS set minStartTS -func (to TransactionOption) SetMinStartTS(minStartTS uint64) TransactionOption { - to.MinStartTS = &minStartTS - return to -} - -// SetStartTs set startTS -func (to TransactionOption) SetStartTs(startTS uint64) TransactionOption { - to.StartTS = &startTS - return to -} - -// SetPrevSec set prevSec -func (to TransactionOption) SetPrevSec(prevSec uint64) TransactionOption { - to.PrevSec = &prevSec - return to -} - -// SetTxnScope set txnScope -func (to TransactionOption) SetTxnScope(txnScope string) TransactionOption { - to.TxnScope = txnScope - return to -} - // Storage defines the interface for storage. // Isolation should be at least SI(SNAPSHOT ISOLATION) type Storage interface { // Begin a global transaction Begin() (Transaction, error) // Begin a transaction with given option - BeginWithOption(option TransactionOption) (Transaction, error) + BeginWithOption(option tikv.StartTSOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. GetSnapshot(ver Version) Snapshot diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 64bc0c41407e1..fc6fb53dcff44 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -975,6 +975,19 @@ func (b *PlanBuilder) buildSelection(ctx context.Context, p LogicalPlan, where a if len(cnfExpres) == 0 { return p, nil } + // check expr field types. + for i, expr := range cnfExpres { + if expr.GetType().EvalType() == types.ETString { + tp := &types.FieldType{ + Tp: mysql.TypeDouble, + Flag: expr.GetType().Flag, + Flen: mysql.MaxRealWidth, + Decimal: types.UnspecifiedLength, + } + types.SetBinChsClnFlag(tp) + cnfExpres[i] = expression.TryPushCastIntoControlFunctionForHybridType(b.ctx, expr, tp) + } + } selection.Conditions = cnfExpres selection.SetChildren(p) return selection, nil diff --git a/session/session.go b/session/session.go index 78a60a6ebaecf..c9b3f8f7a8abd 100644 --- a/session/session.go +++ b/session/session.go @@ -1975,7 +1975,7 @@ func (s *session) NewTxn(ctx context.Context) error { zap.String("txnScope", txnScope)) } - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope())) if err != nil { return err } @@ -2652,6 +2652,7 @@ var builtinGlobalVariable = []string{ variable.TiDBAllowFallbackToTiKV, variable.TiDBEnableDynamicPrivileges, variable.CTEMaxRecursionDepth, + variable.TiDBDMLBatchSize, } // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. @@ -2767,7 +2768,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } // no need to get txn from txnFutureCh since txn should init with startTs - txn, err := s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) + txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.GetSessionVars().CheckAndGetTxnScope()).SetStartTs(startTS)) if err != nil { return err } @@ -2800,22 +2801,22 @@ func (s *session) NewTxnWithStalenessOption(ctx context.Context, option sessionc txnScope := s.GetSessionVars().CheckAndGetTxnScope() switch option.Mode { case ast.TimestampBoundReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetStartTs(option.StartTS)) if err != nil { return err } case ast.TimestampBoundExactStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMaxStaleness: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMaxPrevSec(option.PrevSec)) if err != nil { return err } case ast.TimestampBoundMinReadTimestamp: - txn, err = s.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) + txn, err = s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(txnScope).SetMinStartTS(option.StartTS)) if err != nil { return err } diff --git a/session/session_test.go b/session/session_test.go index f7267e3a13259..df2a167921e56 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4476,3 +4476,13 @@ func (s *testTxnStateSuite) TestRollbacking(c *C) { c.Assert(tk.Se.TxnInfo().State, Equals, txninfo.TxnRollingBack) <-ch } + +func (s *testSessionSuite) TestReadDMLBatchSize(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set global tidb_dml_batch_size=1000") + se, err := session.CreateSession(s.store) + c.Assert(err, IsNil) + // `select 1` to load the global variables. + _, _ = se.Execute(context.TODO(), "select 1") + c.Assert(se.GetSessionVars().DMLBatchSize, Equals, 1000) +} diff --git a/session/txn.go b/session/txn.go index 133cafb976aae..294725f8efaa0 100644 --- a/session/txn.go +++ b/session/txn.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" @@ -436,14 +437,14 @@ type txnFuture struct { func (tf *txnFuture) wait() (kv.Transaction, error) { startTS, err := tf.future.Wait() if err == nil { - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope).SetStartTs(startTS)) } else if config.GetGlobalConfig().Store == "unistore" { return nil, err } logutil.BgLogger().Warn("wait tso failed", zap.Error(err)) // It would retry get timestamp. - return tf.store.BeginWithOption(kv.DefaultTransactionOption().SetTxnScope(tf.txnScope)) + return tf.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(tf.txnScope)) } func (s *session) getTxnFuture(ctx context.Context) *txnFuture { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 99c3da8233d68..e6f632db6b02d 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -904,7 +904,7 @@ var defaultSysVars = []*SysVar{ return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBDMLBatchSize, Value: strconv.Itoa(DefDMLBatchSize), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { - s.DMLBatchSize = int(tidbOptInt64(val, DefOptCorrelationExpFactor)) + s.DMLBatchSize = int(tidbOptInt64(val, DefDMLBatchSize)) return nil }}, {Scope: ScopeSession, Name: TiDBCurrentTS, Value: strconv.Itoa(DefCurretTS), ReadOnly: true}, diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index b0c0ad5c9ea7b..8c73f58fbf892 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -16,6 +16,8 @@ package copr import ( "context" "io" + "math" + "strconv" "sync" "sync/atomic" "time" @@ -25,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" @@ -40,8 +43,9 @@ import ( type batchCopTask struct { storeAddr string cmdType tikvrpc.CmdType + ctx *tikv.RPCContext - copTasks []copTaskAndRPCContext + regionInfos []tikv.RegionInfo } type batchCopResponse struct { @@ -93,9 +97,152 @@ func (rs *batchCopResponse) RespTime() time.Duration { return rs.respTime } -type copTaskAndRPCContext struct { - task *copTask - ctx *tikv.RPCContext +// balanceBatchCopTask balance the regions between available stores, the basic rule is +// 1. the first region of each original batch cop task belongs to its original store because some +// meta data(like the rpc context) in batchCopTask is related to it +// 2. for the remaining regions: +// if there is only 1 available store, then put the region to the related store +// otherwise, use a greedy algorithm to put it into the store with highest weight +func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask { + if len(originalTasks) <= 1 { + return originalTasks + } + storeTaskMap := make(map[uint64]*batchCopTask) + storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo) + totalRegionCandidateNum := 0 + totalRemainingRegionNum := 0 + + for _, task := range originalTasks { + taskStoreID := task.regionInfos[0].AllStores[0] + batchTask := &batchCopTask{ + storeAddr: task.storeAddr, + cmdType: task.cmdType, + ctx: task.ctx, + regionInfos: []tikv.RegionInfo{task.regionInfos[0]}, + } + storeTaskMap[taskStoreID] = batchTask + } + + for _, task := range originalTasks { + taskStoreID := task.regionInfos[0].AllStores[0] + for index, ri := range task.regionInfos { + // for each region, figure out the valid store num + validStoreNum := 0 + if index == 0 { + continue + } + if len(ri.AllStores) <= 1 { + validStoreNum = 1 + } else { + for _, storeID := range ri.AllStores { + if _, ok := storeTaskMap[storeID]; ok { + validStoreNum++ + } + } + } + if validStoreNum == 1 { + // if only one store is valid, just put it to storeTaskMap + storeTaskMap[taskStoreID].regionInfos = append(storeTaskMap[taskStoreID].regionInfos, ri) + } else { + // if more than one store is valid, put the region + // to store candidate map + totalRegionCandidateNum += validStoreNum + totalRemainingRegionNum += 1 + taskKey := ri.Region.String() + for _, storeID := range ri.AllStores { + if _, validStore := storeTaskMap[storeID]; !validStore { + continue + } + if _, ok := storeCandidateRegionMap[storeID]; !ok { + candidateMap := make(map[string]tikv.RegionInfo) + storeCandidateRegionMap[storeID] = candidateMap + } + if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion { + // duplicated region, should not happen, just give up balance + logutil.BgLogger().Warn("Meet duplicated region info during when trying to balance batch cop task, give up balancing") + return originalTasks + } + storeCandidateRegionMap[storeID][taskKey] = ri + } + } + } + } + if totalRemainingRegionNum == 0 { + return originalTasks + } + + avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) + findNextStore := func(candidateStores []uint64) uint64 { + store := uint64(math.MaxUint64) + weightedRegionNum := math.MaxFloat64 + if candidateStores != nil { + for _, storeID := range candidateStores { + if _, validStore := storeCandidateRegionMap[storeID]; !validStore { + continue + } + num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) + if num < weightedRegionNum { + store = storeID + weightedRegionNum = num + } + } + if store != uint64(math.MaxUint64) { + return store + } + } + for storeID := range storeTaskMap { + if _, validStore := storeCandidateRegionMap[storeID]; !validStore { + continue + } + num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos)) + if num < weightedRegionNum { + store = storeID + weightedRegionNum = num + } + } + return store + } + + store := findNextStore(nil) + for totalRemainingRegionNum > 0 { + if store == uint64(math.MaxUint64) { + break + } + var key string + var ri tikv.RegionInfo + for key, ri = range storeCandidateRegionMap[store] { + // get the first region + break + } + storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri) + totalRemainingRegionNum-- + for _, id := range ri.AllStores { + if _, ok := storeCandidateRegionMap[id]; ok { + delete(storeCandidateRegionMap[id], key) + totalRegionCandidateNum-- + if len(storeCandidateRegionMap[id]) == 0 { + delete(storeCandidateRegionMap, id) + } + } + } + if totalRemainingRegionNum > 0 { + avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum) + // it is not optimal because we only check the stores that affected by this region, in fact in order + // to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think + // check only the affected stores is more simple and will get a good enough result + store = findNextStore(ri.AllStores) + } + } + if totalRemainingRegionNum > 0 { + logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing") + return originalTasks + } + + var ret []*batchCopTask + for _, task := range storeTaskMap { + ret = append(ret, task) + } + return ret } func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) { @@ -138,13 +285,15 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key // Then `splitRegion` will reloads these regions. continue } + allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store) if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok { - batchCop.copTasks = append(batchCop.copTasks, copTaskAndRPCContext{task: task, ctx: rpcCtx}) + batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}) } else { batchTask := &batchCopTask{ - storeAddr: rpcCtx.Addr, - cmdType: cmdType, - copTasks: []copTaskAndRPCContext{{task, rpcCtx}}, + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}}, } storeTaskMap[rpcCtx.Addr] = batchTask } @@ -159,9 +308,25 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key } continue } + for _, task := range storeTaskMap { batchTasks = append(batchTasks, task) } + if log.GetLevel() <= zap.DebugLevel { + msg := "Before region balance:" + for _, task := range batchTasks { + msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions," + } + logutil.BgLogger().Debug(msg) + } + batchTasks = balanceBatchCopTask(batchTasks) + if log.GetLevel() <= zap.DebugLevel { + msg := "After region balance:" + for _, task := range batchTasks { + msg += " store " + task.storeAddr + ": " + strconv.Itoa(len(task.regionInfos)) + " regions," + } + logutil.BgLogger().Debug(msg) + } if elapsed := time.Since(start); elapsed > time.Millisecond*500 { logutil.BgLogger().Warn("buildBatchCopTasks takes too much time", @@ -311,8 +476,8 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * // Merge all ranges and request again. func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) { var ranges []tikvstore.KeyRange - for _, taskCtx := range batchTask.copTasks { - taskCtx.task.ranges.Do(func(ran *tikvstore.KeyRange) { + for _, ri := range batchTask.regionInfos { + ri.Ranges.Do(func(ran *tikvstore.KeyRange) { ranges = append(ranges, *ran) }) } @@ -320,16 +485,16 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, } func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) - var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.copTasks)) - for _, task := range task.copTasks { + sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient()) + var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) + for _, ri := range task.regionInfos { regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: task.task.region.GetID(), + RegionId: ri.Region.GetID(), RegionEpoch: &metapb.RegionEpoch{ - ConfVer: task.task.region.GetConfVer(), - Version: task.task.region.GetVer(), + ConfVer: ri.Region.GetConfVer(), + Version: ri.Region.GetVer(), }, - Ranges: task.task.ranges.ToPBRanges(), + Ranges: ri.Ranges.ToPBRanges(), }) } @@ -351,13 +516,14 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta }) req.StoreTp = tikvrpc.TiFlash - logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.copTasks))) - resp, retry, cancel, err := sender.sendStreamReqToAddr(bo, task.copTasks, req, tikv.ReadTimeoutUltraLong) + logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos))) + resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong) // If there are store errors, we should retry for all regions. if retry { return b.retryBatchCopTask(ctx, bo, task) } if err != nil { + err = derr.ToTiDBErr(err) return nil, errors.Trace(err) } defer cancel() diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 2aaf4223ed8e5..1941f2b3fbfa4 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -180,14 +180,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req var regionInfos []*coprocessor.RegionInfo originalTask, ok := req.Meta.(*batchCopTask) if ok { - for _, task := range originalTask.copTasks { + for _, ri := range originalTask.regionInfos { regionInfos = append(regionInfos, &coprocessor.RegionInfo{ - RegionId: task.task.region.GetID(), + RegionId: ri.Region.GetID(), RegionEpoch: &metapb.RegionEpoch{ - ConfVer: task.task.region.GetConfVer(), - Version: task.task.region.GetVer(), + ConfVer: ri.Region.GetConfVer(), + Version: ri.Region.GetVer(), }, - Ranges: task.task.ranges.ToPBRanges(), + Ranges: ri.Ranges.ToPBRanges(), }) } } @@ -214,8 +214,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req // Or else it's the task without region, which always happens in high layer task without table. // In that case if originalTask != nil { - sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient()) - rpcResp, _, _, err = sender.sendStreamReqToAddr(bo, originalTask.copTasks, wrappedReq, tikv.ReadTimeoutMedium) + sender := tikv.NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient()) + rpcResp, _, _, err = sender.SendReqToAddr(bo.TiKVBackoffer(), originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. // That's a hard job but we can try it in the future. diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index cc0f217280f31..2d93b7eda4abb 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -307,7 +307,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { } // BeginWithOption begins a transaction with given option -func (s *tikvStore) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *tikvStore) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { txn, err := s.KVStore.BeginWithOption(option) if err != nil { return nil, derr.ToTiDBErr(err) diff --git a/store/helper/helper.go b/store/helper/helper.go index 49aa7cf2107e0..8eb9b9d7db828 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -48,7 +48,7 @@ import ( // Methods copied from kv.Storage and tikv.Storage due to limitation of go1.13. type Storage interface { Begin() (kv.Transaction, error) - BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) + BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) GetSnapshot(ver kv.Version) kv.Snapshot GetClient() kv.Client GetMPPClient() kv.MPPClient diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 6221ef855707d..7d78d1a9b7418 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -83,7 +83,7 @@ func (s *mockStorage) ShowStatus(ctx context.Context, key string) (interface{}, } // BeginWithOption begins a transaction with given option -func (s *mockStorage) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *mockStorage) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return newTiKVTxn(s.KVStore.BeginWithOption(option)) } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 19f3e4faf40e3..14609f5f77400 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -739,15 +739,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { return } bo := retry.NewBackofferWithVars(context.Background(), pessimisticLockMaxBackoff, c.txn.vars) - now, err := c.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + now, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope()) if err != nil { - err1 := bo.Backoff(retry.BoPDRPC, err) - if err1 != nil { - logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail", - zap.Error(err)) - return - } - continue + logutil.Logger(bo.GetCtx()).Warn("keepAlive get tso fail", + zap.Error(err)) + return } uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) @@ -999,7 +995,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { // from PD and plus one as our MinCommitTS. if commitTSMayBeCalculated && c.needLinearizability() { failpoint.Inject("getMinCommitTSFromTSO", nil) - latestTS, err := c.store.oracle.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + latestTS, err := c.store.getTimestampWithRetry(retry.NewBackofferWithVars(ctx, tsoMaxBackoff, c.txn.vars), c.txn.GetScope()) // If we fail to get a timestamp from PD, we just propagate the failure // instead of falling back to the normal 2PC because a normal 2PC will // also be likely to fail due to the same timestamp issue. diff --git a/store/copr/batch_request_sender.go b/store/tikv/batch_request_sender.go similarity index 54% rename from store/copr/batch_request_sender.go rename to store/tikv/batch_request_sender.go index 422306382337d..9aad070b70306 100644 --- a/store/copr/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package copr +package tikv import ( "context" @@ -19,45 +19,52 @@ import ( "time" "github.com/pingcap/errors" - "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/kvproto/pkg/metapb" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/tikvrpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +// RegionInfo contains region related information for batchCopTask +type RegionInfo struct { + Region RegionVerID + Meta *metapb.Region + Ranges *KeyRanges + AllStores []uint64 +} + // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. type RegionBatchRequestSender struct { - *tikv.RegionRequestSender + *RegionRequestSender } // NewRegionBatchRequestSender creates a RegionBatchRequestSender object. -func NewRegionBatchRequestSender(cache *tikv.RegionCache, client tikv.Client) *RegionBatchRequestSender { +func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender { return &RegionBatchRequestSender{ - RegionRequestSender: tikv.NewRegionRequestSender(cache, client), + RegionRequestSender: NewRegionRequestSender(cache, client), } } -func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []copTaskAndRPCContext, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { - // use the first ctx to send request, because every ctx has same address. +// SendReqToAddr send batch cop request +func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) { cancel = func() {} - rpcCtx := ctxs[0].ctx if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil { return nil, false, cancel, errors.Trace(e) } ctx := bo.GetCtx() - if rawHook := ctx.Value(tikv.RPCCancellerCtxKey{}); rawHook != nil { - ctx, cancel = rawHook.(*tikv.RPCCanceller).WithCancel(ctx) + if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { + ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) } start := time.Now() resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout) if ss.Stats != nil { - tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) + RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) } if err != nil { cancel() ss.SetRPCError(err) - e := ss.onSendFail(bo, ctxs, err) + e := ss.onSendFailForBatchRegions(bo, rpcCtx, regionInfos, err) if e != nil { return nil, false, func() {}, errors.Trace(e) } @@ -67,30 +74,25 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co return } -func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error { +func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *RPCContext, regionInfos []RegionInfo, err error) error { // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { return errors.Trace(err) - } else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 { + } else if atomic.LoadUint32(&ShuttingDown) > 0 { return tikverr.ErrTiDBShuttingDown } - for _, failedCtx := range ctxs { - ctx := failedCtx.ctx - if ctx.Meta != nil { - // The reload region param is always true. Because that every time we try, we must - // re-build the range then re-create the batch sender. As a result, the len of "failStores" - // will change. If tiflash's replica is more than two, the "reload region" will always be false. - // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time - // when meeting io error. - ss.GetRegionCache().OnSendFail(bo.TiKVBackoffer(), ctx, true, err) - } - } + // The reload region param is always true. Because that every time we try, we must + // re-build the range then re-create the batch sender. As a result, the len of "failStores" + // will change. If tiflash's replica is more than two, the "reload region" will always be false. + // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time + // when meeting io error. + ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) // Retry on send request failure when it's not canceled. // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. - err = bo.Backoff(tikv.BoTiFlashRPC, errors.Errorf("send tikv request error: %v, ctxs: %v, try next peer later", err, ctxs)) + err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) return errors.Trace(err) } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 8cec3dfbca964..622313f382abd 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "math" "math/rand" + "strconv" "sync" "sync/atomic" "time" @@ -27,7 +28,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/kv" @@ -188,11 +188,11 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin() (*KVTxn, error) { - return s.BeginWithOption(tidbkv.DefaultTransactionOption()) + return s.BeginWithOption(DefaultStartTSOption()) } -// BeginWithOption begins a transaction with the given TransactionOption -func (s *KVStore) BeginWithOption(options tidbkv.TransactionOption) (*KVTxn, error) { +// BeginWithOption begins a transaction with the given StartTSOption +func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) { return newTiKVTxnWithOptions(s, options) } @@ -388,6 +388,7 @@ func (s *KVStore) getSafeTS(storeID uint64) uint64 { return safeTS.(uint64) } +// setSafeTS sets safeTs for store storeID, export for testing func (s *KVStore) setSafeTS(storeID, safeTS uint64) { s.safeTSMap.Store(storeID, safeTS) } @@ -436,17 +437,20 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { storeAddr := store.addr go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) { defer wg.Done() - // TODO: add metrics for updateSafeTS resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{ StartKey: []byte(""), EndKey: []byte(""), }}), ReadTimeoutShort) + storeIDStr := strconv.Itoa(int(storeID)) if err != nil { + metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc() logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID)) return } safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse) s.setSafeTS(storeID, safeTSResp.GetSafeTs()) + metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc() + metrics.TiKVSafeTSUpdateStats.WithLabelValues(storeIDStr).Set(float64(safeTSResp.GetSafeTs())) }(ctx, wg, storeID, storeAddr) } wg.Wait() diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index fe50910a896e6..0ed9ecb3fa471 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -229,11 +229,6 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi // locks have been cleaned before GC. expiredLocks := locks - callerStartTS, err := lr.store.GetOracle().GetTimestamp(bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - if err != nil { - return false, errors.Trace(err) - } - txnInfos := make(map[uint64]uint64) startTime := time.Now() for _, l := range expiredLocks { @@ -243,7 +238,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi metrics.LockResolverCountWithExpired.Inc() // Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not! - status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, false, l) + status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l) if err != nil { return false, err } @@ -257,7 +252,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi continue } if _, ok := errors.Cause(err).(*nonAsyncCommitLock); ok { - status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, math.MaxUint64, true, true, l) + status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, true, l) if err != nil { return false, err } diff --git a/store/tikv/metrics/metrics.go b/store/tikv/metrics/metrics.go index 8d71582fa2522..6b8ea32d456f7 100644 --- a/store/tikv/metrics/metrics.go +++ b/store/tikv/metrics/metrics.go @@ -59,6 +59,8 @@ var ( TiKVPanicCounter *prometheus.CounterVec TiKVForwardRequestCounter *prometheus.CounterVec TiKVTSFutureWaitDuration prometheus.Histogram + TiKVSafeTSUpdateCounter *prometheus.CounterVec + TiKVSafeTSUpdateStats *prometheus.GaugeVec ) // Label constants. @@ -414,6 +416,22 @@ func initMetrics(namespace, subsystem string) { Buckets: prometheus.ExponentialBuckets(0.000005, 2, 30), // 5us ~ 2560s }) + TiKVSafeTSUpdateCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "safets_update_counter", + Help: "Counter of tikv safe_ts being updated.", + }, []string{LblResult, LblStore}) + + TiKVSafeTSUpdateStats = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "safets_update_stats", + Help: "stat of tikv updating safe_ts stats", + }, []string{LblStore}) + initShortcuts() } @@ -468,6 +486,8 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVPanicCounter) prometheus.MustRegister(TiKVForwardRequestCounter) prometheus.MustRegister(TiKVTSFutureWaitDuration) + prometheus.MustRegister(TiKVSafeTSUpdateCounter) + prometheus.MustRegister(TiKVSafeTSUpdateStats) } // readCounter reads the value of a prometheus.Counter. diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index f6225a2724f8e..0d9423a9f5a7e 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -112,6 +112,15 @@ func (r *RegionStore) accessStore(mode AccessMode, idx AccessIndex) (int, *Store return sidx, r.stores[sidx] } +func (r *RegionStore) getAccessIndex(mode AccessMode, store *Store) AccessIndex { + for index, sidx := range r.accessIndex[mode] { + if r.stores[sidx].storeID == store.storeID { + return AccessIndex(index) + } + } + return -1 +} + func (r *RegionStore) accessStoreNum(mode AccessMode) int { return len(r.accessIndex[mode]) } @@ -526,6 +535,40 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe }, nil } +// GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores, the store id of currentStore is always the first one +func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 { + // set the cap to 2 because usually, TiFlash table will have 2 replicas + allStores := make([]uint64, 0, 2) + // make sure currentStore id is always the first in allStores + allStores = append(allStores, currentStore.storeID) + ts := time.Now().Unix() + cachedRegion := c.getCachedRegionWithRLock(id) + if cachedRegion == nil { + return allStores + } + if !cachedRegion.checkRegionCacheTTL(ts) { + return allStores + } + regionStore := cachedRegion.getStore() + currentIndex := regionStore.getAccessIndex(TiFlashOnly, currentStore) + if currentIndex == -1 { + return allStores + } + for startOffset := 1; startOffset < regionStore.accessStoreNum(TiFlashOnly); startOffset++ { + accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(TiFlashOnly)) + storeIdx, store := regionStore.accessStore(TiFlashOnly, accessIdx) + if store.getResolveState() == needCheck { + continue + } + storeFailEpoch := atomic.LoadUint32(&store.epoch) + if storeFailEpoch != regionStore.storeEpochs[storeIdx] { + continue + } + allStores = append(allStores, store.storeID) + } + return allStores +} + // GetTiFlashRPCContext returns RPCContext for a region must access flash store. If it returns nil, the region // must be out of date and already dropped from cache or not flash store found. // `loadBalance` is an option. For MPP and batch cop, it is pointless and might cause try the failed store repeatly. @@ -668,6 +711,64 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool) return r, nil } +// OnSendFailForBatchRegions handles send request fail logic. +func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *Store, regionInfos []RegionInfo, scheduleReload bool, err error) { + metrics.RegionCacheCounterWithSendFail.Add(float64(len(regionInfos))) + if store.storeType != tikvrpc.TiFlash { + logutil.Logger(bo.GetCtx()).Info("Should not reach here, OnSendFailForBatchRegions only support TiFlash") + return + } + for _, ri := range regionInfos { + if ri.Meta == nil { + continue + } + r := c.getCachedRegionWithRLock(ri.Region) + if r != nil { + peersNum := len(r.meta.Peers) + if len(ri.Meta.Peers) != peersNum { + logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed", + zap.Stringer("region", &ri.Region), + zap.Bool("needReload", scheduleReload), + zap.Reflect("oldPeers", ri.Meta.Peers), + zap.Reflect("newPeers", r.meta.Peers), + zap.Error(err)) + continue + } + + rs := r.getStore() + + accessMode := TiFlashOnly + accessIdx := rs.getAccessIndex(accessMode, store) + if accessIdx == -1 { + logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + ri.Region.String()) + continue + } + if err != nil { + storeIdx, s := rs.accessStore(accessMode, accessIdx) + epoch := rs.storeEpochs[storeIdx] + if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) { + logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr)) + metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() + } + // schedule a store addr resolve. + s.markNeedCheck(c.notifyCheckCh) + } + + // try next peer + rs.switchNextFlashPeer(r, accessIdx) + logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail", + zap.Stringer("region", &ri.Region), + zap.Bool("needReload", scheduleReload), + zap.Error(err)) + + // force reload region when retry all known peers in region. + if scheduleReload { + r.scheduleReload() + } + } + } +} + // OnSendFail handles send request fail logic. func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) { metrics.RegionCacheCounterWithSendFail.Inc() diff --git a/store/tikv/test_probe.go b/store/tikv/test_probe.go index 1a8dc5062218d..3e80e6310fe4b 100644 --- a/store/tikv/test_probe.go +++ b/store/tikv/test_probe.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/unionstore" @@ -81,6 +82,23 @@ func (s StoreProbe) SaveSafePoint(v uint64) error { return saveSafePoint(s.GetSafePointKV(), v) } +// SetRegionCacheStore is used to set a store in region cache, for testing only +func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { + s.regionCache.storeMu.Lock() + defer s.regionCache.storeMu.Unlock() + s.regionCache.storeMu.stores[id] = &Store{ + storeID: id, + storeType: storeType, + state: state, + labels: labels, + } +} + +// SetSafeTS is used to set safeTS for the store with `storeID` +func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) { + s.setSafeTS(storeID, safeTS) +} + // TxnProbe wraps a txn and exports internal states for testing purpose. type TxnProbe struct { *KVTxn diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index 5589752043b2b..43b682160c514 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" - tidbkv "github.com/pingcap/tidb/kv" drivertxn "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" @@ -603,12 +602,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) { // Use max.Uint64 to read the data and success. // That means the final commitTS > startTS+2, it's not the one we provide. // So we cover the rety commitTS logic. - txn1, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(committer.GetStartTS() + 2)) + txn1, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(committer.GetStartTS() + 2)) c.Assert(err, IsNil) _, err = txn1.Get(bo.GetCtx(), []byte("x")) c.Assert(tikverr.IsErrNotFound(err), IsTrue) - txn2, err := s.store.BeginWithOption(tidbkv.DefaultTransactionOption().SetStartTs(math.MaxUint64)) + txn2, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetStartTs(math.MaxUint64)) c.Assert(err, IsNil) val, err := txn2.Get(bo.GetCtx(), []byte("x")) c.Assert(err, IsNil) diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/tests/extract_start_ts_test.go similarity index 54% rename from store/tikv/extract_start_ts_test.go rename to store/tikv/tests/extract_start_ts_test.go index b392ca365cde8..566211006b66c 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/tests/extract_start_ts_test.go @@ -11,20 +11,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tikv +package tikv_test import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) type extractStartTsSuite struct { - store *KVStore + store *tikv.KVStore } var _ = SerialSuites(&extractStartTsSuite{}) @@ -33,31 +33,24 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { client, pdClient, cluster, err := unistore.New("") c.Assert(err, IsNil) unistore.BootstrapWithSingleStore(cluster) - store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0) + store, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) c.Assert(err, IsNil) - store.regionCache.storeMu.stores[2] = &Store{ - storeID: 2, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{ - { - Key: DCLabelKey, - Value: oracle.LocalTxnScope, - }, + probe := tikv.StoreProbe{KVStore: store} + probe.SetRegionCacheStore(2, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, + Value: oracle.LocalTxnScope, }, - } - store.regionCache.storeMu.stores[3] = &Store{ - storeID: 3, - storeType: tikvrpc.TiKV, - state: uint64(resolved), - labels: []*metapb.StoreLabel{{ - Key: DCLabelKey, + }) + probe.SetRegionCacheStore(3, tikvrpc.TiKV, 1, []*metapb.StoreLabel{ + { + Key: tikv.DCLabelKey, Value: "Some Random Label", - }}, - } - store.setSafeTS(2, 102) - store.setSafeTS(3, 101) - s.store = store + }, + }) + probe.SetSafeTS(2, 102) + probe.SetSafeTS(3, 101) + s.store = probe.KVStore } func (s *extractStartTsSuite) TestExtractStartTs(c *C) { @@ -69,26 +62,26 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { cases := []struct { expectedTS uint64 - option kv.TransactionOption + option tikv.StartTSOption }{ // StartTS setted - {100, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {100, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: &i, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, // PrevSec setted - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: &i, MinStartTS: nil, MaxPrevSec: nil}}, // MinStartTS setted, global - {101, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {101, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MinStartTS setted, local - {102, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, + {102, tikv.StartTSOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: &i, MaxPrevSec: nil}}, // MaxPrevSec setted // however we need to add more cases to check the behavior when it fall backs to MinStartTS setted // see `TestMaxPrevSecFallback` - {200, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {200, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, // nothing setted - {300, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, + {300, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: nil}}, } for _, cs := range cases { expected := cs.expectedTS - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, expected) } @@ -97,18 +90,19 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.setSafeTS(2, 0x8000000000000002) - s.store.setSafeTS(3, 0x8000000000000001) + probe := tikv.StoreProbe{KVStore: s.store} + probe.SetSafeTS(2, 0x8000000000000002) + probe.SetSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []struct { expectedTS uint64 - option kv.TransactionOption + option tikv.StartTSOption }{ - {0x8000000000000001, kv.TransactionOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, - {0x8000000000000002, kv.TransactionOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000001, tikv.StartTSOption{TxnScope: oracle.GlobalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, + {0x8000000000000002, tikv.StartTSOption{TxnScope: oracle.LocalTxnScope, StartTS: nil, PrevSec: nil, MinStartTS: nil, MaxPrevSec: &i}}, } for _, cs := range cases { - result, _ := extractStartTs(s.store, cs.option) + result, _ := tikv.ExtractStartTs(s.store, cs.option) c.Assert(result, Equals, cs.expectedTS) } } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 6de51ac27f437..988f6501be553 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/kv" tikverr "github.com/pingcap/tidb/store/tikv/error" tikv "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" @@ -55,6 +54,52 @@ type SchemaAmender interface { AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error) } +// StartTSOption indicates the option when beginning a transaction +// `TxnScope` must be set for each object +// Every other fields are optional, but currently at most one of them can be set +type StartTSOption struct { + TxnScope string + StartTS *uint64 + PrevSec *uint64 + MinStartTS *uint64 + MaxPrevSec *uint64 +} + +// DefaultStartTSOption creates a default StartTSOption, ie. Work in GlobalTxnScope and get start ts when got used +func DefaultStartTSOption() StartTSOption { + return StartTSOption{TxnScope: oracle.GlobalTxnScope} +} + +// SetMaxPrevSec returns a new StartTSOption with MaxPrevSec set to maxPrevSec +func (to StartTSOption) SetMaxPrevSec(maxPrevSec uint64) StartTSOption { + to.MaxPrevSec = &maxPrevSec + return to +} + +// SetMinStartTS returns a new StartTSOption with MinStartTS set to minStartTS +func (to StartTSOption) SetMinStartTS(minStartTS uint64) StartTSOption { + to.MinStartTS = &minStartTS + return to +} + +// SetStartTs returns a new StartTSOption with StartTS set to startTS +func (to StartTSOption) SetStartTs(startTS uint64) StartTSOption { + to.StartTS = &startTS + return to +} + +// SetPrevSec returns a new StartTSOption with PrevSec set to prevSec +func (to StartTSOption) SetPrevSec(prevSec uint64) StartTSOption { + to.PrevSec = &prevSec + return to +} + +// SetTxnScope returns a new StartTSOption with TxnScope set to txnScope +func (to StartTSOption) SetTxnScope(txnScope string) StartTSOption { + to.TxnScope = txnScope + return to +} + // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { snapshot *KVSnapshot @@ -90,23 +135,24 @@ type KVTxn struct { kvFilter KVFilter } -func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error) { +// ExtractStartTs use `option` to get the proper startTS for a transaction +func ExtractStartTs(store *KVStore, option StartTSOption) (uint64, error) { var startTs uint64 var err error - if options.StartTS != nil { - startTs = *options.StartTS - } else if options.PrevSec != nil { + if option.StartTS != nil { + startTs = *option.StartTS + } else if option.PrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getStalenessTimestamp(bo, options.TxnScope, *options.PrevSec) - } else if options.MinStartTS != nil { + startTs, err = store.getStalenessTimestamp(bo, option.TxnScope, *option.PrevSec) + } else if option.MinStartTS != nil { stores := make([]*Store, 0) allStores := store.regionCache.getStoresByType(tikvrpc.TiKV) - if options.TxnScope != oracle.GlobalTxnScope { + if option.TxnScope != oracle.GlobalTxnScope { for _, store := range allStores { if store.IsLabelsMatch([]*metapb.StoreLabel{ { Key: DCLabelKey, - Value: options.TxnScope, + Value: option.TxnScope, }, }) { stores = append(stores, store) @@ -116,32 +162,32 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error stores = allStores } safeTS := store.getMinSafeTSByStores(stores) - startTs = *options.MinStartTS + startTs = *option.MinStartTS // If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use // minStartTS directly. if startTs < safeTS { startTs = safeTS } - } else if options.MaxPrevSec != nil { + } else if option.MaxPrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - minStartTS, err := store.getStalenessTimestamp(bo, options.TxnScope, *options.MaxPrevSec) + minStartTS, err := store.getStalenessTimestamp(bo, option.TxnScope, *option.MaxPrevSec) if err != nil { return 0, errors.Trace(err) } - options.MinStartTS = &minStartTS - return extractStartTs(store, options) + option.MinStartTS = &minStartTS + return ExtractStartTs(store, option) } else { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) - startTs, err = store.getTimestampWithRetry(bo, options.TxnScope) + startTs, err = store.getTimestampWithRetry(bo, option.TxnScope) } return startTs, err } -func newTiKVTxnWithOptions(store *KVStore, options kv.TransactionOption) (*KVTxn, error) { +func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) { if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } - startTs, err := extractStartTs(store, options) + startTs, err := ExtractStartTs(store, options) if err != nil { return nil, errors.Trace(err) } diff --git a/util/mock/context.go b/util/mock/context.go index d6a5f1d913902..6df2c9c10d356 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/disk" @@ -204,7 +205,7 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error { return nil } if c.Store != nil { - txn, err := c.Store.BeginWithOption(kv.TransactionOption{}.SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) + txn, err := c.Store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(startTS)) if err != nil { return errors.Trace(err) } diff --git a/util/mock/store.go b/util/mock/store.go index 7c86de4b3cb72..3adba59e115e5 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -17,6 +17,7 @@ import ( "context" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -38,7 +39,7 @@ func (s *Store) GetOracle() oracle.Oracle { return nil } func (s *Store) Begin() (kv.Transaction, error) { return nil, nil } // BeginWithOption implements kv.Storage interface. -func (s *Store) BeginWithOption(option kv.TransactionOption) (kv.Transaction, error) { +func (s *Store) BeginWithOption(option tikv.StartTSOption) (kv.Transaction, error) { return s.Begin() } diff --git a/util/sem/sem.go b/util/sem/sem.go index d29d29b601559..1aac6d0a9a999 100644 --- a/util/sem/sem.go +++ b/util/sem/sem.go @@ -138,6 +138,7 @@ func IsInvisibleSysVar(varNameInLower string) bool { variable.TiDBCheckMb4ValueInUTF8, variable.TiDBConfig, variable.TiDBEnableSlowLog, + variable.TiDBEnableTelemetry, variable.TiDBExpensiveQueryTimeThreshold, variable.TiDBForcePriority, variable.TiDBGeneralLog, @@ -146,12 +147,13 @@ func IsInvisibleSysVar(varNameInLower string) bool { variable.TiDBOptWriteRowID, variable.TiDBPProfSQLCPU, variable.TiDBRecordPlanInSlowLog, + variable.TiDBRowFormatVersion, variable.TiDBSlowQueryFile, variable.TiDBSlowLogThreshold, variable.TiDBEnableCollectExecutionInfo, variable.TiDBMemoryUsageAlarmRatio, - variable.TiDBEnableTelemetry, - variable.TiDBRowFormatVersion: + variable.TiDBRedactLog, + variable.TiDBSlowLogMasking: return true } return false diff --git a/util/sem/sem_test.go b/util/sem/sem_test.go index 073a195139c37..c2a54170dcf99 100644 --- a/util/sem/sem_test.go +++ b/util/sem/sem_test.go @@ -98,4 +98,6 @@ func (s *testSecurity) TestIsInvisibleSysVar(c *C) { c.Assert(IsInvisibleSysVar(variable.TiDBMemoryUsageAlarmRatio), IsTrue) c.Assert(IsInvisibleSysVar(variable.TiDBEnableTelemetry), IsTrue) c.Assert(IsInvisibleSysVar(variable.TiDBRowFormatVersion), IsTrue) + c.Assert(IsInvisibleSysVar(variable.TiDBRedactLog), IsTrue) + c.Assert(IsInvisibleSysVar(variable.TiDBSlowLogMasking), IsTrue) } diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index 7cacaf211375e..ef0a0858f76cf 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -271,6 +271,21 @@ func (tk *TestKit) MustPartition(sql string, partitions string, args ...interfac return tk.MustQuery(sql, args...) } +// UsedPartitions returns the partition names that will be used or all/dual. +func (tk *TestKit) UsedPartitions(sql string, args ...interface{}) *Result { + rs := tk.MustQuery("explain "+sql, args...) + var usedPartitions [][]string + for i := range rs.rows { + index := strings.Index(rs.rows[i][3], "partition:") + if index != -1 { + p := rs.rows[i][3][index+len("partition:"):] + partitions := strings.Split(strings.SplitN(p, " ", 2)[0], ",") + usedPartitions = append(usedPartitions, partitions) + } + } + return &Result{rows: usedPartitions, c: tk.c, comment: check.Commentf("sql:%s, args:%v", sql, args)} +} + // MustUseIndex checks if the result execution plan contains specific index(es). func (tk *TestKit) MustUseIndex(sql string, index string, args ...interface{}) bool { rs := tk.MustQuery("explain "+sql, args...) @@ -312,6 +327,19 @@ func (tk *TestKit) MustQuery(sql string, args ...interface{}) *Result { return tk.ResultSetToResult(rs, comment) } +// MayQuery query the statements and returns result rows if result set is returned. +// If expected result is set it asserts the query result equals expected result. +func (tk *TestKit) MayQuery(sql string, args ...interface{}) *Result { + comment := check.Commentf("sql:%s, args:%v", sql, args) + rs, err := tk.Exec(sql, args...) + tk.c.Assert(errors.ErrorStack(err), check.Equals, "", comment) + if rs == nil { + var emptyStringAoA [][]string + return &Result{rows: emptyStringAoA, c: tk.c, comment: comment} + } + return tk.ResultSetToResult(rs, comment) +} + // QueryToErr executes a sql statement and discard results. func (tk *TestKit) QueryToErr(sql string, args ...interface{}) error { comment := check.Commentf("sql:%s, args:%v", sql, args)