Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb into cartesi…
Browse files Browse the repository at this point in the history
…an_join
  • Loading branch information
windtalker committed Jun 2, 2021
2 parents b7b4a73 + 7c3e036 commit 1cf6b51
Show file tree
Hide file tree
Showing 32 changed files with 1,509 additions and 147 deletions.
14 changes: 14 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -712,6 +713,10 @@ func needChangeColumnData(oldCol, newCol *model.ColumnInfo) bool {
return needTruncationOrToggleSign()
}

if convertBetweenCharAndVarchar(oldCol.Tp, newCol.Tp) {
return true
}

// Deal with the different type.
switch oldCol.Tp {
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
Expand All @@ -734,6 +739,15 @@ func needChangeColumnData(oldCol, newCol *model.ColumnInfo) bool {
return true
}

// Column type conversion between varchar to char need reorganization because
// 1. varchar -> char: char type is stored with the padding removed. All the indexes need to be rewritten.
// 2. char -> varchar: the index value encoding of secondary index on clustered primary key tables is different.
// These secondary indexes need to be rewritten.
func convertBetweenCharAndVarchar(oldCol, newCol byte) bool {
return (types.IsTypeVarchar(oldCol) && newCol == mysql.TypeString) ||
(oldCol == mysql.TypeString && types.IsTypeVarchar(newCol) && collate.NewCollationEnabled())
}

func isElemsChangedToModifyColumn(oldElems, newElems []string) bool {
if len(newElems) < len(oldElems) {
return true
Expand Down
2 changes: 2 additions & 0 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,6 +1170,8 @@ func (s *testColumnSuite) TestModifyColumn(c *C) {
}{
{"int", "bigint", nil},
{"int", "int unsigned", errUnsupportedModifyColumn.GenWithStackByArgs("can't change unsigned integer to signed or vice versa, and tidb_enable_change_column_type is false")},
{"varchar(10)", "text", nil},
{"varbinary(10)", "blob", nil},
{"text", "blob", errUnsupportedModifyCharset.GenWithStackByArgs("charset from utf8mb4 to binary")},
{"varchar(10)", "varchar(8)", errUnsupportedModifyColumn.GenWithStackByArgs("length 8 is less than origin 10, and tidb_enable_change_column_type is false")},
{"varchar(10)", "varchar(11)", nil},
Expand Down
243 changes: 169 additions & 74 deletions ddl/column_type_change_test.go

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4102,9 +4102,11 @@ func (s *testSerialDBSuite) TestModifyColumnBetweenStringTypes(c *C) {
tk.MustGetErrMsg("alter table tt change a a varchar(4);", "[types:1406]Data Too Long, field len 4, data len 5")
tk.MustExec("alter table tt change a a varchar(100);")

tk.MustExec("drop table if exists tt;")
tk.MustExec("create table tt (a char(10));")
tk.MustExec("insert into tt values ('111'),('10000');")
// varchar to char
tk.MustExec("alter table tt change a a char(10);")
c2 = getModifyColumn(c, s.s.(sessionctx.Context), "test", "tt", "a", false)
c.Assert(c2.FieldType.Tp, Equals, mysql.TypeString)
c.Assert(c2.FieldType.Flen, Equals, 10)
tk.MustQuery("select * from tt").Check(testkit.Rows("111", "10000"))
tk.MustGetErrMsg("alter table tt change a a char(4);", "[types:1406]Data Too Long, field len 4, data len 5")

Expand Down
10 changes: 5 additions & 5 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3537,7 +3537,7 @@ func CheckModifyTypeCompatible(origin *types.FieldType, to *types.FieldType) (ca
return true, notCompatibleMsg, errUnsupportedModifyColumn.GenWithStackByArgs(notCompatibleMsg)
}

func needReorgToChange(origin *types.FieldType, to *types.FieldType) (needOreg bool, reasonMsg string) {
func needReorgToChange(origin *types.FieldType, to *types.FieldType) (needReorg bool, reasonMsg string) {
toFlen := to.Flen
originFlen := origin.Flen
if mysql.IsIntegerType(to.Tp) && mysql.IsIntegerType(origin.Tp) {
Expand All @@ -3547,6 +3547,10 @@ func needReorgToChange(origin *types.FieldType, to *types.FieldType) (needOreg b
toFlen, _ = mysql.GetDefaultFieldLengthAndDecimal(to.Tp)
}

if convertBetweenCharAndVarchar(origin.Tp, to.Tp) {
return true, "conversion between char and varchar string needs reorganization"
}

if toFlen > 0 && toFlen < originFlen {
return true, fmt.Sprintf("length %d is less than origin %d", toFlen, originFlen)
}
Expand Down Expand Up @@ -3621,10 +3625,6 @@ func checkModifyTypes(ctx sessionctx.Context, origin *types.FieldType, to *types
return errUnsupportedModifyColumn.GenWithStackByArgs(msg)
}
}
if types.IsTypeVarchar(origin.Tp) != types.IsTypeVarchar(to.Tp) {
unsupportedMsg := "column type conversion between 'varchar' and 'non-varchar' is currently unsupported yet"
return errUnsupportedModifyColumn.GenWithStackByArgs(unsupportedMsg)
}

err = checkModifyCharsetAndCollation(to.Charset, to.Collate, origin.Charset, origin.Collate, needRewriteCollationData)
// column type change can handle the charset change between these two types in the process of the reorg.
Expand Down
40 changes: 26 additions & 14 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -56,7 +57,7 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"

"github.com/pingcap/tidb/util/topsql"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -213,6 +214,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
ctx = a.setPlanLabelForTopSQL(ctx)
startTs := uint64(math.MaxUint64)
err := a.Ctx.InitTxnWithStartTS(startTs)
if err != nil {
Expand Down Expand Up @@ -288,6 +290,15 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
return a.InfoSchema.SchemaMetaVersion(), nil
}

func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context {
if a.Plan == nil || !variable.TopSQLEnabled() {
return ctx
}
normalizedSQL, sqlDigest := a.Ctx.GetSessionVars().StmtCtx.SQLDigest()
normalizedPlan, planDigest := getPlanDigest(a.Ctx, a.Plan)
return topsql.AttachSQLInfo(ctx, normalizedSQL, sqlDigest, normalizedPlan, planDigest)
}

// Exec builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method.
Expand Down Expand Up @@ -357,8 +368,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
if err != nil {
return nil, err
}

getPlanDigest(a.Ctx, a.Plan)
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.setPlanLabelForTopSQL(ctx)

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand Down Expand Up @@ -951,7 +962,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
diskMax := sessVars.StmtCtx.DiskTracker.MaxConsumed()
planDigest := getPlanDigest(a.Ctx, a.Plan)
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
slowItems := &variable.SlowQueryLogItems{
TxnTS: txnTS,
SQL: sql.String(),
Expand All @@ -969,7 +980,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
DiskMax: diskMax,
Succ: succ,
Plan: getPlanTree(a.Ctx, a.Plan),
PlanDigest: planDigest,
PlanDigest: planDigest.String(),
Prepared: a.isPreparedStmt,
HasMoreResults: hasMoreResults,
PlanFromCache: sessVars.FoundInPlanCache,
Expand Down Expand Up @@ -1043,15 +1054,15 @@ func getPlanTree(sctx sessionctx.Context, p plannercore.Plan) string {
}

// getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement.
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) string {
func getPlanDigest(sctx sessionctx.Context, p plannercore.Plan) (string, *parser.Digest) {
sc := sctx.GetSessionVars().StmtCtx
_, planDigest := sc.GetPlanDigest()
if planDigest != nil {
return planDigest.String()
normalized, planDigest := sc.GetPlanDigest()
if len(normalized) > 0 && planDigest != nil {
return normalized, planDigest
}
normalized, planDigest := plannercore.NormalizePlan(p)
normalized, planDigest = plannercore.NormalizePlan(p)
sc.SetPlanDigest(normalized, planDigest)
return planDigest.String()
return normalized, planDigest
}

// getEncodedPlan gets the encoded plan, and generates the hint string if indicated.
Expand Down Expand Up @@ -1125,11 +1136,12 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
var planDigestGen func() string
if a.Plan.TP() == plancodec.TypePointGet {
planDigestGen = func() string {
planDigest := getPlanDigest(a.Ctx, a.Plan)
return planDigest
_, planDigest := getPlanDigest(a.Ctx, a.Plan)
return planDigest.String()
}
} else {
planDigest = getPlanDigest(a.Ctx, a.Plan)
_, tmp := getPlanDigest(a.Ctx, a.Plan)
planDigest = tmp.String()
}

execDetail := stmtCtx.GetExecDetails()
Expand Down
22 changes: 16 additions & 6 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -77,6 +78,9 @@ type CTEExec struct {
curIter int
hCtx *hashContext
sel []int

memTracker *memory.Tracker
diskTracker *disk.Tracker
}

// Open implements the Executor interface.
Expand All @@ -93,6 +97,11 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}

e.memTracker = memory.NewTracker(e.id, -1)
e.diskTracker = disk.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)

if e.recursiveExec != nil {
if err = e.recursiveExec.Open(ctx); err != nil {
return err
Expand All @@ -103,7 +112,7 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}

setupCTEStorageTracker(e.iterOutTbl, e.ctx)
setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
}

if e.isDistinct {
Expand All @@ -126,8 +135,8 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.resTbl.Lock()
if !e.resTbl.Done() {
defer e.resTbl.Unlock()
resAction := setupCTEStorageTracker(e.resTbl, e.ctx)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx)
resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)

failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage {
Expand Down Expand Up @@ -323,14 +332,15 @@ func (e *CTEExec) reopenTbls() (err error) {
return e.iterInTbl.Reopen()
}

func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context) (actionSpill *chunk.SpillDiskAction) {
func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker,
parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) {
memTracker := tbl.GetMemTracker()
memTracker.SetLabel(memory.LabelForCTEStorage)
memTracker.AttachTo(ctx.GetSessionVars().StmtCtx.MemTracker)
memTracker.AttachTo(parentMemTracker)

diskTracker := tbl.GetDiskTracker()
diskTracker.SetLabel(memory.LabelForCTEStorage)
diskTracker.AttachTo(ctx.GetSessionVars().StmtCtx.DiskTracker)
diskTracker.AttachTo(parentDiskTracker)

if config.GetGlobalConfig().OOMUseTmpStorage {
actionSpill = tbl.ActionSpill()
Expand Down
6 changes: 3 additions & 3 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,12 +546,12 @@ func (s *testSuite6) TestAlterTableModifyColumn(c *C) {
_, err = tk.Exec("alter table mc modify column c2 varchar(8)")
c.Assert(err, NotNil)
tk.MustExec("alter table mc modify column c2 varchar(11)")
tk.MustGetErrCode("alter table mc modify column c2 text(13)", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table mc modify column c2 text", errno.ErrUnsupportedDDLOperation)
tk.MustExec("alter table mc modify column c2 text(13)")
tk.MustExec("alter table mc modify column c2 text")
tk.MustExec("alter table mc modify column c3 bit")
result := tk.MustQuery("show create table mc")
createSQL := result.Rows()[0][1]
expected := "CREATE TABLE `mc` (\n `c1` bigint(20) DEFAULT NULL,\n `c2` varchar(11) DEFAULT NULL,\n `c3` bit(1) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
expected := "CREATE TABLE `mc` (\n `c1` bigint(20) DEFAULT NULL,\n `c2` text DEFAULT NULL,\n `c3` bit(1) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"
c.Assert(createSQL, Equals, expected)
tk.MustExec("create or replace view alter_view as select c1,c2 from mc")
_, err = tk.Exec("alter table alter_view modify column c2 text")
Expand Down
17 changes: 15 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"math"
"runtime"
"runtime/pprof"
"runtime/trace"
"strconv"
"strings"
Expand Down Expand Up @@ -66,6 +67,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1653,9 +1655,20 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.MemTracker.SetActionOnExceed(action)
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
s, err = planner.GetPreparedStmt(execStmt, vars)
prepareStmt, err := planner.GetPreparedStmt(execStmt, vars)
if err != nil {
return
return err
}
s = prepareStmt.PreparedAst.Stmt
sc.InitSQLDigest(prepareStmt.NormalizedSQL, prepareStmt.SQLDigest)
// For `execute stmt` SQL, should reset the SQL digest with the prepare SQL digest.
goCtx := context.Background()
if variable.EnablePProfSQLCPU.Load() && len(prepareStmt.NormalizedSQL) > 0 {
goCtx = pprof.WithLabels(goCtx, pprof.Labels("sql", util.QueryStrForLog(prepareStmt.NormalizedSQL)))
pprof.SetGoroutineLabels(goCtx)
}
if variable.TopSQLEnabled() && prepareStmt.SQLDigest != nil {
goCtx = topsql.AttachSQLInfo(goCtx, prepareStmt.NormalizedSQL, prepareStmt.SQLDigest, "", nil)
}
}
// execute missed stmtID uses empty sql
Expand Down
25 changes: 25 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) {
s.checkExecutionInfo(c, tk, "explain analyze select * from t")
s.checkExecutionInfo(c, tk, "explain analyze select k from t use index(k)")
s.checkExecutionInfo(c, tk, "explain analyze select * from t use index(k)")
s.checkExecutionInfo(c, tk, "explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;")

tk.MustExec("CREATE TABLE IF NOT EXISTS nation ( N_NATIONKEY BIGINT NOT NULL,N_NAME CHAR(25) NOT NULL,N_REGIONKEY BIGINT NOT NULL,N_COMMENT VARCHAR(152),PRIMARY KEY (N_NATIONKEY));")
tk.MustExec("CREATE TABLE IF NOT EXISTS part ( P_PARTKEY BIGINT NOT NULL,P_NAME VARCHAR(55) NOT NULL,P_MFGR CHAR(25) NOT NULL,P_BRAND CHAR(10) NOT NULL,P_TYPE VARCHAR(25) NOT NULL,P_SIZE BIGINT NOT NULL,P_CONTAINER CHAR(10) NOT NULL,P_RETAILPRICE DECIMAL(15,2) NOT NULL,P_COMMENT VARCHAR(23) NOT NULL,PRIMARY KEY (P_PARTKEY));")
Expand Down Expand Up @@ -320,9 +321,33 @@ func (s *testSuite1) TestCheckActRowsWithUnistore(c *C) {
sql: "select count(*) from t_unistore_act_rows group by b",
expected: []string{"2", "2", "2", "4"},
},
{
sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;",
expected: []string{"4", "4", "4", "4", "4"},
},
}

for _, test := range tests {
checkActRows(c, tk, test.sql, test.expected)
}
}

func (s *testSuite2) TestExplainAnalyzeCTEMemoryAndDiskInfo(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk.MustExec("insert into t with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;")

rows := tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" +
" select * from cte, t;").Rows()

c.Assert(rows[4][7].(string), Not(Equals), "N/A")
c.Assert(rows[4][8].(string), Equals, "0 Bytes")

tk.MustExec("set @@tidb_mem_quota_query=10240;")
rows = tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" +
" select * from cte, t;").Rows()

c.Assert(rows[4][7].(string), Not(Equals), "N/A")
c.Assert(rows[4][8].(string), Not(Equals), "N/A")
}
Loading

0 comments on commit 1cf6b51

Please sign in to comment.