From 0158f71cda6ccbbd382c6e8759904234baca259c Mon Sep 17 00:00:00 2001
From: tangenta
Date: Tue, 1 Jun 2021 18:59:37 +0800
Subject: [PATCH 1/6] ddl: support type conversion between non-varchar and varchar (#24959)

---
ddl/column.go | 14 +++
ddl/column_test.go | 2 +
ddl/column_type_change_test.go | 213 +++++++++++++++++++++------------
ddl/db_test.go | 8 +-
ddl/ddl_api.go | 10 +-
executor/ddl_test.go | 6 +-
6 files changed, 168 insertions(+), 85 deletions(-)

diff --git a/ddl/column.go b/ddl/column.go
index 17abe6aefbe08..7d93c4e15668d 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -41,6 +41,7 @@ import (
 "github.com/pingcap/tidb/tablecodec"
 "github.com/pingcap/tidb/types"
 "github.com/pingcap/tidb/util"
+ "github.com/pingcap/tidb/util/collate"
 "github.com/pingcap/tidb/util/logutil"
 decoder "github.com/pingcap/tidb/util/rowDecoder"
 "github.com/pingcap/tidb/util/sqlexec"
@@ -712,6 +713,10 @@ func needChangeColumnData(oldCol, newCol *model.ColumnInfo) bool {
 return needTruncationOrToggleSign()
 }
+ if convertBetweenCharAndVarchar(oldCol.Tp, newCol.Tp) {
+ return true
+ }
+
 // Deal with the different type.
 switch oldCol.Tp {
 case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
@@ -734,6 +739,15 @@ func needChangeColumnData(oldCol, newCol *model.ColumnInfo) bool {
 return true
 }

+// Column type conversion between varchar and char needs reorganization because:
+// 1. varchar -> char: the char type is stored with trailing padding removed, so all indexes need to be rewritten.
+// 2. char -> varchar: the index value encoding of secondary indexes on clustered primary key tables is different,
+// so these secondary indexes need to be rewritten.
+func convertBetweenCharAndVarchar(oldCol, newCol byte) bool {
+ return (types.IsTypeVarchar(oldCol) && newCol == mysql.TypeString) ||
+ (oldCol == mysql.TypeString && types.IsTypeVarchar(newCol) && collate.NewCollationEnabled())
+}
+
 func isElemsChangedToModifyColumn(oldElems, newElems []string) bool {
 if len(newElems) < len(oldElems) {
 return true

diff --git a/ddl/column_test.go b/ddl/column_test.go
index f3eaa26d22385..10b883a76935a 100644
--- a/ddl/column_test.go
+++ b/ddl/column_test.go
@@ -1170,6 +1170,8 @@ func (s *testColumnSuite) TestModifyColumn(c *C) {
 }{
 {"int", "bigint", nil},
 {"int", "int unsigned", errUnsupportedModifyColumn.GenWithStackByArgs("can't change unsigned integer to signed or vice versa, and tidb_enable_change_column_type is false")},
+ {"varchar(10)", "text", nil},
+ {"varbinary(10)", "blob", nil},
 {"text", "blob", errUnsupportedModifyCharset.GenWithStackByArgs("charset from utf8mb4 to binary")},
 {"varchar(10)", "varchar(8)", errUnsupportedModifyColumn.GenWithStackByArgs("length 8 is less than origin 10, and tidb_enable_change_column_type is false")},
 {"varchar(10)", "varchar(11)", nil},

diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go
index 0b6234a21d924..32418dd031c29 100644
--- a/ddl/column_type_change_test.go
+++ b/ddl/column_type_change_test.go
@@ -36,6 +36,7 @@ import (
 "github.com/pingcap/tidb/table"
 "github.com/pingcap/tidb/table/tables"
 "github.com/pingcap/tidb/tablecodec"
+ "github.com/pingcap/tidb/util/collate"
 "github.com/pingcap/tidb/util/dbterror"
 "github.com/pingcap/tidb/util/testkit"
 )
@@ -317,10 +318,14 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromIntegerToOthers(c *C
 // integer to string
 prepare(tk)
- tk.MustGetErrCode("alter table t modify a varchar(10)", mysql.ErrUnsupportedDDLOperation)
+
tk.MustExec("alter table t modify a varchar(10)") + modifiedColumn := getModifyColumn(c, tk.Se, "test", "t", "a", false) + c.Assert(modifiedColumn, NotNil) + c.Assert(modifiedColumn.Tp, Equals, parser_mysql.TypeVarchar) + tk.MustQuery("select a from t").Check(testkit.Rows("1")) tk.MustExec("alter table t modify b char(10)") - modifiedColumn := getModifyColumn(c, tk.Se, "test", "t", "b", false) + modifiedColumn = getModifyColumn(c, tk.Se, "test", "t", "b", false) c.Assert(modifiedColumn, NotNil) c.Assert(modifiedColumn.Tp, Equals, parser_mysql.TypeString) tk.MustQuery("select b from t").Check(testkit.Rows("11")) @@ -331,7 +336,11 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromIntegerToOthers(c *C c.Assert(modifiedColumn.Tp, Equals, parser_mysql.TypeString) tk.MustQuery("select c from t").Check(testkit.Rows("111\x00\x00\x00\x00\x00\x00\x00")) - tk.MustGetErrCode("alter table t modify d varbinary(10)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify d varbinary(10)") + modifiedColumn = getModifyColumn(c, tk.Se, "test", "t", "d", false) + c.Assert(modifiedColumn, NotNil) + c.Assert(modifiedColumn.Tp, Equals, parser_mysql.TypeVarchar) + tk.MustQuery("select d from t").Check(testkit.Rows("1111")) tk.MustExec("alter table t modify e blob(10)") modifiedColumn = getModifyColumn(c, tk.Se, "test", "t", "e", false) @@ -445,7 +454,9 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromIntegerToOthers(c *C func (s *testColumnTypeChangeSuite) TestColumnTypeChangeBetweenVarcharAndNonVarchar(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - + tk.Se.GetSessionVars().EnableChangeColumnType = false + collate.SetNewCollationEnabledForTest(true) + defer collate.SetNewCollationEnabledForTest(false) tk.MustExec("drop database if exists col_type_change_char;") tk.MustExec("create database col_type_change_char;") tk.MustExec("use col_type_change_char;") @@ -461,6 +472,26 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeBetweenVarcharAndNonVarc tk.MustGetErrCode("alter table t change column a a varchar(10);", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t change column b b char(10);", mysql.ErrUnsupportedDDLOperation) tk.MustExec("admin check table t;") + + tk.Se.GetSessionVars().EnableChangeColumnType = true + // https://github.com/pingcap/tidb/issues/23624 + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(b varchar(10));") + tk.MustExec("insert into t values ('aaa ');") + tk.MustExec("alter table t change column b b char(10);") + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("alter table t change column b b varchar(10);") + tk.MustQuery("select b from t use index(idx);").Check(testkit.Rows("aaa")) + tk.MustQuery("select b from t ignore index(idx);").Check(testkit.Rows("aaa")) + tk.MustExec("admin check table t;") + + // https://github.com/pingcap/tidb/pull/23688#issuecomment-810166597 + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a varchar(10));") + tk.MustExec("insert into t values ('aaa ');") + tk.MustQuery("select a from t;").Check(testkit.Rows("aaa ")) + tk.MustExec("alter table t modify column a char(10);") + tk.MustQuery("select a from t;").Check(testkit.Rows("aaa")) } func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromStringToOthers(c *C) { @@ -480,11 +511,12 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromStringToOthers(c *C) // Init string date type table. 
reset := func(tk *testkit.TestKit) { tk.MustExec("drop table if exists t") - // FIXME(tangenta): not support changing from varchar/varbinary to other types. tk.MustExec(` create table t ( c char(8), + vc varchar(8), bny binary(8), + vbny varbinary(8), bb blob, txt text, e enum('123', '2020-07-15 18:32:17.888', 'str', '{"k1": "value"}'), @@ -496,147 +528,181 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromStringToOthers(c *C) // To numeric data types. // tinyint reset(tk) - tk.MustExec("insert into t values ('123', '123', '123', '123', '123', '123')") + tk.MustExec("insert into t values ('123', '123', '123', '123', '123', '123', '123', '123')") tk.MustExec("alter table t modify c tinyint") + tk.MustExec("alter table t modify vc tinyint") tk.MustExec("alter table t modify bny tinyint") + tk.MustExec("alter table t modify vbny tinyint") tk.MustExec("alter table t modify bb tinyint") tk.MustExec("alter table t modify txt tinyint") tk.MustExec("alter table t modify e tinyint") tk.MustExec("alter table t modify s tinyint") - tk.MustQuery("select * from t").Check(testkit.Rows("123 123 123 123 1 1")) + tk.MustQuery("select * from t").Check(testkit.Rows("123 123 123 123 123 123 1 1")) // int reset(tk) - tk.MustExec("insert into t values ('17305', '17305', '17305', '17305', '123', '123')") + tk.MustExec("insert into t values ('17305', '17305', '17305', '17305', '17305', '17305', '123', '123')") tk.MustExec("alter table t modify c int") + tk.MustExec("alter table t modify vc int") tk.MustExec("alter table t modify bny int") + tk.MustExec("alter table t modify vbny int") tk.MustExec("alter table t modify bb int") tk.MustExec("alter table t modify txt int") tk.MustExec("alter table t modify e int") tk.MustExec("alter table t modify s int") - tk.MustQuery("select * from t").Check(testkit.Rows("17305 17305 17305 17305 1 1")) + tk.MustQuery("select * from t").Check(testkit.Rows("17305 17305 17305 17305 17305 17305 1 1")) // bigint reset(tk) - tk.MustExec("insert into t values ('17305867', '17305867', '17305867', '17305867', '123', '123')") + tk.MustExec("insert into t values ('17305867', '17305867', '17305867', '17305867', '17305867', '17305867', '123', '123')") tk.MustExec("alter table t modify c bigint") + tk.MustExec("alter table t modify vc bigint") tk.MustExec("alter table t modify bny bigint") + tk.MustExec("alter table t modify vbny bigint") tk.MustExec("alter table t modify bb bigint") tk.MustExec("alter table t modify txt bigint") tk.MustExec("alter table t modify e bigint") tk.MustExec("alter table t modify s bigint") - tk.MustQuery("select * from t").Check(testkit.Rows("17305867 17305867 17305867 17305867 1 1")) + tk.MustQuery("select * from t").Check(testkit.Rows("17305867 17305867 17305867 17305867 17305867 17305867 1 1")) // bit reset(tk) - tk.MustExec("insert into t values ('1', '1', '1', '1', '123', '123')") + tk.MustExec("insert into t values ('1', '1', '1', '1', '1', '1', '123', '123')") tk.MustGetErrCode("alter table t modify c bit", mysql.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t modify vc bit", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify bny bit", mysql.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t modify vbny bit", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify bb bit", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify txt bit", mysql.ErrUnsupportedDDLOperation) tk.MustExec("alter table t modify e bit") tk.MustExec("alter table t modify s bit") - 
tk.MustQuery("select * from t").Check(testkit.Rows("1 1\x00\x00\x00\x00\x00\x00\x00 1 1 \x01 \x01")) + tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1\x00\x00\x00\x00\x00\x00\x00 1 1 1 \x01 \x01")) // decimal reset(tk) - tk.MustExec("insert into t values ('123.45', '123.45', '123.45', '123.45', '123', '123')") + tk.MustExec("insert into t values ('123.45', '123.45', '123.45', '123.45', '123.45', '123.45', '123', '123')") tk.MustExec("alter table t modify c decimal(7, 4)") + tk.MustExec("alter table t modify vc decimal(7, 4)") tk.MustExec("alter table t modify bny decimal(7, 4)") + tk.MustExec("alter table t modify vbny decimal(7, 4)") tk.MustExec("alter table t modify bb decimal(7, 4)") tk.MustExec("alter table t modify txt decimal(7, 4)") tk.MustExec("alter table t modify e decimal(7, 4)") tk.MustExec("alter table t modify s decimal(7, 4)") - tk.MustQuery("select * from t").Check(testkit.Rows("123.4500 123.4500 123.4500 123.4500 1.0000 1.0000")) + tk.MustQuery("select * from t").Check(testkit.Rows("123.4500 123.4500 123.4500 123.4500 123.4500 123.4500 1.0000 1.0000")) // double reset(tk) - tk.MustExec("insert into t values ('123.45', '123.45', '123.45', '123.45', '123', '123')") + tk.MustExec("insert into t values ('123.45', '123.45', '123.45', '123.45', '123.45', '123.45', '123', '123')") tk.MustExec("alter table t modify c double(7, 4)") + tk.MustExec("alter table t modify vc double(7, 4)") tk.MustExec("alter table t modify bny double(7, 4)") + tk.MustExec("alter table t modify vbny double(7, 4)") tk.MustExec("alter table t modify bb double(7, 4)") tk.MustExec("alter table t modify txt double(7, 4)") tk.MustExec("alter table t modify e double(7, 4)") tk.MustExec("alter table t modify s double(7, 4)") - tk.MustQuery("select * from t").Check(testkit.Rows("123.45 123.45 123.45 123.45 1 1")) + tk.MustQuery("select * from t").Check(testkit.Rows("123.45 123.45 123.45 123.45 123.45 123.45 1 1")) // To date and time data types. // date reset(tk) - tk.MustExec("insert into t values ('20200826', '20200826', '2020-08-26', '08-26 19:35:41', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") + tk.MustExec("insert into t values ('20200826', '2008261', '20200826', '200826', '2020-08-26', '08-26 19:35:41', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") tk.MustExec("alter table t modify c date") + tk.MustExec("alter table t modify vc date") tk.MustExec("alter table t modify bny date") + tk.MustExec("alter table t modify vbny date") tk.MustExec("alter table t modify bb date") // Alter text '08-26 19:35:41' to date will error. 
(same as mysql does) tk.MustGetErrCode("alter table t modify txt date", mysql.ErrTruncatedWrongValue) tk.MustGetErrCode("alter table t modify e date", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify s date", mysql.ErrUnsupportedDDLOperation) - tk.MustQuery("select * from t").Check(testkit.Rows("2020-08-26 2020-08-26 2020-08-26 08-26 19:35:41 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) + tk.MustQuery("select * from t").Check(testkit.Rows("2020-08-26 2020-08-26 2020-08-26 2020-08-26 2020-08-26 08-26 19:35:41 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) // time reset(tk) - tk.MustExec("insert into t values ('19:35:41', '19:35:41', '19:35:41.45678', '19:35:41.45678', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") + tk.MustExec("insert into t values ('19:35:41', '19:35:41', '19:35:41', '19:35:41', '19:35:41.45678', '19:35:41.45678', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") tk.MustExec("alter table t modify c time") + tk.MustExec("alter table t modify vc time") tk.MustExec("alter table t modify bny time") + tk.MustExec("alter table t modify vbny time") tk.MustExec("alter table t modify bb time") tk.MustExec("alter table t modify txt time") tk.MustGetErrCode("alter table t modify e time", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify s time", mysql.ErrUnsupportedDDLOperation) - tk.MustQuery("select * from t").Check(testkit.Rows("19:35:41 19:35:41 19:35:41 19:35:41 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) + tk.MustQuery("select * from t").Check(testkit.Rows("19:35:41 19:35:41 19:35:41 19:35:41 19:35:41 19:35:41 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) // datetime reset(tk) tk.MustExec("alter table t modify c char(23)") + tk.MustExec("alter table t modify vc varchar(23)") tk.MustExec("alter table t modify bny binary(23)") - tk.MustExec("insert into t values ('2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") + tk.MustExec("alter table t modify vbny varbinary(23)") + tk.MustExec("insert into t values ('2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") tk.MustExec("alter table t modify c datetime") + tk.MustExec("alter table t modify vc datetime") tk.MustExec("alter table t modify bny datetime") + tk.MustExec("alter table t modify vbny datetime") tk.MustExec("alter table t modify bb datetime") tk.MustExec("alter table t modify txt datetime") tk.MustGetErrCode("alter table t modify e datetime", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify s datetime", mysql.ErrUnsupportedDDLOperation) - tk.MustQuery("select * from t").Check(testkit.Rows("2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) + tk.MustQuery("select * from t").Check(testkit.Rows("2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) // timestamp reset(tk) tk.MustExec("alter table t modify c char(23)") + tk.MustExec("alter table t modify vc varchar(23)") tk.MustExec("alter table t modify bny binary(23)") - tk.MustExec("insert into t values ('2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 
18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") + tk.MustExec("alter table t modify vbny varbinary(23)") + tk.MustExec("insert into t values ('2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") tk.MustExec("alter table t modify c timestamp") + tk.MustExec("alter table t modify vc timestamp") tk.MustExec("alter table t modify bny timestamp") + tk.MustExec("alter table t modify vbny timestamp") tk.MustExec("alter table t modify bb timestamp") tk.MustExec("alter table t modify txt timestamp") tk.MustGetErrCode("alter table t modify e timestamp", mysql.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t modify s timestamp", mysql.ErrUnsupportedDDLOperation) - tk.MustQuery("select * from t").Check(testkit.Rows("2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) + tk.MustQuery("select * from t").Check(testkit.Rows("2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:18 2020-07-15 18:32:17.888 2020-07-15 18:32:17.888")) // year reset(tk) - tk.MustExec("insert into t values ('2020', '2', '20', '99', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") + tk.MustExec("insert into t values ('2020', '91', '2', '2020', '20', '99', '2020-07-15 18:32:17.888', '2020-07-15 18:32:17.888')") tk.MustExec("alter table t modify c year") + tk.MustExec("alter table t modify vc year") tk.MustExec("alter table t modify bny year") + tk.MustExec("alter table t modify vbny year") tk.MustExec("alter table t modify bb year") tk.MustExec("alter table t modify txt year") tk.MustExec("alter table t modify e year") tk.MustExec("alter table t modify s year") - tk.MustQuery("select * from t").Check(testkit.Rows("2020 2002 2020 1999 2002 2002")) + tk.MustQuery("select * from t").Check(testkit.Rows("2020 1991 2002 2020 2020 1999 2002 2002")) // To json data type. 
reset(tk) tk.MustExec("alter table t modify c char(15)") + tk.MustExec("alter table t modify vc varchar(15)") tk.MustExec("alter table t modify bny binary(15)") - tk.MustExec("insert into t values ('{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}')") + tk.MustExec("alter table t modify vbny varbinary(15)") + tk.MustExec("insert into t values ('{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}', '{\"k1\": \"value\"}')") tk.MustExec("alter table t modify c json") + tk.MustExec("alter table t modify vc json") tk.MustExec("alter table t modify bny json") + tk.MustExec("alter table t modify vbny json") tk.MustExec("alter table t modify bb json") tk.MustExec("alter table t modify txt json") tk.MustExec("alter table t modify e json") tk.MustExec("alter table t modify s json") - tk.MustQuery("select * from t").Check(testkit.Rows("{\"k1\": \"value\"} {\"k1\": \"value\"} {\"k1\": \"value\"} {\"k1\": \"value\"} \"{\\\"k1\\\": \\\"value\\\"}\" \"{\\\"k1\\\": \\\"value\\\"}\"")) + tk.MustQuery("select * from t").Check(testkit.Rows("{\"k1\": \"value\"} {\"k1\": \"value\"} {\"k1\": \"value\"} {\"k1\": \"value\"} {\"k1\": \"value\"} {\"k1\": \"value\"} \"{\\\"k1\\\": \\\"value\\\"}\" \"{\\\"k1\\\": \\\"value\\\"}\"")) reset(tk) - tk.MustExec("insert into t values ('123x', 'abc', 'timestamp', 'date', '123', '123')") + tk.MustExec("insert into t values ('123x', 'x123', 'abc', 'datetime', 'timestamp', 'date', '123', '123')") tk.MustGetErrCode("alter table t modify c int", mysql.ErrTruncatedWrongValue) + tk.MustGetErrCode("alter table t modify vc smallint", mysql.ErrTruncatedWrongValue) + tk.MustGetErrCode("alter table t modify bny bigint", mysql.ErrTruncatedWrongValue) + tk.MustGetErrCode("alter table t modify vbny datetime", mysql.ErrTruncatedWrongValue) + tk.MustGetErrCode("alter table t modify bb timestamp", mysql.ErrTruncatedWrongValue) tk.MustGetErrCode("alter table t modify txt date", mysql.ErrTruncatedWrongValue) reset(tk) - tk.MustExec("alter table t add vc char(20)") + tk.MustExec("alter table t modify vc varchar(20)") tk.MustExec("insert into t(c, vc) values ('1x', '20200915110836')") tk.MustGetErrCode("alter table t modify c year", mysql.ErrTruncatedWrongValue) @@ -649,7 +715,6 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromStringToOthers(c *C) // Both error but different error message. // MySQL will get "ERROR 3140 (22032): Invalid JSON text: "The document root must not be followed by other values." at position 1 in value for column '#sql-5b_42.c'." error. 
reset(tk) - tk.MustExec("alter table t add vc char(20)") tk.MustExec("alter table t modify c char(15)") tk.MustExec("insert into t(c) values ('{\"k1\": \"value\"')") tk.MustGetErrCode("alter table t modify c json", mysql.ErrInvalidJSONText) @@ -757,15 +822,15 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromNumericToOthers(c *C // varchar reset(tk) tk.MustExec("insert into t values (-258.12345, 333.33, 2000000.20000002, 323232323.3232323232, -111.11111111, -222222222222.222222222222222, b'10101')") - tk.MustGetErrCode("alter table t modify d varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify n varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify r varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify db varchar(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify d varchar(30)") + tk.MustExec("alter table t modify n varchar(30)") + tk.MustExec("alter table t modify r varchar(30)") + tk.MustExec("alter table t modify db varchar(30)") // MySQL will get "-111.111" rather than "-111.111115" at TiDB. - tk.MustGetErrCode("alter table t modify f32 varchar(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify f32 varchar(30)") // MySQL will get "ERROR 1406 (22001): Data truncation: Data too long for column 'f64' at row 1". - tk.MustGetErrCode("alter table t modify f64 varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify b varchar(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify f64 varchar(30)") + tk.MustExec("alter table t modify b varchar(30)") tk.MustQuery("select * from t").Check(testkit.Rows("-258.1234500 333.33 2000000.20000002 323232323.32323235 -111.111115 -222222222222.22223 \x15")) // binary @@ -784,15 +849,15 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromNumericToOthers(c *C // varbinary reset(tk) tk.MustExec("insert into t values (-258.12345, 333.33, 2000000.20000002, 323232323.3232323232, -111.11111111, -222222222222.222222222222222, b'10101')") - tk.MustGetErrCode("alter table t modify d varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify n varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify r varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify db varbinary(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify d varbinary(30)") + tk.MustExec("alter table t modify n varbinary(30)") + tk.MustExec("alter table t modify r varbinary(30)") + tk.MustExec("alter table t modify db varbinary(30)") // MySQL will get "-111.111" rather than "-111.111115" at TiDB. - tk.MustGetErrCode("alter table t modify f32 varbinary(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify f32 varbinary(30)") // MySQL will get "ERROR 1406 (22001): Data truncation: Data too long for column 'f64' at row 1". 
- tk.MustGetErrCode("alter table t modify f64 varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify b varbinary(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify f64 varbinary(30)") + tk.MustExec("alter table t modify b varbinary(30)") tk.MustQuery("select * from t").Check(testkit.Rows("-258.1234500 333.33 2000000.20000002 323232323.32323235 -111.111115 -222222222222.22223 \x15")) // blob @@ -1070,11 +1135,11 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromDateTimeTypeToOthers // varchar reset(tk) tk.MustExec("insert into t values ('2020-10-30', '19:38:25.001', 20201030082133.455555, 20201030082133.455555, 2020)") - tk.MustGetErrCode("alter table t modify d varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify t varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify dt varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify tmp varchar(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify y varchar(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify d varchar(30)") + tk.MustExec("alter table t modify t varchar(30)") + tk.MustExec("alter table t modify dt varchar(30)") + tk.MustExec("alter table t modify tmp varchar(30)") + tk.MustExec("alter table t modify y varchar(30)") tk.MustQuery("select * from t").Check(testkit.Rows("2020-10-30 19:38:25.001 2020-10-30 08:21:33.455555 2020-10-30 08:21:33.455555 2020")) // binary @@ -1094,11 +1159,11 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromDateTimeTypeToOthers // varbinary reset(tk) tk.MustExec("insert into t values ('2020-10-30', '19:38:25.001', 20201030082133.455555, 20201030082133.455555, 2020)") - tk.MustGetErrCode("alter table t modify d varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify t varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify dt varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify tmp varbinary(30)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify y varbinary(30)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify d varbinary(30)") + tk.MustExec("alter table t modify t varbinary(30)") + tk.MustExec("alter table t modify dt varbinary(30)") + tk.MustExec("alter table t modify tmp varbinary(30)") + tk.MustExec("alter table t modify y varbinary(30)") tk.MustQuery("select * from t").Check(testkit.Rows("2020-10-30 19:38:25.001 2020-10-30 08:21:33.455555 2020-10-30 08:21:33.455555 2020")) // text @@ -1336,15 +1401,15 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromJsonToOthers(c *C) { // varchar reset(tk) tk.MustExec("insert into t values ('{\"obj\": 100}', '[-1, 0, 1]', 'null', 'true', 'false', '-22', '22', '323232323.3232323232', '\"json string\"')") - tk.MustGetErrCode("alter table t modify obj varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify arr varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify nil varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify t varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify f varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify i varchar(20)", mysql.ErrUnsupportedDDLOperation) - 
tk.MustGetErrCode("alter table t modify ui varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify f64 varchar(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify str varchar(20)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify obj varchar(20)") + tk.MustExec("alter table t modify arr varchar(20)") + tk.MustExec("alter table t modify nil varchar(20)") + tk.MustExec("alter table t modify t varchar(20)") + tk.MustExec("alter table t modify f varchar(20)") + tk.MustExec("alter table t modify i varchar(20)") + tk.MustExec("alter table t modify ui varchar(20)") + tk.MustExec("alter table t modify f64 varchar(20)") + tk.MustExec("alter table t modify str varchar(20)") tk.MustQuery("select * from t").Check(testkit.Rows("{\"obj\": 100} [-1, 0, 1] null true false -22 22 323232323.32323235 \"json string\"")) // binary @@ -1372,15 +1437,15 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromJsonToOthers(c *C) { // varbinary reset(tk) tk.MustExec("insert into t values ('{\"obj\": 100}', '[-1, 0, 1]', 'null', 'true', 'false', '-22', '22', '323232323.3232323232', '\"json string\"')") - tk.MustGetErrCode("alter table t modify obj varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify arr varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify nil varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify t varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify f varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify i varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify ui varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify f64 varbinary(20)", mysql.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table t modify str varbinary(20)", mysql.ErrUnsupportedDDLOperation) + tk.MustExec("alter table t modify obj varbinary(20)") + tk.MustExec("alter table t modify arr varbinary(20)") + tk.MustExec("alter table t modify nil varbinary(20)") + tk.MustExec("alter table t modify t varbinary(20)") + tk.MustExec("alter table t modify f varbinary(20)") + tk.MustExec("alter table t modify i varbinary(20)") + tk.MustExec("alter table t modify ui varbinary(20)") + tk.MustExec("alter table t modify f64 varbinary(20)") + tk.MustExec("alter table t modify str varbinary(20)") tk.MustQuery("select * from t").Check(testkit.Rows("{\"obj\": 100} [-1, 0, 1] null true false -22 22 323232323.32323235 \"json string\"")) // blob diff --git a/ddl/db_test.go b/ddl/db_test.go index 80a821a40e387..00cf3fc42b6fc 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -4102,9 +4102,11 @@ func (s *testSerialDBSuite) TestModifyColumnBetweenStringTypes(c *C) { tk.MustGetErrMsg("alter table tt change a a varchar(4);", "[types:1406]Data Too Long, field len 4, data len 5") tk.MustExec("alter table tt change a a varchar(100);") - tk.MustExec("drop table if exists tt;") - tk.MustExec("create table tt (a char(10));") - tk.MustExec("insert into tt values ('111'),('10000');") + // varchar to char + tk.MustExec("alter table tt change a a char(10);") + c2 = getModifyColumn(c, s.s.(sessionctx.Context), "test", "tt", "a", false) + c.Assert(c2.FieldType.Tp, Equals, mysql.TypeString) + c.Assert(c2.FieldType.Flen, Equals, 10) tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000")) 
tk.MustGetErrMsg("alter table tt change a a char(4);", "[types:1406]Data Too Long, field len 4, data len 5") diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 9c3eca13c7fee..e5871311d78a9 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3537,7 +3537,7 @@ func CheckModifyTypeCompatible(origin *types.FieldType, to *types.FieldType) (ca return true, notCompatibleMsg, errUnsupportedModifyColumn.GenWithStackByArgs(notCompatibleMsg) } -func needReorgToChange(origin *types.FieldType, to *types.FieldType) (needOreg bool, reasonMsg string) { +func needReorgToChange(origin *types.FieldType, to *types.FieldType) (needReorg bool, reasonMsg string) { toFlen := to.Flen originFlen := origin.Flen if mysql.IsIntegerType(to.Tp) && mysql.IsIntegerType(origin.Tp) { @@ -3547,6 +3547,10 @@ func needReorgToChange(origin *types.FieldType, to *types.FieldType) (needOreg b toFlen, _ = mysql.GetDefaultFieldLengthAndDecimal(to.Tp) } + if convertBetweenCharAndVarchar(origin.Tp, to.Tp) { + return true, "conversion between char and varchar string needs reorganization" + } + if toFlen > 0 && toFlen < originFlen { return true, fmt.Sprintf("length %d is less than origin %d", toFlen, originFlen) } @@ -3621,10 +3625,6 @@ func checkModifyTypes(ctx sessionctx.Context, origin *types.FieldType, to *types return errUnsupportedModifyColumn.GenWithStackByArgs(msg) } } - if types.IsTypeVarchar(origin.Tp) != types.IsTypeVarchar(to.Tp) { - unsupportedMsg := "column type conversion between 'varchar' and 'non-varchar' is currently unsupported yet" - return errUnsupportedModifyColumn.GenWithStackByArgs(unsupportedMsg) - } err = checkModifyCharsetAndCollation(to.Charset, to.Collate, origin.Charset, origin.Collate, needRewriteCollationData) // column type change can handle the charset change between these two types in the process of the reorg. 
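To make the reorg rule in this patch concrete, here is a minimal standalone sketch. It is not TiDB code: the type constants and the newCollationEnabled flag are stand-ins for mysql.TypeVarchar/mysql.TypeString and collate.NewCollationEnabled(). It shows the two directions the patch handles: varchar -> char always needs a reorg (char is stored with trailing padding removed), while char -> varchar needs one only when the new collation framework is enabled, because the secondary index value encoding differs there.

package main

import "fmt"

const (
	typeVarchar = iota // stand-in for mysql.TypeVarchar
	typeString         // stand-in for mysql.TypeString (char)
)

func isVarchar(tp int) bool { return tp == typeVarchar }

// convertBetweenCharAndVarchar mirrors the decision logic added to ddl/column.go.
func convertBetweenCharAndVarchar(oldTp, newTp int, newCollationEnabled bool) bool {
	return (isVarchar(oldTp) && newTp == typeString) ||
		(oldTp == typeString && isVarchar(newTp) && newCollationEnabled)
}

func main() {
	fmt.Println(convertBetweenCharAndVarchar(typeVarchar, typeString, false)) // true: varchar -> char always reorgs
	fmt.Println(convertBetweenCharAndVarchar(typeString, typeVarchar, true))  // true: char -> varchar reorgs under new collation
	fmt.Println(convertBetweenCharAndVarchar(typeString, typeVarchar, false)) // false: no reorg needed
}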
diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 7d50b66e5a65c..67c8b8978caa8 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -546,12 +546,12 @@ func (s *testSuite6) TestAlterTableModifyColumn(c *C) { _, err = tk.Exec("alter table mc modify column c2 varchar(8)") c.Assert(err, NotNil) tk.MustExec("alter table mc modify column c2 varchar(11)") - tk.MustGetErrCode("alter table mc modify column c2 text(13)", errno.ErrUnsupportedDDLOperation) - tk.MustGetErrCode("alter table mc modify column c2 text", errno.ErrUnsupportedDDLOperation) + tk.MustExec("alter table mc modify column c2 text(13)") + tk.MustExec("alter table mc modify column c2 text") tk.MustExec("alter table mc modify column c3 bit") result := tk.MustQuery("show create table mc") createSQL := result.Rows()[0][1] - expected := "CREATE TABLE `mc` (\n `c1` bigint(20) DEFAULT NULL,\n `c2` varchar(11) DEFAULT NULL,\n `c3` bit(1) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" + expected := "CREATE TABLE `mc` (\n `c1` bigint(20) DEFAULT NULL,\n `c2` text DEFAULT NULL,\n `c3` bit(1) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin" c.Assert(createSQL, Equals, expected) tk.MustExec("create or replace view alter_view as select c1,c2 from mc") _, err = tk.Exec("alter table alter_view modify column c2 text") From c8c0dd0bb94d13f526547dae1448b6ee4f877fb0 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 2 Jun 2021 11:29:38 +0800 Subject: [PATCH 2/6] executor: supports as of timestamp compatibility (#25019) --- executor/stale_txn_test.go | 52 +++++++++++++++++++++++++++++++++++ planner/core/preprocess.go | 4 +++ sessionctx/variable/sysvar.go | 5 ++++ 3 files changed, 61 insertions(+) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 9fd0a9f83d6f3..44def0981bed0 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -451,6 +451,7 @@ func (s *testStaleTxnSerialSuite) TestSetTransactionReadOnlyAsOf(c *C) { c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0)) tk.MustExec("begin") c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Not(Equals), testcase.expectedTS) + tk.MustExec("commit") failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") } @@ -682,6 +683,57 @@ func (s *testStaleTxnSuite) TestSpecialSQLInStalenessTxn(c *C) { } } +func (s *testStaleTxnSuite) TestAsOfTimestampCompatibility(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. 
(DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + tk.MustExec("use test") + tk.MustExec("create table t5(id int);") + time1 := time.Now() + testcases := []struct { + beginSQL string + sql string + }{ + { + beginSQL: fmt.Sprintf("START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", time1.Format("2006-1-2 15:04:05.000")), + sql: fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000")), + }, + { + beginSQL: "begin", + sql: fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000")), + }, + { + beginSQL: "start transaction", + sql: fmt.Sprintf(`SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'`, time1.Format("2006-1-2 15:04:05.000")), + }, + { + beginSQL: fmt.Sprintf("START TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", time1.Format("2006-1-2 15:04:05.000")), + sql: fmt.Sprintf("select * from t5 as of timestamp '%s'", time1.Format("2006-1-2 15:04:05.000")), + }, + { + beginSQL: "begin", + sql: fmt.Sprintf("select * from t5 as of timestamp '%s'", time1.Format("2006-1-2 15:04:05.000")), + }, + { + beginSQL: "start transaction", + sql: fmt.Sprintf("select * from t5 as of timestamp '%s'", time1.Format("2006-1-2 15:04:05.000")), + }, + } + for _, testcase := range testcases { + tk.MustExec(testcase.beginSQL) + err := tk.ExecToErr(testcase.sql) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*as of timestamp can't be set in transaction.*") + tk.MustExec("commit") + } +} + func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) { tk := testkit.NewTestKit(c, s.store) // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. 
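Before the enforcement hunks below, a minimal sketch of the rule the test above exercises; the session type and setTxnReadTS signature are simplified stand-ins, not the real TiDB API. The point is that both entry points added by this patch (the preprocessor for AS OF clauses and the tidb_txn_read_ts validator) reject a read timestamp once a transaction is already open.

package main

import (
	"errors"
	"fmt"
)

// session is a hypothetical stand-in for TiDB's session variables.
type session struct{ inTxn bool }

var errAsOf = errors.New("as of timestamp can't be set in transaction")

// setTxnReadTS applies the guard this patch adds in both the preprocessor
// and the sysvar validator: no stale-read timestamp inside an open txn.
func (s *session) setTxnReadTS(ts uint64) error {
	if s.inTxn {
		return errAsOf
	}
	// record ts for the next read-only statement (elided)
	return nil
}

func main() {
	s := &session{}
	fmt.Println(s.setTxnReadTS(425_000_000_000)) // <nil>: allowed outside a transaction
	s.inTxn = true
	fmt.Println(s.setTxnReadTS(425_000_000_000)) // rejected inside an open transaction
}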
diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go
index 635a53d72da72..947b1fdcef1ec 100644
--- a/planner/core/preprocess.go
+++ b/planner/core/preprocess.go
@@ -1389,6 +1389,10 @@ func (p *preprocessor) handleAsOf(node *ast.AsOfClause) {
 dom := domain.GetDomain(p.ctx)
 ts := uint64(0)
 if node != nil {
+ if p.ctx.GetSessionVars().InTxn() {
+ p.err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
+ return
+ }
 ts, p.err = calculateTsExpr(p.ctx, node)
 if p.err != nil {
 return

diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index bec4db52c1757..d49e4b1b19bda 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -829,6 +829,11 @@ var defaultSysVars = []*SysVar{
 }},
 {Scope: ScopeSession, Name: TiDBTxnReadTS, Value: "", Hidden: true, SetSession: func(s *SessionVars, val string) error {
 return setTxnReadTS(s, val)
+ }, Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
+ if vars.InTxn() {
+ return "", errors.New("as of timestamp can't be set in transaction")
+ }
+ return normalizedValue, nil
 }},
 {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error {
 s.allowMPPExecution = val

From f24dee95fe195ddee2f95956e9ec6d5841778bba Mon Sep 17 00:00:00 2001
From: Arenatlx <314806019@qq.com>
Date: Wed, 2 Jun 2021 11:43:38 +0800
Subject: [PATCH 3/6] ddl: fix column type change won't cast the default value to new one (#25025)

---
ddl/column_type_change_test.go | 30 ++++++++++++++++++++++++++++++
util/rowDecoder/decoder.go | 2 ++
2 files changed, 32 insertions(+)

diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go
index 32418dd031c29..7febe2209f62c 100644
--- a/ddl/column_type_change_test.go
+++ b/ddl/column_type_change_test.go
@@ -1883,3 +1883,33 @@ func (s *testColumnTypeChangeSuite) TestChangeIntToBitWillPanicInBackfillIndexes
 ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
 tk.MustQuery("select * from t").Check(testkit.Rows("\x13 1 1.00", "\x11 2 2.00"))
 }
+
+// Close issue #24971, #24973
+func (s *testColumnTypeChangeSuite) TestCTCShouldCastTheDefaultValue(c *C) {
+ tk := testkit.NewTestKit(c, s.store)
+ tk.MustExec("use test")
+ // Enable column change variable.
+ tk.Se.GetSessionVars().EnableChangeColumnType = true
+
+ tk.MustExec("drop table if exists t")
+ tk.MustExec("create table t(a int)")
+ tk.MustExec("insert into t values(1)")
+ tk.MustExec("alter table t add column b bit(51) default 1512687856625472") // virtual fill the column data
+ tk.MustGetErrCode("alter table t modify column b decimal(30,18)", mysql.ErrDataOutOfRange) // because 1512687856625472 is out of range.
+ + tk.MustExec("drop table if exists t") + tk.MustExec("create table tbl_1 (col int)") + tk.MustExec("insert into tbl_1 values (9790)") + tk.MustExec("alter table tbl_1 add column col1 blob(6) collate binary not null") + tk.MustQuery("select col1 from tbl_1").Check(testkit.Rows("")) + tk.MustGetErrCode("alter table tbl_1 change column col1 col2 int", mysql.ErrTruncatedWrongValue) + tk.MustQuery("select col1 from tbl_1").Check(testkit.Rows("")) + + tk.MustExec("drop table if exists t") + tk.MustExec("create table tbl(col_214 decimal(30,8))") + tk.MustExec("replace into tbl values (89687.448)") + tk.MustExec("alter table tbl add column col_279 binary(197) collate binary default 'RAWTdm' not null") + tk.MustQuery("select col_279 from tbl").Check(testkit.Rows("RAWTdm\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")) + tk.MustGetErrCode("alter table tbl change column col_279 col_287 int", mysql.ErrTruncatedWrongValue) + tk.MustQuery("select col_279 from tbl").Check(testkit.Rows("RAWTdm\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00")) +} diff --git a/util/rowDecoder/decoder.go b/util/rowDecoder/decoder.go index 3fd63c05fae96..35413904fe3d8 100644 --- a/util/rowDecoder/decoder.go +++ b/util/rowDecoder/decoder.go @@ -164,6 +164,8 @@ func (rd *RowDecoder) DecodeTheExistedColumnMap(ctx sessionctx.Context, handle k if err != nil { return nil, err } + // Fill the default value into map. + row[colInfo.ID] = val rd.mutRow.SetValue(colInfo.Offset, val.GetValue()) } // return the existed column map here. 
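A minimal sketch of what the two-line decoder change above accomplishes; the map-based row and column types are hypothetical simplifications of TiDB's RowDecoder. When a stored row predates a newly added column, the decoder computes that column's default value, and the fix writes the default into the returned map as well (not only into the mutable row buffer), so the subsequent cast during a column type change sees it like any other value.

package main

import "fmt"

type column struct {
	id     int64
	defVal string
}

// decodeExistedColumnMap mimics DecodeTheExistedColumnMap: values present in
// the stored row are taken as-is, and missing columns fall back to their default.
func decodeExistedColumnMap(stored map[int64]string, cols []column) map[int64]string {
	row := make(map[int64]string, len(cols))
	for _, col := range cols {
		if v, ok := stored[col.id]; ok {
			row[col.id] = v
			continue
		}
		// The fix: fill the default value into the returned map too, so a
		// later cast during column type change can convert it.
		row[col.id] = col.defVal
	}
	return row
}

func main() {
	stored := map[int64]string{1: "9790"}     // row written before column 2 existed
	cols := []column{{1, "0"}, {2, "RAWTdm"}} // column 2 added later with a default
	fmt.Println(decodeExistedColumnMap(stored, cols)) // map[1:9790 2:RAWTdm]
}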
From 7811bf9a37f42f91fc2e8673e4f12bd64ce56139 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 2 Jun 2021 12:51:39 +0800 Subject: [PATCH 4/6] *: use pprof profile to collect CPU time group by SQL and plan digest (#24892) --- executor/adapter.go | 40 ++- executor/executor.go | 17 +- executor/prepared.go | 9 +- planner/optimize.go | 8 +- server/conn.go | 20 +- server/conn_stmt.go | 39 ++- server/http_status.go | 11 +- server/sql_info_fetcher.go | 8 +- server/tidb_test.go | 232 ++++++++++++++++ session/session.go | 14 +- statistics/handle/handle.go | 5 + tidb-server/main.go | 2 + util/misc.go | 9 + util/topsql/collector/collector.go | 24 ++ util/topsql/topsql.go | 72 +++++ util/topsql/tracecpu/mock/mock.go | 173 ++++++++++++ util/topsql/tracecpu/profile.go | 424 +++++++++++++++++++++++++++++ util/topsql/tracecpu_test.go | 143 ++++++++++ 18 files changed, 1204 insertions(+), 46 deletions(-) create mode 100644 util/topsql/collector/collector.go create mode 100644 util/topsql/topsql.go create mode 100644 util/topsql/tracecpu/mock/mock.go create mode 100644 util/topsql/tracecpu/profile.go create mode 100644 util/topsql/tracecpu_test.go diff --git a/executor/adapter.go b/executor/adapter.go index df6e62751f90a..f1d6a8b8e6040 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -56,7 +57,7 @@ import ( "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" - + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -213,6 +214,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } + ctx = a.setPlanLabelForTopSQL(ctx) startTs := uint64(math.MaxUint64) err := a.Ctx.InitTxnWithStartTS(startTs) if err != nil { @@ -288,6 +290,15 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { return a.InfoSchema.SchemaMetaVersion(), nil } +func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context { + if a.Plan == nil || !variable.TopSQLEnabled() { + return ctx + } + normalizedSQL, sqlDigest := a.Ctx.GetSessionVars().StmtCtx.SQLDigest() + normalizedPlan, planDigest := getPlanDigest(a.Ctx, a.Plan) + return topsql.AttachSQLInfo(ctx, normalizedSQL, sqlDigest, normalizedPlan, planDigest) +} + // Exec builds an Executor from a plan. If the Executor doesn't return result, // like the INSERT, UPDATE statements, it executes in this function, if the Executor returns // result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method. @@ -357,8 +368,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { if err != nil { return nil, err } - - getPlanDigest(a.Ctx, a.Plan) + // ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`. 
+ ctx = a.setPlanLabelForTopSQL(ctx) if err = e.Open(ctx); err != nil { terror.Call(e.Close) @@ -951,7 +962,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { statsInfos := plannercore.GetStatsInfo(a.Plan) memMax := sessVars.StmtCtx.MemTracker.MaxConsumed() diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed() - planDigest := getPlanDigest(a.Ctx, a.Plan) + _, planDigest := getPlanDigest(a.Ctx, a.Plan) slowItems := &variable.SlowQueryLogItems{ TxnTS: txnTS, SQL: sql.String(), @@ -969,7 +980,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { DiskMax: diskMax, Succ: succ, Plan: getPlanTree(a.Ctx, a.Plan), - PlanDigest: planDigest, + PlanDigest: planDigest.String(), Prepared: a.isPreparedStmt, HasMoreResults: hasMoreResults, PlanFromCache: sessVars.FoundInPlanCache, @@ -1043,15 +1054,15 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string { } // getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. -func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string { +func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (string, *parser.Digest) { sc := sctx.GetSessionVars().StmtCtx - _, planDigest := sc.GetPlanDigest() - if planDigest != nil { - return planDigest.String() + normalized, planDigest := sc.GetPlanDigest() + if len(normalized) > 0 && planDigest != nil { + return normalized, planDigest } - normalized, planDigest := plannercore.NormalizePlan(p) + normalized, planDigest = plannercore.NormalizePlan(p) sc.SetPlanDigest(normalized, planDigest) - return planDigest.String() + return normalized, planDigest } // getEncodedPlan gets the encoded plan, and generates the hint string if indicated. @@ -1125,11 +1136,12 @@ func (a *ExecStmt) SummaryStmt(succ bool) { var planDigestGen func() string if a.Plan.TP() == plancodec.TypePointGet { planDigestGen = func() string { - planDigest := getPlanDigest(a.Ctx, a.Plan) - return planDigest + _, planDigest := getPlanDigest(a.Ctx, a.Plan) + return planDigest.String() } } else { - planDigest = getPlanDigest(a.Ctx, a.Plan) + _, tmp := getPlanDigest(a.Ctx, a.Plan) + planDigest = tmp.String() } execDetail := stmtCtx.GetExecDetails() diff --git a/executor/executor.go b/executor/executor.go index ecb9102ba2f39..c444c0920069e 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -18,6 +18,7 @@ import ( "fmt" "math" "runtime" + "runtime/pprof" "runtime/trace" "strconv" "strings" @@ -66,6 +67,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/resourcegrouptag" + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" ) @@ -1653,9 +1655,20 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.MemTracker.SetActionOnExceed(action) } if execStmt, ok := s.(*ast.ExecuteStmt); ok { - s, err = planner.GetPreparedStmt(execStmt, vars) + prepareStmt, err := planner.GetPreparedStmt(execStmt, vars) if err != nil { - return + return err + } + s = prepareStmt.PreparedAst.Stmt + sc.InitSQLDigest(prepareStmt.NormalizedSQL, prepareStmt.SQLDigest) + // For `execute stmt` SQL, should reset the SQL digest with the prepare SQL digest. 
+ goCtx := context.Background() + if variable.EnablePProfSQLCPU.Load() && len(prepareStmt.NormalizedSQL) > 0 { + goCtx = pprof.WithLabels(goCtx, pprof.Labels("sql", util.QueryStrForLog(prepareStmt.NormalizedSQL))) + pprof.SetGoroutineLabels(goCtx) + } + if variable.TopSQLEnabled() && prepareStmt.SQLDigest != nil { + goCtx = topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil) } } // execute missed stmtID uses empty sql diff --git a/executor/prepared.go b/executor/prepared.go index f494ebac3dc9d..2a11977ef607e 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -29,12 +29,14 @@ import ( "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/hint" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/topsql" "go.uber.org/zap" ) @@ -178,6 +180,10 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { Params: sorter.markers, SchemaVersion: ret.InfoSchema.SchemaMetaVersion(), } + normalizedSQL, digest := parser.NormalizeDigest(prepared.Stmt.Text()) + if variable.TopSQLEnabled() { + ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil) + } if !plannercore.PreparedPlanCacheEnabled() { prepared.UseCache = false @@ -213,11 +219,10 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { vars.PreparedStmtNameToID[e.name] = e.ID } - normalized, digest := parser.NormalizeDigest(prepared.Stmt.Text()) preparedObj := &plannercore.CachedPrepareStmt{ PreparedAst: prepared, VisitInfos: destBuilder.GetVisitInfo(), - NormalizedSQL: normalized, + NormalizedSQL: normalizedSQL, SQLDigest: digest, ForUpdateRead: destBuilder.GetIsForUpdateRead(), } diff --git a/planner/optimize.go b/planner/optimize.go index c667d6b124ef9..6d87d6ffac5ed 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -44,7 +44,7 @@ import ( ) // GetPreparedStmt extract the prepared statement from the execute statement. -func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (ast.StmtNode, error) { +func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*plannercore.CachedPrepareStmt, error) { var ok bool execID := stmt.ExecID if stmt.Name != "" { @@ -57,7 +57,7 @@ func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (ast.Stm if !ok { return nil, errors.Errorf("invalid CachedPrepareStmt type") } - return preparedObj.PreparedAst.Stmt, nil + return preparedObj, nil } return nil, plannercore.ErrStmtNotFound } @@ -65,12 +65,12 @@ func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (ast.Stm // IsReadOnly check whether the ast.Node is a read only statement. 
func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool { if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt { - s, err := GetPreparedStmt(execStmt, vars) + prepareStmt, err := GetPreparedStmt(execStmt, vars) if err != nil { logutil.BgLogger().Warn("GetPreparedStmt failed", zap.Error(err)) return false } - return ast.IsReadOnly(s) + return ast.IsReadOnly(prepareStmt.PreparedAst.Stmt) } return ast.IsReadOnly(node) } diff --git a/server/conn.go b/server/conn.go index 46d0a7c023a27..bba2a41a6a8c1 100644 --- a/server/conn.go +++ b/server/conn.go @@ -78,6 +78,7 @@ import ( storeerr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/tablecodec" + tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/arena" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -906,14 +907,6 @@ func (cc *clientConn) ShutdownOrNotify() bool { return false } -func queryStrForLog(query string) string { - const size = 4096 - if len(query) > size { - return query[:size] + fmt.Sprintf("(len: %d)", len(query)) - } - return query -} - func errStrForLog(err error, enableRedactLog bool) string { if enableRedactLog { // currently, only ErrParse is considered when enableRedactLog because it may contain sensitive information like @@ -1025,6 +1018,9 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { cc.lastPacket = data cmd := data[0] data = data[1:] + if variable.TopSQLEnabled() { + defer pprof.SetGoroutineLabels(ctx) + } if variable.EnablePProfSQLCPU.Load() { label := getLastStmtInConn{cc}.PProfLabel() if len(label) > 0 { @@ -2125,10 +2121,10 @@ func (cc getLastStmtInConn) String() string { if cc.ctx.GetSessionVars().EnableRedactLog { sql = parser.Normalize(sql) } - return queryStrForLog(sql) + return tidbutil.QueryStrForLog(sql) case mysql.ComStmtExecute, mysql.ComStmtFetch: stmtID := binary.LittleEndian.Uint32(data[0:4]) - return queryStrForLog(cc.preparedStmt2String(stmtID)) + return tidbutil.QueryStrForLog(cc.preparedStmt2String(stmtID)) case mysql.ComStmtClose, mysql.ComStmtReset: stmtID := binary.LittleEndian.Uint32(data[0:4]) return mysql.Command2Str[cmd] + " " + strconv.Itoa(int(stmtID)) @@ -2156,10 +2152,10 @@ func (cc getLastStmtInConn) PProfLabel() string { case mysql.ComStmtReset: return "ResetStmt" case mysql.ComQuery, mysql.ComStmtPrepare: - return parser.Normalize(queryStrForLog(string(hack.String(data)))) + return parser.Normalize(tidbutil.QueryStrForLog(string(hack.String(data)))) case mysql.ComStmtExecute, mysql.ComStmtFetch: stmtID := binary.LittleEndian.Uint32(data[0:4]) - return queryStrForLog(cc.preparedStmt2StringNoArgs(stmtID)) + return tidbutil.QueryStrForLog(cc.preparedStmt2StringNoArgs(stmtID)) default: return "" } diff --git a/server/conn_stmt.go b/server/conn_stmt.go index e9f56306d9800..df85f7ce45f52 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -50,11 +50,13 @@ import ( "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" storeerr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/hack" + "github.com/pingcap/tidb/util/topsql" ) func (cc *clientConn) handleStmtPrepare(ctx context.Context, sql string) error { @@ -128,6 +130,13 @@ func (cc *clientConn) 
handleStmtExecute(ctx context.Context, data []byte) (err e stmtID := binary.LittleEndian.Uint32(data[0:4]) pos += 4 + if variable.TopSQLEnabled() { + preparedStmt, _ := cc.preparedStmtID2CachePreparedStmt(stmtID) + if preparedStmt != nil && preparedStmt.SQLDigest != nil { + ctx = topsql.AttachSQLInfo(ctx, preparedStmt.NormalizedSQL, preparedStmt.SQLDigest, "", nil) + } + } + stmt := cc.ctx.GetStatement(int(stmtID)) if stmt == nil { return mysql.NewErr(mysql.ErrUnknownStmtHandler, @@ -265,6 +274,12 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler, strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID)) } + if variable.TopSQLEnabled() { + prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID) + if prepareObj != nil && prepareObj.SQLDigest != nil { + ctx = topsql.AttachSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, "", nil) + } + } sql := "" if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok { sql = prepared.sql @@ -680,14 +695,30 @@ func (cc *clientConn) preparedStmt2StringNoArgs(stmtID uint32) string { if sv == nil { return "" } + preparedObj, invalid := cc.preparedStmtID2CachePreparedStmt(stmtID) + if invalid { + return "invalidate CachedPrepareStmt type, ID: " + strconv.FormatUint(uint64(stmtID), 10) + } + if preparedObj == nil { + return "prepared statement not found, ID: " + strconv.FormatUint(uint64(stmtID), 10) + } + return preparedObj.PreparedAst.Stmt.Text() +} + +func (cc *clientConn) preparedStmtID2CachePreparedStmt(stmtID uint32) (_ *plannercore.CachedPrepareStmt, invalid bool) { + sv := cc.ctx.GetSessionVars() + if sv == nil { + return nil, false + } preparedPointer, ok := sv.PreparedStmts[stmtID] if !ok { - return "prepared statement not found, ID: " + strconv.FormatUint(uint64(stmtID), 10) + // not found + return nil, false } preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt) if !ok { - return "invalidate CachedPrepareStmt type, ID: " + strconv.FormatUint(uint64(stmtID), 10) + // invalid cache. should never happen. 
+ return nil, true } - preparedAst := preparedObj.PreparedAst - return preparedAst.Stmt.Text() + return preparedObj, false } diff --git a/server/http_status.go b/server/http_status.go index b0fc7615caf49..67eace56562fe 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/printer" + "github.com/pingcap/tidb/util/topsql/tracecpu" "github.com/pingcap/tidb/util/versioninfo" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/soheilhy/cmux" @@ -184,7 +185,7 @@ func (s *Server) startHTTPServer() { serverMux.HandleFunc("/debug/pprof/", pprof.Index) serverMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - serverMux.HandleFunc("/debug/pprof/profile", pprof.Profile) + serverMux.HandleFunc("/debug/pprof/profile", tracecpu.ProfileHTTPHandler) serverMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) serverMux.HandleFunc("/debug/pprof/trace", pprof.Trace) serverMux.HandleFunc("/debug/gogc", func(w http.ResponseWriter, r *http.Request) { @@ -251,7 +252,7 @@ func (s *Server) startHTTPServer() { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "profile", err)) return } - if err := rpprof.StartCPUProfile(fw); err != nil { + if err := tracecpu.StartCPUProfile(fw); err != nil { serveError(w, http.StatusInternalServerError, fmt.Sprintf("Could not enable CPU profiling: %s", err)) return @@ -261,7 +262,11 @@ func (s *Server) startHTTPServer() { sec = 10 } sleepWithCtx(r.Context(), time.Duration(sec)*time.Second) - rpprof.StopCPUProfile() + err = tracecpu.StopCPUProfile() + if err != nil { + serveError(w, http.StatusInternalServerError, fmt.Sprintf("Create zipped %s fail: %v", "config", err)) + return + } // dump config fw, err = zw.Create("config") diff --git a/server/sql_info_fetcher.go b/server/sql_info_fetcher.go index 6fc80daf506d6..57f51f544b90b 100644 --- a/server/sql_info_fetcher.go +++ b/server/sql_info_fetcher.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "net/http" - "runtime/pprof" "strconv" "strings" "time" @@ -35,6 +34,7 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/topsql/tracecpu" ) type sqlInfoFetcher struct { @@ -275,13 +275,13 @@ func (sh *sqlInfoFetcher) getExplainAnalyze(ctx context.Context, sql string, res } func (sh *sqlInfoFetcher) catchCPUProfile(ctx context.Context, sec int, buf *bytes.Buffer, errChan chan<- error) { - if err := pprof.StartCPUProfile(buf); err != nil { + if err := tracecpu.StartCPUProfile(buf); err != nil { errChan <- err return } sleepWithCtx(ctx, time.Duration(sec)*time.Second) - pprof.StopCPUProfile() - errChan <- nil + err := tracecpu.StopCPUProfile() + errChan <- err } func (sh *sqlInfoFetcher) getStatsForTable(pair tableNamePair) (*handle.JSONTable, error) { diff --git a/server/tidb_test.go b/server/tidb_test.go index 426e45cd0c57b..09025e8bec95b 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -15,23 +15,29 @@ package server import ( + "bytes" "context" "crypto/rand" "crypto/rsa" "crypto/tls" "crypto/x509" "crypto/x509/pkix" + "database/sql" "encoding/pem" + "fmt" "math/big" "net/http" "os" "path/filepath" + "regexp" + "strings" "sync/atomic" "time" "github.com/go-sql-driver/mysql" . 
"github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/parser" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -43,7 +49,10 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/topsql/tracecpu/mock" ) type tidbTestSuite struct { @@ -54,6 +63,10 @@ type tidbTestSerialSuite struct { *tidbTestSuiteBase } +type tidbTestTopSQLSuite struct { + *tidbTestSuiteBase +} + type tidbTestSuiteBase struct { *testServerClient tidbdrv *TiDBDriver @@ -70,12 +83,18 @@ func newTiDBTestSuiteBase() *tidbTestSuiteBase { var _ = Suite(&tidbTestSuite{newTiDBTestSuiteBase()}) var _ = SerialSuites(&tidbTestSerialSuite{newTiDBTestSuiteBase()}) +var _ = SerialSuites(&tidbTestTopSQLSuite{newTiDBTestSuiteBase()}) func (ts *tidbTestSuite) SetUpSuite(c *C) { metrics.RegisterMetrics() ts.tidbTestSuiteBase.SetUpSuite(c) } +func (ts *tidbTestTopSQLSuite) SetUpSuite(c *C) { + ts.tidbTestSuiteBase.SetUpSuite(c) + tracecpu.GlobalSQLCPUProfiler.Run() +} + func (ts *tidbTestSuiteBase) SetUpSuite(c *C) { var err error ts.store, err = mockstore.NewMockStore() @@ -1153,3 +1172,216 @@ func (ts *tidbTestSerialSuite) TestPrepareCount(c *C) { c.Assert(err, IsNil) c.Assert(atomic.LoadInt64(&variable.PreparedStmtCount), Equals, prepareCnt) } + +func (ts *tidbTestTopSQLSuite) TestTopSQLCPUProfile(c *C) { + db, err := sql.Open("mysql", ts.getDSN()) + c.Assert(err, IsNil, Commentf("Error connecting")) + defer func() { + err := db.Close() + c.Assert(err, IsNil) + }() + collector := mock.NewTopSQLCollector() + tracecpu.GlobalSQLCPUProfiler.SetCollector(collector) + + dbt := &DBTest{c, db} + dbt.mustExec("drop database if exists topsql") + dbt.mustExec("create database topsql") + dbt.mustExec("use topsql;") + dbt.mustExec("create table t (a int auto_increment, b int, unique index idx(a));") + dbt.mustExec("create table t1 (a int auto_increment, b int, unique index idx(a));") + dbt.mustExec("create table t2 (a int auto_increment, b int, unique index idx(a));") + dbt.mustExec("set @@global.tidb_enable_top_sql='On';") + dbt.mustExec("set @@global.tidb_top_sql_agent_address='127.0.0.1:4001';") + dbt.mustExec("set @@global.tidb_top_sql_precision_seconds=1;") + + // Test case 1: DML query: insert/update/replace/delete/select + cases1 := []struct { + sql string + planRegexp string + cancel func() + }{ + {sql: "insert into t () values (),(),(),(),(),(),();", planRegexp: ""}, + {sql: "insert into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""}, + {sql: "replace into t (b) values (1),(1),(1),(1),(1),(1),(1),(1);", planRegexp: ""}, + {sql: "update t set b=a where b is null limit 1;", planRegexp: ".*Limit.*TableReader.*"}, + {sql: "delete from t where b is null limit 2;", planRegexp: ".*Limit.*TableReader.*"}, + {sql: "select * from t use index(idx) where a>0;", planRegexp: ".*IndexLookUp.*"}, + {sql: "select * from t ignore index(idx) where a>0;", planRegexp: ".*TableReader.*"}, + {sql: "select /*+ HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t1.a=t2.a where t1.b is not null;", planRegexp: ".*HashJoin.*"}, + {sql: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t t1 join t t2 on t1.a=t2.a where t1.b is not null;", planRegexp: ".*IndexHashJoin.*"}, + {sql: "select * from t where a=1;", planRegexp: ".*Point_Get.*"}, + {sql: "select * 
from t where a in (1,2,3,4)", planRegexp: ".*Batch_Point_Get.*"}, + } + for i, ca := range cases1 { + ctx, cancel := context.WithCancel(context.Background()) + cases1[i].cancel = cancel + sqlStr := ca.sql + go ts.loopExec(ctx, c, func(db *sql.DB) { + dbt := &DBTest{c, db} + if strings.HasPrefix(sqlStr, "select") { + rows := dbt.mustQuery(sqlStr) + for rows.Next() { + } + } else { + // Ignore error here since the error may be write conflict. + db.Exec(sqlStr) + } + }) + } + + // Test case 2: prepare/execute sql + cases2 := []struct { + prepare string + args []interface{} + planRegexp string + cancel func() + }{ + {prepare: "insert into t1 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, + {prepare: "replace into t1 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, + {prepare: "update t1 set b=a where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"}, + {prepare: "delete from t1 where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"}, + {prepare: "select * from t1 use index(idx) where a>?;", args: []interface{}{1}, planRegexp: ".*IndexLookUp.*"}, + {prepare: "select * from t1 ignore index(idx) where a>?;", args: []interface{}{1}, planRegexp: ".*TableReader.*"}, + {prepare: "select /*+ HASH_JOIN(t1, t2) */ * from t1 t1 join t1 t2 on t1.a=t2.a where t1.b is not null;", args: nil, planRegexp: ".*HashJoin.*"}, + {prepare: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t1 t1 join t1 t2 on t1.a=t2.a where t1.b is not null;", args: nil, planRegexp: ".*IndexHashJoin.*"}, + {prepare: "select * from t1 where a=?;", args: []interface{}{1}, planRegexp: ".*Point_Get.*"}, + {prepare: "select * from t1 where a in (?,?,?,?)", args: []interface{}{1, 2, 3, 4}, planRegexp: ".*Batch_Point_Get.*"}, + } + for i, ca := range cases2 { + ctx, cancel := context.WithCancel(context.Background()) + cases2[i].cancel = cancel + prepare, args := ca.prepare, ca.args + go ts.loopExec(ctx, c, func(db *sql.DB) { + stmt, err := db.Prepare(prepare) + c.Assert(err, IsNil) + if strings.HasPrefix(prepare, "select") { + rows, err := stmt.Query(args...) + c.Assert(err, IsNil) + for rows.Next() { + } + } else { + // Ignore error here since the error may be write conflict. + stmt.Exec(args...) + } + }) + } + + // Test case 3: prepare, execute stmt using @val... 
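+	// As a concrete illustration of the flow below: for the case
+	// prepare "select * from t2 where a in (?,?,?,?)" with args [1,2,3,4],
+	// the goroutine issues the textual protocol statements:
+	//   prepare stmt from 'select * from t2 where a in (?,?,?,?)';
+	//   set @a=1; set @b=2; set @c=3; set @d=4;
+	//   execute stmt using @a,@b,@c,@d;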
+ cases3 := []struct { + prepare string + args []interface{} + planRegexp string + cancel func() + }{ + {prepare: "insert into t2 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, + {prepare: "replace into t2 (b) values (?);", args: []interface{}{1}, planRegexp: ""}, + {prepare: "update t2 set b=a where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"}, + {prepare: "delete from t2 where b is null limit ?;", args: []interface{}{1}, planRegexp: ".*Limit.*TableReader.*"}, + {prepare: "select * from t2 use index(idx) where a>?;", args: []interface{}{1}, planRegexp: ".*IndexLookUp.*"}, + {prepare: "select * from t2 ignore index(idx) where a>?;", args: []interface{}{1}, planRegexp: ".*TableReader.*"}, + {prepare: "select /*+ HASH_JOIN(t1, t2) */ * from t2 t1 join t2 t2 on t1.a=t2.a where t1.b is not null;", args: nil, planRegexp: ".*HashJoin.*"}, + {prepare: "select /*+ INL_HASH_JOIN(t1, t2) */ * from t2 t1 join t2 t2 on t1.a=t2.a where t1.b is not null;", args: nil, planRegexp: ".*IndexHashJoin.*"}, + {prepare: "select * from t2 where a=?;", args: []interface{}{1}, planRegexp: ".*Point_Get.*"}, + {prepare: "select * from t2 where a in (?,?,?,?)", args: []interface{}{1, 2, 3, 4}, planRegexp: ".*Batch_Point_Get.*"}, + } + for i, ca := range cases3 { + ctx, cancel := context.WithCancel(context.Background()) + cases3[i].cancel = cancel + prepare, args := ca.prepare, ca.args + go ts.loopExec(ctx, c, func(db *sql.DB) { + _, err := db.Exec(fmt.Sprintf("prepare stmt from '%v'", prepare)) + c.Assert(err, IsNil) + sqlBuf := bytes.NewBuffer(nil) + sqlBuf.WriteString("execute stmt ") + for i := range args { + _, err = db.Exec(fmt.Sprintf("set @%c=%v", 'a'+i, args[i])) + c.Assert(err, IsNil) + if i == 0 { + sqlBuf.WriteString("using ") + } else { + sqlBuf.WriteByte(',') + } + sqlBuf.WriteByte('@') + sqlBuf.WriteByte('a' + byte(i)) + } + if strings.HasPrefix(prepare, "select") { + rows, err := db.Query(sqlBuf.String()) + c.Assert(err, IsNil, Commentf("%v", sqlBuf.String())) + for rows.Next() { + } + } else { + // Ignore error here since the error may be write conflict. + db.Exec(sqlBuf.String()) + } + }) + } + + // Wait the top sql collector to collect profile data. + collector.WaitCollectCnt(1) + + checkFn := func(sql, planRegexp string) { + commentf := Commentf("sql: %v", sql) + stats := collector.GetSQLStatsBySQLWithRetry(sql, len(planRegexp) > 0) + // since 1 sql may has many plan, check `len(stats) > 0` instead of `len(stats) == 1`. + c.Assert(len(stats) > 0, IsTrue, commentf) + + match := false + for _, s := range stats { + sqlStr := collector.GetSQL(s.SQLDigest) + encodedPlan := collector.GetPlan(s.PlanDigest) + // Normalize the user SQL before check. + normalizedSQL := parser.Normalize(sql) + c.Assert(sqlStr, Equals, normalizedSQL, commentf) + // decode plan before check. + normalizedPlan, err := plancodec.DecodeNormalizedPlan(encodedPlan) + c.Assert(err, IsNil) + // remove '\n' '\t' before do regexp match. + normalizedPlan = strings.Replace(normalizedPlan, "\n", " ", -1) + normalizedPlan = strings.Replace(normalizedPlan, "\t", " ", -1) + ok, err := regexp.MatchString(planRegexp, normalizedPlan) + c.Assert(err, IsNil, commentf) + if ok { + match = true + break + } + } + c.Assert(match, IsTrue, commentf) + } + + // Check result of test case 1. + for _, ca := range cases1 { + checkFn(ca.sql, ca.planRegexp) + ca.cancel() + } + + // Check result of test case 2. 
+ for _, ca := range cases2 { + checkFn(ca.prepare, ca.planRegexp) + ca.cancel() + } + + // Check result of test case 3. + for _, ca := range cases3 { + checkFn(ca.prepare, ca.planRegexp) + ca.cancel() + } +} + +func (ts *tidbTestTopSQLSuite) loopExec(ctx context.Context, c *C, fn func(db *sql.DB)) { + db, err := sql.Open("mysql", ts.getDSN()) + c.Assert(err, IsNil, Commentf("Error connecting")) + defer func() { + err := db.Close() + c.Assert(err, IsNil) + }() + dbt := &DBTest{c, db} + dbt.mustExec("use topsql;") + for { + select { + case <-ctx.Done(): + return + default: + } + fn(db) + } +} diff --git a/session/session.go b/session/session.go index b24461ba653b6..c4b9c77587790 100644 --- a/session/session.go +++ b/session/session.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/util/topsql" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" @@ -1386,6 +1387,13 @@ func (s *session) ParseWithParams(ctx context.Context, sql string, args ...inter for _, warn := range warns { s.sessionVars.StmtCtx.AppendWarning(util.SyntaxWarn(warn)) } + if variable.TopSQLEnabled() { + normalized, digest := parser.NormalizeDigest(sql) + if digest != nil { + // Fixme: reset/clean the label when internal sql execute finish. + ctx = topsql.AttachSQLInfo(ctx, normalized, digest, "", nil) + } + } return stmts[0], nil } @@ -1496,6 +1504,11 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex if err := executor.ResetContextOfStmt(s, stmtNode); err != nil { return nil, err } + normalizedSQL, digest := s.sessionVars.StmtCtx.SQLDigest() + if variable.TopSQLEnabled() { + ctx = topsql.AttachSQLInfo(ctx, normalizedSQL, digest, "", nil) + } + if err := s.validateStatementReadOnlyInStaleness(stmtNode); err != nil { return nil, err } @@ -1503,7 +1516,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex // Uncorrelated subqueries will execute once when building plan, so we reset process info before building plan. cmd32 := atomic.LoadUint32(&s.GetSessionVars().CommandValue) s.SetProcessInfo(stmtNode.Text(), time.Now(), byte(cmd32), 0) - _, digest := s.sessionVars.StmtCtx.SQLDigest() s.txn.onStmtStart(digest.String()) defer s.txn.onStmtEnd() diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 0026c97bb0c7c..8d4ca1bc678bb 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "runtime/pprof" "sort" "strconv" "sync" @@ -121,6 +122,10 @@ func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context. func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) { + if variable.TopSQLEnabled() { + // Restore the goroutine label by using the original ctx after execution is finished. + defer pprof.SetGoroutineLabels(ctx) + } stmt, err := exec.ParseWithParams(ctx, sql, params...) 
 		if err != nil {
 			return nil, nil, errors.Trace(err)
diff --git a/tidb-server/main.go b/tidb-server/main.go
index 3aa21808f97c3..f1b16397046f9 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -63,6 +63,7 @@ import (
 	"github.com/pingcap/tidb/util/sys/linux"
 	storageSys "github.com/pingcap/tidb/util/sys/storage"
 	"github.com/pingcap/tidb/util/systimemon"
+	"github.com/pingcap/tidb/util/topsql"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/push"
 	pd "github.com/tikv/pd/client"
@@ -176,6 +177,7 @@ func main() {
 	printInfo()
 	setupBinlogClient()
 	setupMetrics()
+	topsql.SetupTopSQL()
 	storage, dom := createStoreAndDomain()
 	svr := createServer(storage, dom)
diff --git a/util/misc.go b/util/misc.go
index 817c5715af818..ccba554bb07d0 100644
--- a/util/misc.go
+++ b/util/misc.go
@@ -535,3 +535,12 @@ func GetLocalIP() string {
 	}
 	return ""
 }
+
+// QueryStrForLog truncates the query to at most 4096 bytes and appends the original length if it was truncated.
+func QueryStrForLog(query string) string {
+	const size = 4096
+	if len(query) > size {
+		return query[:size] + fmt.Sprintf("(len: %d)", len(query))
+	}
+	return query
+}
diff --git a/util/topsql/collector/collector.go b/util/topsql/collector/collector.go
new file mode 100644
index 0000000000000..9cc2a429ea990
--- /dev/null
+++ b/util/topsql/collector/collector.go
@@ -0,0 +1,24 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+import "github.com/pingcap/tidb/util/topsql/tracecpu"
+
+// TopSQLCollector is used to collect SQL stats.
+// TODO: add a collector to collect and store the SQL stats.
+type TopSQLCollector interface {
+	tracecpu.Collector
+	RegisterSQL(sqlDigest []byte, normalizedSQL string)
+	RegisterPlan(planDigest []byte, normalizedPlan string)
+}
diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go
new file mode 100644
index 0000000000000..255ece7256c09
--- /dev/null
+++ b/util/topsql/topsql.go
@@ -0,0 +1,72 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package topsql
+
+import (
+	"context"
+	"runtime/pprof"
+
+	"github.com/pingcap/parser"
+	"github.com/pingcap/tidb/util/topsql/collector"
+	"github.com/pingcap/tidb/util/topsql/tracecpu"
+)
+
+// SetupTopSQL sets up the top-sql worker.
+func SetupTopSQL() {
+	tracecpu.GlobalSQLCPUProfiler.Run()
+}
+
+// AttachSQLInfo attaches the SQL information to Top SQL.
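+// It works through pprof goroutine labels: the digests are stored as labels
+// on the calling goroutine, so every CPU profile sample taken while the
+// statement runs carries them. The core of the mechanism is:
+//
+//	ctx = tracecpu.CtxWithDigest(ctx, sqlDigestBytes, planDigestBytes)
+//	pprof.SetGoroutineLabels(ctx)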
+func AttachSQLInfo(ctx context.Context, normalizedSQL string, sqlDigest *parser.Digest, normalizedPlan string, planDigest *parser.Digest) context.Context { + if len(normalizedSQL) == 0 || sqlDigest == nil || len(sqlDigest.Bytes()) == 0 { + return ctx + } + var sqlDigestBytes, planDigestBytes []byte + sqlDigestBytes = sqlDigest.Bytes() + if planDigest != nil { + planDigestBytes = planDigest.Bytes() + } + ctx = tracecpu.CtxWithDigest(ctx, sqlDigestBytes, planDigestBytes) + pprof.SetGoroutineLabels(ctx) + + if len(normalizedPlan) == 0 || len(planDigestBytes) == 0 { + // If plan digest is '', indicate it is the first time to attach the SQL info, since it only know the sql digest. + linkSQLTextWithDigest(sqlDigestBytes, normalizedSQL) + } else { + linkPlanTextWithDigest(planDigestBytes, normalizedPlan) + } + return ctx +} + +func linkSQLTextWithDigest(sqlDigest []byte, normalizedSQL string) { + c := tracecpu.GlobalSQLCPUProfiler.GetCollector() + if c == nil { + return + } + topc, ok := c.(collector.TopSQLCollector) + if ok { + topc.RegisterSQL(sqlDigest, normalizedSQL) + } +} + +func linkPlanTextWithDigest(planDigest []byte, normalizedPlan string) { + c := tracecpu.GlobalSQLCPUProfiler.GetCollector() + if c == nil { + return + } + topc, ok := c.(collector.TopSQLCollector) + if ok { + topc.RegisterPlan(planDigest, normalizedPlan) + } +} diff --git a/util/topsql/tracecpu/mock/mock.go b/util/topsql/tracecpu/mock/mock.go new file mode 100644 index 0000000000000..2737306f1a096 --- /dev/null +++ b/util/topsql/tracecpu/mock/mock.go @@ -0,0 +1,173 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mock + +import ( + "bytes" + "sync" + "time" + + "github.com/pingcap/parser" + "github.com/pingcap/tidb/util/hack" + "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/uber-go/atomic" +) + +// TopSQLCollector uses for testing. +type TopSQLCollector struct { + sync.Mutex + // sql_digest -> normalized SQL + sqlMap map[string]string + // plan_digest -> normalized plan + planMap map[string]string + // (sql + plan_digest) -> sql stats + sqlStatsMap map[string]*tracecpu.SQLCPUResult + collectCnt atomic.Int64 +} + +// NewTopSQLCollector uses for testing. +func NewTopSQLCollector() *TopSQLCollector { + return &TopSQLCollector{ + sqlMap: make(map[string]string), + planMap: make(map[string]string), + sqlStatsMap: make(map[string]*tracecpu.SQLCPUResult), + } +} + +// Collect uses for testing. +func (c *TopSQLCollector) Collect(ts int64, stats []tracecpu.SQLCPUResult) { + defer c.collectCnt.Inc() + if len(stats) == 0 { + return + } + c.Lock() + defer c.Unlock() + for _, stmt := range stats { + hash := c.hash(stmt) + stats, ok := c.sqlStatsMap[hash] + if !ok { + stats = &tracecpu.SQLCPUResult{ + SQLDigest: stmt.SQLDigest, + PlanDigest: stmt.PlanDigest, + } + c.sqlStatsMap[hash] = stats + } + stats.CPUTimeMs += stmt.CPUTimeMs + } +} + +// GetSQLStatsBySQLWithRetry uses for testing. 
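+// It polls GetSQLStatsBySQL, waiting for one more collect cycle between attempts,
+// and returns nil if no matching stats show up within roughly ten seconds.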
+func (c *TopSQLCollector) GetSQLStatsBySQLWithRetry(sql string, planIsNotNull bool) []*tracecpu.SQLCPUResult {
+	after := time.After(time.Second * 10)
+	for {
+		select {
+		case <-after:
+			return nil
+		default:
+		}
+		stats := c.GetSQLStatsBySQL(sql, planIsNotNull)
+		if len(stats) > 0 {
+			return stats
+		}
+		c.WaitCollectCnt(1)
+	}
+}
+
+// GetSQLStatsBySQL uses for testing.
+func (c *TopSQLCollector) GetSQLStatsBySQL(sql string, planIsNotNull bool) []*tracecpu.SQLCPUResult {
+	stats := make([]*tracecpu.SQLCPUResult, 0, 2)
+	sqlDigest := GenSQLDigest(sql)
+	c.Lock()
+	for _, stmt := range c.sqlStatsMap {
+		if bytes.Equal(stmt.SQLDigest, sqlDigest.Bytes()) {
+			if planIsNotNull {
+				plan := c.planMap[string(stmt.PlanDigest)]
+				if len(plan) > 0 {
+					stats = append(stats, stmt)
+				}
+			} else {
+				stats = append(stats, stmt)
+			}
+		}
+	}
+	c.Unlock()
+	return stats
+}
+
+// GetSQL uses for testing.
+func (c *TopSQLCollector) GetSQL(sqlDigest []byte) string {
+	c.Lock()
+	sql := c.sqlMap[string(sqlDigest)]
+	c.Unlock()
+	return sql
+}
+
+// GetPlan uses for testing.
+func (c *TopSQLCollector) GetPlan(planDigest []byte) string {
+	c.Lock()
+	plan := c.planMap[string(planDigest)]
+	c.Unlock()
+	return plan
+}
+
+// RegisterSQL uses for testing.
+func (c *TopSQLCollector) RegisterSQL(sqlDigest []byte, normalizedSQL string) {
+	digestStr := string(hack.String(sqlDigest))
+	c.Lock()
+	_, ok := c.sqlMap[digestStr]
+	if !ok {
+		c.sqlMap[digestStr] = normalizedSQL
+	}
+	c.Unlock()
+}
+
+// RegisterPlan uses for testing.
+func (c *TopSQLCollector) RegisterPlan(planDigest []byte, normalizedPlan string) {
+	digestStr := string(hack.String(planDigest))
+	c.Lock()
+	_, ok := c.planMap[digestStr]
+	if !ok {
+		c.planMap[digestStr] = normalizedPlan
+	}
+	c.Unlock()
+}
+
+// WaitCollectCnt uses for testing. It waits until the collector has collected
+// `count` more times, or gives up after a 10-second timeout.
+func (c *TopSQLCollector) WaitCollectCnt(count int64) {
+	timeout := time.After(time.Second * 10)
+	end := c.collectCnt.Load() + count
+	for {
+		// Wait until the collect count reaches the expected value.
+		if c.collectCnt.Load() >= end {
+			break
+		}
+		select {
+		case <-timeout:
+			// Return on timeout; a plain `break` here would only exit the
+			// select and spin in this loop forever.
+			return
+		default:
+			time.Sleep(time.Millisecond * 10)
+		}
+	}
+}
+
+func (c *TopSQLCollector) hash(stat tracecpu.SQLCPUResult) string {
+	return string(stat.SQLDigest) + string(stat.PlanDigest)
+}
+
+// GenSQLDigest uses for testing.
+func GenSQLDigest(sql string) *parser.Digest {
+	_, digest := parser.NormalizeDigest(sql)
+	return digest
+}
diff --git a/util/topsql/tracecpu/profile.go b/util/topsql/tracecpu/profile.go
new file mode 100644
index 0000000000000..c0fdbe5fee52f
--- /dev/null
+++ b/util/topsql/tracecpu/profile.go
@@ -0,0 +1,424 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
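+
+// Overview of this file: a worker loop repeatedly runs pprof.StartCPUProfile
+// for one precision interval, parses the resulting profile, aggregates the
+// samples by their sql_digest/plan_digest goroutine labels, and hands the
+// per-statement cpu time to the registered Collector.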
+ +package tracecpu + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "runtime/pprof" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/google/pprof/profile" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/hack" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +const ( + labelSQL = "sql" + labelSQLDigest = "sql_digest" + labelPlanDigest = "plan_digest" +) + +// GlobalSQLCPUProfiler is the global SQL stats profiler. +var GlobalSQLCPUProfiler = newSQLCPUProfiler() + +// Collector uses to collect SQL execution cpu time. +type Collector interface { + // Collect uses to collect the SQL execution cpu time. + // ts is a Unix time, unit is second. + Collect(ts int64, stats []SQLCPUResult) +} + +// SQLCPUResult contains the SQL meta and cpu time. +type SQLCPUResult struct { + SQLDigest []byte + PlanDigest []byte + CPUTimeMs uint32 +} + +type sqlCPUProfiler struct { + taskCh chan *profileData + + mu struct { + sync.Mutex + ept *exportProfileTask + } + collector atomic.Value +} + +var ( + defaultProfileBufSize = 100 * 1024 + profileBufPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, defaultProfileBufSize)) + }, + } +) + +// newSQLCPUProfiler create a sqlCPUProfiler. +func newSQLCPUProfiler() *sqlCPUProfiler { + return &sqlCPUProfiler{ + taskCh: make(chan *profileData, 128), + } +} + +func (sp *sqlCPUProfiler) Run() { + logutil.BgLogger().Info("cpu profiler started") + go sp.startCPUProfileWorker() + go sp.startAnalyzeProfileWorker() +} + +func (sp *sqlCPUProfiler) SetCollector(c Collector) { + sp.collector.Store(c) +} + +func (sp *sqlCPUProfiler) GetCollector() Collector { + c, ok := sp.collector.Load().(Collector) + if !ok || c == nil { + return nil + } + return c +} + +func (sp *sqlCPUProfiler) startCPUProfileWorker() { + defer util.Recover("top-sql", "profileWorker", nil, false) + for { + if sp.IsEnabled() { + sp.doCPUProfile() + } else { + time.Sleep(time.Second) + } + } +} + +func (sp *sqlCPUProfiler) doCPUProfile() { + intervalSecond := variable.TopSQLVariable.PrecisionSeconds.Load() + task := sp.newProfileTask() + if err := pprof.StartCPUProfile(task.buf); err != nil { + // Sleep a while before retry. 
+		time.Sleep(time.Second)
+		sp.putTaskToBuffer(task)
+		return
+	}
+	// Sleep until the end of the current interval, so that each profile
+	// window is aligned to a whole-second boundary.
+	ns := int64(time.Second)*intervalSecond - int64(time.Now().Nanosecond())
+	time.Sleep(time.Nanosecond * time.Duration(ns))
+	pprof.StopCPUProfile()
+	task.end = time.Now().Unix()
+	sp.taskCh <- task
+}
+
+func (sp *sqlCPUProfiler) startAnalyzeProfileWorker() {
+	defer util.Recover("top-sql", "analyzeProfileWorker", nil, false)
+	for {
+		task := <-sp.taskCh
+		p, err := profile.ParseData(task.buf.Bytes())
+		if err != nil {
+			logutil.BgLogger().Error("parse profile error", zap.Error(err))
+			sp.putTaskToBuffer(task)
+			continue
+		}
+		stats := sp.parseCPUProfileBySQLLabels(p)
+		sp.handleExportProfileTask(p)
+		if c := sp.GetCollector(); c != nil {
+			c.Collect(task.end, stats)
+		}
+		sp.putTaskToBuffer(task)
+	}
+}
+
+type profileData struct {
+	buf *bytes.Buffer
+	end int64
+}
+
+func (sp *sqlCPUProfiler) newProfileTask() *profileData {
+	buf := profileBufPool.Get().(*bytes.Buffer)
+	return &profileData{
+		buf: buf,
+	}
+}
+
+func (sp *sqlCPUProfiler) putTaskToBuffer(task *profileData) {
+	task.buf.Reset()
+	profileBufPool.Put(task.buf)
+}
+
+// parseCPUProfileBySQLLabels aggregates the cpu-profile sample data by the sql_digest and plan_digest labels and
+// outputs a SQLCPUResult slice. To learn more about profile labels, see https://rakyll.org/profiler-labels/
+// The sql_digest label is set by the `SetSQLLabels` function after the SQL is parsed.
+// The plan_digest label is set by the `SetSQLAndPlanLabels` function after the SQL plan is built.
+// Since `sqlCPUProfiler` only cares about the cpu time consumed by (sql_digest, plan_digest), sample data
+// without those labels is ignored.
+func (sp *sqlCPUProfiler) parseCPUProfileBySQLLabels(p *profile.Profile) []SQLCPUResult {
+	sqlMap := make(map[string]*sqlStats)
+	idx := len(p.SampleType) - 1
+	for _, s := range p.Sample {
+		digests, ok := s.Label[labelSQLDigest]
+		if !ok || len(digests) == 0 {
+			continue
+		}
+		for _, digest := range digests {
+			stmt, ok := sqlMap[digest]
+			if !ok {
+				stmt = &sqlStats{
+					plans: make(map[string]int64),
+					total: 0,
+				}
+				sqlMap[digest] = stmt
+			}
+			stmt.total += s.Value[idx]
+
+			plans := s.Label[labelPlanDigest]
+			for _, plan := range plans {
+				stmt.plans[plan] += s.Value[idx]
+			}
+		}
+	}
+	return sp.createSQLStats(sqlMap)
+}
+
+func (sp *sqlCPUProfiler) createSQLStats(sqlMap map[string]*sqlStats) []SQLCPUResult {
+	stats := make([]SQLCPUResult, 0, len(sqlMap))
+	for sqlDigest, stmt := range sqlMap {
+		stmt.tune()
+		for planDigest, val := range stmt.plans {
+			stats = append(stats, SQLCPUResult{
+				SQLDigest:  []byte(sqlDigest),
+				PlanDigest: []byte(planDigest),
+				CPUTimeMs:  uint32(time.Duration(val).Milliseconds()),
+			})
+		}
+	}
+	return stats
+}
+
+type sqlStats struct {
+	plans map[string]int64
+	total int64
+}
+
+// tune adjusts the sql stats. Consider the following situation:
+// The `sqlStats` may be:
+// plans: {
+//     "table_scan": 200ms, // The cpu time of the sql whose plan is `table_scan` is 200ms.
+//     "index_scan": 300ms, // The cpu time of the sql whose plan is `index_scan` is 300ms.
+// },
+// total: 600ms,            // The total cpu time of the sql is 600ms.
+// Then total_time - table_scan_time - index_scan_time = 100ms. This 100ms comes from sample data that carries only
+// the sql_digest label and no plan_digest label, because the pprof profile is sample-based and the plan digest can
+// only be set after the optimizer has generated the execution plan. So the remaining 100ms is the time the
+// optimizer spent generating the plan.
+// After this tune function, the `sqlStats` becomes:
+// plans: {
+//     ""          : 100ms, // 600 - 200 - 300 = 100ms, the plan-generation time cost.
+//     "table_scan": 200ms,
+//     "index_scan": 300ms,
+// },
+// total: 600ms,
+func (s *sqlStats) tune() {
+	if len(s.plans) == 0 {
+		s.plans[""] = s.total
+		return
+	}
+	planTotal := int64(0)
+	for _, v := range s.plans {
+		planTotal += v
+	}
+	optimize := s.total - planTotal
+	if optimize <= 0 {
+		return
+	}
+	s.plans[""] += optimize
+}
+
+func (sp *sqlCPUProfiler) handleExportProfileTask(p *profile.Profile) {
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	if sp.mu.ept == nil {
+		return
+	}
+	sp.mu.ept.mergeProfile(p)
+}
+
+func (sp *sqlCPUProfiler) hasExportProfileTask() bool {
+	sp.mu.Lock()
+	has := sp.mu.ept != nil
+	sp.mu.Unlock()
+	return has
+}
+
+// IsEnabled returns true if the profiler is (or should be) enabled. It is exported for tests.
+func (sp *sqlCPUProfiler) IsEnabled() bool {
+	return variable.TopSQLEnabled() || sp.hasExportProfileTask()
+}
+
+// StartCPUProfile works like pprof.StartCPUProfile. Because GlobalSQLCPUProfiler keeps calling
+// pprof.StartCPUProfile to fetch SQL cpu stats, any other caller of pprof.StartCPUProfile (such as the pprof
+// profile HTTP API handler) would fail, so other callers should use tracecpu.StartCPUProfile instead.
+func StartCPUProfile(w io.Writer) error {
+	if GlobalSQLCPUProfiler.IsEnabled() {
+		return GlobalSQLCPUProfiler.startExportCPUProfile(w)
+	}
+	return pprof.StartCPUProfile(w)
+}
+
+// StopCPUProfile works like pprof.StopCPUProfile.
+// Other callers should use tracecpu.StopCPUProfile instead of pprof.StopCPUProfile.
+func StopCPUProfile() error {
+	if GlobalSQLCPUProfiler.IsEnabled() {
+		return GlobalSQLCPUProfiler.stopExportCPUProfile()
+	}
+	pprof.StopCPUProfile()
+	return nil
+}
+
+// CtxWithDigest wraps the ctx with the sql digest and, if the plan digest is not empty, with the plan digest too.
+func CtxWithDigest(ctx context.Context, sqlDigest, planDigest []byte) context.Context {
+	if len(planDigest) == 0 {
+		return pprof.WithLabels(ctx, pprof.Labels(labelSQLDigest, string(hack.String(sqlDigest))))
+	}
+	return pprof.WithLabels(ctx, pprof.Labels(labelSQLDigest, string(hack.String(sqlDigest)),
+		labelPlanDigest, string(hack.String(planDigest))))
+}
+
+func (sp *sqlCPUProfiler) startExportCPUProfile(w io.Writer) error {
+	sp.mu.Lock()
+	defer sp.mu.Unlock()
+	if sp.mu.ept != nil {
+		return errors.New("cpu profiling already in use")
+	}
+	sp.mu.ept = &exportProfileTask{w: w}
+	return nil
+}
+
+func (sp *sqlCPUProfiler) stopExportCPUProfile() error {
+	sp.mu.Lock()
+	ept := sp.mu.ept
+	sp.mu.ept = nil
+	sp.mu.Unlock()
+	if ept == nil {
+		// No export task was started, e.g. Top SQL was enabled between the
+		// StartCPUProfile and StopCPUProfile calls.
+		return nil
+	}
+	if ept.err != nil {
+		return ept.err
+	}
+	if w := ept.w; w != nil && ept.cpuProfile != nil {
+		sp.removeLabel(ept.cpuProfile)
+		return ept.cpuProfile.Write(w)
+	}
+	return nil
+}
+
+// removeLabel removes the labels from the exported cpu profile data, since the sql_digest and plan_digest labels
+// would look strange to other users of the profile.
+// If `variable.EnablePProfSQLCPU` is true the `sql` label is kept; otherwise it is removed as well.
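+// For example, a sample labeled {sql, sql_digest, plan_digest} is exported with at most the `sql` label kept.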
+func (sp *sqlCPUProfiler) removeLabel(p *profile.Profile) { + if p == nil { + return + } + keepLabelSQL := variable.EnablePProfSQLCPU.Load() + for _, s := range p.Sample { + for k := range s.Label { + switch k { + case labelSQL: + if !keepLabelSQL { + delete(s.Label, k) + } + case labelSQLDigest, labelPlanDigest: + delete(s.Label, k) + } + } + } +} + +type exportProfileTask struct { + cpuProfile *profile.Profile + err error + w io.Writer +} + +func (t *exportProfileTask) mergeProfile(p *profile.Profile) { + if t.err != nil || p == nil { + return + } + ps := make([]*profile.Profile, 0, 2) + if t.cpuProfile != nil { + ps = append(ps, t.cpuProfile) + } + ps = append(ps, p) + t.cpuProfile, t.err = profile.Merge(ps) +} + +// ProfileHTTPHandler is same as pprof.Profile. +// The difference is ProfileHTTPHandler uses tracecpu.StartCPUProfile/StopCPUProfile to fetch profile data. +func ProfileHTTPHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Content-Type-Options", "nosniff") + sec, err := strconv.ParseInt(r.FormValue("seconds"), 10, 64) + if sec <= 0 || err != nil { + sec = 30 + } + + if durationExceedsWriteTimeout(r, float64(sec)) { + serveError(w, http.StatusBadRequest, "profile duration exceeds server's WriteTimeout") + return + } + + // Set Content Type assuming StartCPUProfile will work, + // because if it does it starts writing. + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Disposition", `attachment; filename="profile"`) + + err = StartCPUProfile(w) + if err != nil { + serveError(w, http.StatusInternalServerError, "Could not enable CPU profiling: "+err.Error()) + return + } + // TODO: fix me. + // This can be fixed by always starts a 1 second profiling one by one, + // but to aggregate (merge) multiple profiles into one according to the precision. + // |<-- 1s -->| + // -|----------|----------|----------|----------|----------|-----------|-----> Background profile task timeline. + // |________________________________| + // (start cpu profile) v v (stop cpu profile) // expected profile timeline + // |________________________________| // actual profile timeline + time.Sleep(time.Second * time.Duration(sec)) + err = StopCPUProfile() + if err != nil { + serveError(w, http.StatusInternalServerError, "Could not enable CPU profiling: "+err.Error()) + return + } +} + +func durationExceedsWriteTimeout(r *http.Request, seconds float64) bool { + srv, ok := r.Context().Value(http.ServerContextKey).(*http.Server) + return ok && srv.WriteTimeout != 0 && seconds >= srv.WriteTimeout.Seconds() +} + +func serveError(w http.ResponseWriter, status int, txt string) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.Header().Set("X-Go-Pprof", "1") + w.Header().Del("Content-Disposition") + w.WriteHeader(status) + _, err := fmt.Fprintln(w, txt) + if err != nil { + logutil.BgLogger().Info("write http response error", zap.Error(err)) + } +} diff --git a/util/topsql/tracecpu_test.go b/util/topsql/tracecpu_test.go new file mode 100644 index 0000000000000..6091457a9db07 --- /dev/null +++ b/util/topsql/tracecpu_test.go @@ -0,0 +1,143 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package topsql_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/google/pprof/profile" + . "github.com/pingcap/check" + "github.com/pingcap/parser" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/topsql" + "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tidb/util/topsql/tracecpu/mock" +) + +func TestT(t *testing.T) { + CustomVerboseFlag = true + TestingT(t) +} + +var _ = SerialSuites(&testSuite{}) + +type testSuite struct{} + +func (s *testSuite) SetUpSuite(c *C) { + variable.TopSQLVariable.Enable.Store(true) + variable.TopSQLVariable.AgentAddress.Store("mock") + variable.TopSQLVariable.PrecisionSeconds.Store(1) + tracecpu.GlobalSQLCPUProfiler.Run() +} + +func (s *testSuite) TestTopSQLCPUProfile(c *C) { + collector := mock.NewTopSQLCollector() + tracecpu.GlobalSQLCPUProfiler.SetCollector(collector) + reqs := []struct { + sql string + plan string + }{ + {"select * from t where a=?", "point-get"}, + {"select * from t where a>?", "table-scan"}, + {"insert into t values (?)", ""}, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for _, req := range reqs { + go func(sql, plan string) { + for { + select { + case <-ctx.Done(): + return + default: + s.mockExecuteSQL(sql, plan) + } + } + }(req.sql, req.plan) + } + + // test for StartCPUProfile. + buf := bytes.NewBuffer(nil) + err := tracecpu.StartCPUProfile(buf) + c.Assert(err, IsNil) + collector.WaitCollectCnt(2) + err = tracecpu.StopCPUProfile() + c.Assert(err, IsNil) + _, err = profile.Parse(buf) + c.Assert(err, IsNil) + + for _, req := range reqs { + stats := collector.GetSQLStatsBySQLWithRetry(req.sql, len(req.plan) > 0) + c.Assert(len(stats), Equals, 1) + sql := collector.GetSQL(stats[0].SQLDigest) + plan := collector.GetPlan(stats[0].PlanDigest) + c.Assert(sql, Equals, req.sql) + c.Assert(plan, Equals, req.plan) + } +} + +func (s *testSuite) TestIsEnabled(c *C) { + s.setTopSQLEnable(false) + c.Assert(tracecpu.GlobalSQLCPUProfiler.IsEnabled(), IsFalse) + + s.setTopSQLEnable(true) + err := tracecpu.StartCPUProfile(bytes.NewBuffer(nil)) + c.Assert(err, IsNil) + c.Assert(tracecpu.GlobalSQLCPUProfiler.IsEnabled(), IsTrue) + s.setTopSQLEnable(false) + c.Assert(tracecpu.GlobalSQLCPUProfiler.IsEnabled(), IsTrue) + err = tracecpu.StopCPUProfile() + c.Assert(err, IsNil) + + s.setTopSQLEnable(false) + c.Assert(tracecpu.GlobalSQLCPUProfiler.IsEnabled(), IsFalse) + s.setTopSQLEnable(true) + c.Assert(tracecpu.GlobalSQLCPUProfiler.IsEnabled(), IsTrue) +} + +func (s *testSuite) setTopSQLEnable(enabled bool) { + variable.TopSQLVariable.Enable.Store(enabled) +} + +func (s *testSuite) mockExecuteSQL(sql, plan string) { + ctx := context.Background() + sqlDigest := mock.GenSQLDigest(sql) + topsql.AttachSQLInfo(ctx, sql, sqlDigest, "", nil) + s.mockExecute(time.Millisecond * 100) + planDigest := genDigest(plan) + topsql.AttachSQLInfo(ctx, sql, sqlDigest, plan, planDigest) + s.mockExecute(time.Millisecond * 300) +} + +func genDigest(str string) *parser.Digest { + if str == "" { + return parser.NewDigest(nil) + } + return parser.DigestNormalized(str) +} + +func (s *testSuite) mockExecute(d time.Duration) { + 
start := time.Now() + for { + for i := 0; i < 10e5; i++ { + } + if time.Since(start) > d { + return + } + } +} From 6c44ec29d88c9102799ab76f706097f2a70e75b5 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Wed, 2 Jun 2021 13:01:47 +0800 Subject: [PATCH 5/6] execdetails: make `ConcurrencyInfo` only appear once in explain analyze (#24514) --- util/execdetails/execdetails.go | 8 ++------ util/execdetails/execdetails_test.go | 5 +---- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 16a17b656c1cc..97f9e9611513d 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -734,12 +734,8 @@ func (e *RuntimeStatsWithConcurrencyInfo) String() string { } // Merge implements the RuntimeStats interface. -func (e *RuntimeStatsWithConcurrencyInfo) Merge(rs RuntimeStats) { - tmp, ok := rs.(*RuntimeStatsWithConcurrencyInfo) - if !ok { - return - } - e.concurrency = append(e.concurrency, tmp.concurrency...) +func (e *RuntimeStatsWithConcurrencyInfo) Merge(_ RuntimeStats) { + return } // RuntimeStatsWithCommit is the RuntimeStats with commit detail. diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index 827410cb04350..3d3a70959c180 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -272,11 +272,8 @@ func TestRootRuntimeStats(t *testing.T) { stmtStats.RegisterStats(pid, &RuntimeStatsWithCommit{ Commit: commitDetail, }) - concurrency = &RuntimeStatsWithConcurrencyInfo{} - concurrency.SetConcurrencyInfo(NewConcurrencyInfo("concurrent", 0)) - stmtStats.RegisterStats(pid, concurrency) stats := stmtStats.GetRootStats(1) - expect := "time:3s, loops:2, worker:15, concurrent:OFF, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}" + expect := "time:3s, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}" if stats.String() != expect { t.Fatalf("%v != %v", stats.String(), expect) } From 7c3e0361fb78fb0dd71f85fcdcfb9f5e302fb564 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Wed, 2 Jun 2021 13:31:37 +0800 Subject: [PATCH 6/6] executor: support explain analyze for CTE statement (#25023) --- executor/cte.go | 22 ++++++++++++++++------ executor/explain_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index a5e063e9dc9ee..95921670cb5cf 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/cteutil" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/memory" ) @@ -77,6 +78,9 @@ type CTEExec struct { curIter int hCtx *hashContext sel []int + + memTracker *memory.Tracker + diskTracker *disk.Tracker } // Open implements the Executor interface. 
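The two tracker fields added above give the CTE executor a stable parent for the storage trackers that `setupCTEStorageTracker` now attaches in the hunks below, so CTE memory and disk usage roll up through the executor (where `explain analyze` reports them) rather than attaching straight to the statement context. A minimal sketch of this parent/child tracker pattern, assuming only that `memory.NewTracker`, `AttachTo`, `Consume`, and `BytesConsumed` behave as used in this diff (the labels 1/2/3 are arbitrary examples):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/util/memory"
)

func main() {
	stmtTracker := memory.NewTracker(1, -1) // statement-level tracker, no byte limit
	execTracker := memory.NewTracker(2, -1) // executor-level tracker, like e.memTracker above
	execTracker.AttachTo(stmtTracker)
	storageTracker := memory.NewTracker(3, -1) // per-storage tracker, like iterOutTbl's
	storageTracker.AttachTo(execTracker)

	// Consumption on the leaf propagates to every ancestor, so both the
	// executor's stats and the statement quota observe the same bytes.
	storageTracker.Consume(4096)
	fmt.Println(execTracker.BytesConsumed()) // 4096
	fmt.Println(stmtTracker.BytesConsumed()) // 4096
}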
@@ -93,6 +97,11 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { return err } + e.memTracker = memory.NewTracker(e.id, -1) + e.diskTracker = disk.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + if e.recursiveExec != nil { if err = e.recursiveExec.Open(ctx); err != nil { return err @@ -103,7 +112,7 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { return err } - setupCTEStorageTracker(e.iterOutTbl, e.ctx) + setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) } if e.isDistinct { @@ -126,8 +135,8 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { e.resTbl.Lock() if !e.resTbl.Done() { defer e.resTbl.Unlock() - resAction := setupCTEStorageTracker(e.resTbl, e.ctx) - iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx) + resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) + iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { @@ -323,14 +332,15 @@ func (e *CTEExec) reopenTbls() (err error) { return e.iterInTbl.Reopen() } -func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context) (actionSpill *chunk.SpillDiskAction) { +func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, + parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { memTracker := tbl.GetMemTracker() memTracker.SetLabel(memory.LabelForCTEStorage) - memTracker.AttachTo(ctx.GetSessionVars().StmtCtx.MemTracker) + memTracker.AttachTo(parentMemTracker) diskTracker := tbl.GetDiskTracker() diskTracker.SetLabel(memory.LabelForCTEStorage) - diskTracker.AttachTo(ctx.GetSessionVars().StmtCtx.DiskTracker) + diskTracker.AttachTo(parentDiskTracker) if config.GetGlobalConfig().OOMUseTmpStorage { actionSpill = tbl.ActionSpill() diff --git a/executor/explain_test.go b/executor/explain_test.go index 8a0b062ef68cb..a0ce1a1eb2e0c 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -207,6 +207,7 @@ func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) { s.checkExecutionInfo(c, tk, "explain analyze select * from t") s.checkExecutionInfo(c, tk, "explain analyze select k from t use index(k)") s.checkExecutionInfo(c, tk, "explain analyze select * from t use index(k)") + s.checkExecutionInfo(c, tk, "explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;") tk.MustExec("CREATE TABLE IF NOT EXISTS nation ( N_NATIONKEY BIGINT NOT NULL,N_NAME CHAR(25) NOT NULL,N_REGIONKEY BIGINT NOT NULL,N_COMMENT VARCHAR(152),PRIMARY KEY (N_NATIONKEY));") tk.MustExec("CREATE TABLE IF NOT EXISTS part ( P_PARTKEY BIGINT NOT NULL,P_NAME VARCHAR(55) NOT NULL,P_MFGR CHAR(25) NOT NULL,P_BRAND CHAR(10) NOT NULL,P_TYPE VARCHAR(25) NOT NULL,P_SIZE BIGINT NOT NULL,P_CONTAINER CHAR(10) NOT NULL,P_RETAILPRICE DECIMAL(15,2) NOT NULL,P_COMMENT VARCHAR(23) NOT NULL,PRIMARY KEY (P_PARTKEY));") @@ -320,9 +321,33 @@ func (s *testSuite1) TestCheckActRowsWithUnistore(c *C) { sql: "select count(*) from t_unistore_act_rows group by b", expected: []string{"2", "2", "2", "4"}, }, + { + sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;", + expected: []string{"4", "4", "4", "4", "4"}, + }, } for _, test := range 
tests { checkActRows(c, tk, test.sql, test.expected) } } + +func (s *testSuite2) TestExplainAnalyzeCTEMemoryAndDiskInfo(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk.MustExec("insert into t with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;") + + rows := tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" + + " select * from cte, t;").Rows() + + c.Assert(rows[4][7].(string), Not(Equals), "N/A") + c.Assert(rows[4][8].(string), Equals, "0 Bytes") + + tk.MustExec("set @@tidb_mem_quota_query=10240;") + rows = tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" + + " select * from cte, t;").Rows() + + c.Assert(rows[4][7].(string), Not(Equals), "N/A") + c.Assert(rows[4][8].(string), Not(Equals), "N/A") +}
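For reference when reading the assertions above: in this version `explain analyze` returns nine columns (id, estRows, actRows, task, access object, execution info, operator info, memory, disk), which is why the test indexes columns 7 and 8 of the CTE row. A hedged sketch for inspecting the same output by hand (the DSN is an assumed local default, and the column layout is taken from the indices asserted above):

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Assumed local TiDB instance; adjust the DSN for your environment.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	rows, err := db.Query("explain analyze with recursive cte(a) as " +
		"(select 1 union select a + 1 from cte where a < 1000) select * from cte, t")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	cols, _ := rows.Columns()
	vals := make([]sql.RawBytes, len(cols))
	ptrs := make([]interface{}, len(cols))
	for i := range vals {
		ptrs[i] = &vals[i]
	}
	for rows.Next() {
		if err := rows.Scan(ptrs...); err != nil {
			panic(err)
		}
		// Columns 7 and 8 are the memory and disk usage of each operator.
		fmt.Printf("%-40s memory=%-12s disk=%s\n", vals[0], vals[7], vals[8])
	}
}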