From 3939e67315b17b18cc2be3255c826b0b1e715934 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 29 Jun 2021 19:59:25 +0800 Subject: [PATCH] *: Merge global temporary table into release-5.1 (#25724) --- bindinfo/bind_test.go | 16 +++ ddl/ddl_api.go | 46 ++++---- ddl/serial_test.go | 113 +++++++++++++++++++- errno/errcode.go | 1 + errno/errname.go | 1 + errors.toml | 15 +++ executor/adapter.go | 2 + executor/builder.go | 68 +++++++++++- executor/executor_test.go | 144 ++++++++++++++++++++++++- executor/point_get.go | 5 + executor/set_test.go | 4 +- executor/stale_txn_test.go | 140 ++++++++++++------------ kv/option.go | 2 + planner/core/errors.go | 13 ++- planner/core/preprocess.go | 153 +++++++++++++++++++++------ planner/core/preprocess_test.go | 26 +++++ session/session.go | 48 +++------ session/session_test.go | 31 ++++++ sessionctx/variable/noop.go | 1 - sessionctx/variable/session.go | 7 ++ sessionctx/variable/sysvar.go | 10 +- sessionctx/variable/tidb_vars.go | 1 + sessionctx/variable/varsutil_test.go | 1 + store/driver/txn/txn_driver.go | 2 + table/table.go | 2 + table/tables/tables.go | 50 ++++++++- util/tableutil/tableutil.go | 3 + 27 files changed, 734 insertions(+), 171 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index 4175ddce77eb9..13ea3e8f9d367 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/metrics" @@ -2098,3 +2099,18 @@ func (s *testSuite) TestBindingWithoutCharset(c *C) { c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` = ?") c.Assert(rows[0][1], Equals, "SELECT * FROM `test`.`t` WHERE `a` = 'aa'") } + +func (s *testSuite) TestTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + s.cleanBindingEnv(tk) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("set tidb_enable_global_temporary_table = true") + tk.MustExec("create global temporary table t(a int, b int, key(a), key(b)) on commit delete rows") + tk.MustExec("create table t2(a int, b int, key(a), key(b))") + tk.MustGetErrCode("create session binding for select * from t where b = 123 using select * from t ignore index(b) where b = 123;", errno.ErrOptOnTemporaryTable) + tk.MustGetErrCode("create binding for insert into t select * from t2 where t2.b = 1 and t2.c > 1 using insert into t select /*+ use_index(t2,c) */ * from t2 where t2.b = 1 and t2.c > 1", errno.ErrOptOnTemporaryTable) + tk.MustGetErrCode("create binding for replace into t select * from t2 where t2.b = 1 and t2.c > 1 using replace into t select /*+ use_index(t2,c) */ * from t2 where t2.b = 1 and t2.c > 1", errno.ErrOptOnTemporaryTable) + tk.MustGetErrCode("create binding for update t set a = 1 where b = 1 and c > 1 using update /*+ use_index(t, c) */ t set a = 1 where b = 1 and c > 1", errno.ErrOptOnTemporaryTable) + tk.MustGetErrCode("create binding for delete from t where b = 1 and c > 1 using delete /*+ use_index(t, c) */ from t where b = 1 and c > 1", errno.ErrOptOnTemporaryTable) +} diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 69eefdd9358c6..a1f8ffa807dec 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1642,12 +1642,15 @@ func checkTableInfoValid(tblInfo *model.TableInfo) error { return checkInvisibleIndexOnPK(tblInfo) } -func buildTableInfoWithLike(ident ast.Ident, referTblInfo *model.TableInfo, s *ast.CreateTableStmt) (*model.TableInfo, error) { +func buildTableInfoWithLike(ctx sessionctx.Context, ident ast.Ident, referTblInfo *model.TableInfo, s *ast.CreateTableStmt) (*model.TableInfo, error) { // Check the referred table is a real table object. if referTblInfo.IsSequence() || referTblInfo.IsView() { return nil, ErrWrongObject.GenWithStackByArgs(ident.Schema, referTblInfo.Name, "BASE TABLE") } tblInfo := *referTblInfo + if err := setTemporaryType(ctx, &tblInfo, s); err != nil { + return nil, errors.Trace(err) + } // Check non-public column and adjust column offset. newColumns := referTblInfo.Cols() newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices)) @@ -1735,22 +1738,8 @@ func buildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh if err != nil { return nil, errors.Trace(err) } - switch s.TemporaryKeyword { - case ast.TemporaryGlobal: - tbInfo.TempTableType = model.TempTableGlobal - if !ctx.GetSessionVars().EnableGlobalTemporaryTable { - return nil, errors.New("global temporary table is experimental and it is switched off by tidb_enable_global_temporary_table") - } - // "create global temporary table ... on commit preserve rows" - if !s.OnCommitDelete { - return nil, errors.Trace(errUnsupportedOnCommitPreserve) - } - case ast.TemporaryLocal: - // TODO: set "tbInfo.TempTableType = model.TempTableLocal" after local temporary table is supported. - tbInfo.TempTableType = model.TempTableNone - ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("local TEMPORARY TABLE is not supported yet, TEMPORARY will be parsed but ignored")) - case ast.TemporaryNone: - tbInfo.TempTableType = model.TempTableNone + if err = setTemporaryType(ctx, tbInfo, s); err != nil { + return nil, errors.Trace(err) } if err = setTableAutoRandomBits(ctx, tbInfo, colDefs); err != nil { @@ -1812,7 +1801,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e // build tableInfo var tbInfo *model.TableInfo if s.ReferTable != nil { - tbInfo, err = buildTableInfoWithLike(ident, referTbl.Meta(), s) + tbInfo, err = buildTableInfoWithLike(ctx, ident, referTbl.Meta(), s) } else { tbInfo, err = buildTableInfoWithStmt(ctx, s, schema.Charset, schema.Collate) } @@ -1832,6 +1821,27 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, onExist, false /*tryRetainID*/) } +func setTemporaryType(ctx sessionctx.Context, tbInfo *model.TableInfo, s *ast.CreateTableStmt) error { + switch s.TemporaryKeyword { + case ast.TemporaryGlobal: + tbInfo.TempTableType = model.TempTableGlobal + if !ctx.GetSessionVars().EnableGlobalTemporaryTable { + return errors.New("global temporary table is experimental and it is switched off by tidb_enable_global_temporary_table") + } + // "create global temporary table ... on commit preserve rows" + if !s.OnCommitDelete { + return errors.Trace(errUnsupportedOnCommitPreserve) + } + case ast.TemporaryLocal: + // TODO: set "tbInfo.TempTableType = model.TempTableLocal" after local temporary table is supported. + tbInfo.TempTableType = model.TempTableNone + ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("local TEMPORARY TABLE is not supported yet, TEMPORARY will be parsed but ignored")) + default: + tbInfo.TempTableType = model.TempTableNone + } + return nil +} + func (d *ddl) CreateTableWithInfo( ctx sessionctx.Context, dbName model.CIStr, diff --git a/ddl/serial_test.go b/ddl/serial_test.go index e6e3164381566..915e9d66f8cef 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -525,6 +525,10 @@ func (s *testSerialSuite) TestCreateTableWithLike(c *C) { tk.MustExec("drop database ctwl_db") tk.MustExec("drop database ctwl_db1") +} + +func (s *testSerialSuite) TestCreateTableWithLikeAtTemporaryMode(c *C) { + tk := testkit.NewTestKit(c, s.store) // Test create table like at temporary mode. tk.MustExec("set tidb_enable_global_temporary_table=true") @@ -532,9 +536,116 @@ func (s *testSerialSuite) TestCreateTableWithLike(c *C) { tk.MustExec("drop table if exists temporary_table;") tk.MustExec("create global temporary table temporary_table (a int, b int,index(a)) on commit delete rows") tk.MustExec("drop table if exists temporary_table_t1;") - _, err = tk.Exec("create table temporary_table_t1 like temporary_table") + _, err := tk.Exec("create table temporary_table_t1 like temporary_table") c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error()) tk.MustExec("drop table if exists temporary_table;") + + // Test create temporary table like. + // Test auto_random. + tk.MustExec("drop table if exists auto_random_table") + _, err = tk.Exec("create table auto_random_table (a bigint primary key auto_random(3), b varchar(255));") + defer tk.MustExec("drop table if exists auto_random_table") + tk.MustExec("drop table if exists auto_random_temporary_global") + _, err = tk.Exec("create global temporary table auto_random_temporary_global like auto_random_table on commit delete rows;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("auto_random").Error()) + + // Test pre split regions. + tk.MustExec("drop table if exists table_pre_split") + _, err = tk.Exec("create table table_pre_split(id int) shard_row_id_bits = 2 pre_split_regions=2;") + defer tk.MustExec("drop table if exists table_pre_split") + tk.MustExec("drop table if exists temporary_table_pre_split") + _, err = tk.Exec("create global temporary table temporary_table_pre_split like table_pre_split ON COMMIT DELETE ROWS;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("pre split regions").Error()) + + // Test shard_row_id_bits. + tk.MustExec("drop table if exists shard_row_id_table, shard_row_id_temporary_table, shard_row_id_table_plus, shard_row_id_temporary_table_plus") + _, err = tk.Exec("create table shard_row_id_table (a int) shard_row_id_bits = 5;") + _, err = tk.Exec("create global temporary table shard_row_id_temporary_table like shard_row_id_table on commit delete rows;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits").Error()) + tk.MustExec("create table shard_row_id_table_plus (a int);") + tk.MustExec("create global temporary table shard_row_id_temporary_table_plus (a int) on commit delete rows;") + defer tk.MustExec("drop table if exists shard_row_id_table, shard_row_id_temporary_table, shard_row_id_table_plus, shard_row_id_temporary_table_plus") + _, err = tk.Exec("alter table shard_row_id_temporary_table_plus shard_row_id_bits = 4;") + c.Assert(err.Error(), Equals, ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits").Error()) + + // Test partition. + tk.MustExec("drop table if exists global_partition_table;") + tk.MustExec("create table global_partition_table (a int, b int) partition by hash(a) partitions 3;") + defer tk.MustExec("drop table if exists global_partition_table;") + tk.MustGetErrCode("create global temporary table global_partition_temp_table like global_partition_table ON COMMIT DELETE ROWS;", + errno.ErrPartitionNoTemporary) + // Test virtual columns. + tk.MustExec("drop table if exists test_gv_ddl, test_gv_ddl_temp") + tk.MustExec(`create table test_gv_ddl(a int, b int as (a+8) virtual, c int as (b + 2) stored)`) + tk.MustExec(`create global temporary table test_gv_ddl_temp like test_gv_ddl on commit delete rows;`) + defer tk.MustExec("drop table if exists test_gv_ddl_temp, test_gv_ddl") + is := tk.Se.(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema) + table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_gv_ddl")) + c.Assert(err, IsNil) + testCases := []struct { + generatedExprString string + generatedStored bool + }{ + {"", false}, + {"`a` + 8", false}, + {"`b` + 2", true}, + } + for i, column := range table.Meta().Columns { + c.Assert(column.GeneratedExprString, Equals, testCases[i].generatedExprString) + c.Assert(column.GeneratedStored, Equals, testCases[i].generatedStored) + } + result := tk.MustQuery(`DESC test_gv_ddl_temp`) + result.Check(testkit.Rows(`a int(11) YES `, `b int(11) YES VIRTUAL GENERATED`, `c int(11) YES STORED GENERATED`)) + tk.MustExec("begin;") + tk.MustExec("insert into test_gv_ddl_temp values (1, default, default)") + tk.MustQuery("select * from test_gv_ddl_temp").Check(testkit.Rows("1 9 11")) + _, err = tk.Exec("commit") + c.Assert(err, IsNil) + + // Test foreign key. + tk.MustExec("drop table if exists test_foreign_key, t1") + tk.MustExec("create table t1 (a int, b int);") + tk.MustExec("create table test_foreign_key (c int,d int,foreign key (d) references t1 (b));") + defer tk.MustExec("drop table if exists test_foreign_key, t1;") + tk.MustExec("create global temporary table test_foreign_key_temp like test_foreign_key on commit delete rows;") + is = tk.Se.(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema) + table, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_foreign_key_temp")) + c.Assert(err, IsNil) + tableInfo := table.Meta() + c.Assert(len(tableInfo.ForeignKeys), Equals, 0) + + // Issue 25613. + // Test from->normal, to->normal. + tk.MustExec("drop table if exists tb1, tb2") + tk.MustExec("create table tb1(id int);") + tk.MustExec("create table tb2 like tb1") + defer tk.MustExec("drop table if exists tb1, tb2") + tk.MustQuery("show create table tb2;").Check(testkit.Rows("tb2 CREATE TABLE `tb2` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + // Test from->normal, to->global temporary. + tk.MustExec("drop table if exists tb3, tb4") + tk.MustExec("create table tb3(id int);") + tk.MustExec("create global temporary table tb4 like tb3 on commit delete rows;") + defer tk.MustExec("drop table if exists tb3, tb4") + tk.MustQuery("show create table tb4;").Check(testkit.Rows("tb4 CREATE GLOBAL TEMPORARY TABLE `tb4` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=memory DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ON COMMIT DELETE ROWS")) + + // Test from->global temporary, to->normal. + tk.MustExec("drop table if exists tb5, tb6") + tk.MustExec("create global temporary table tb5(id int) on commit delete rows;") + _, err = tk.Exec("create table tb6 like tb5;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error()) + defer tk.MustExec("drop table if exists tb5, tb6") + + // Test from->global temporary, to->global temporary. + tk.MustExec("drop table if exists tb7, tb8") + tk.MustExec("create global temporary table tb7(id int) on commit delete rows;") + _, err = tk.Exec("create global temporary table tb8 like tb7 on commit delete rows;") + c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("create table like").Error()) + defer tk.MustExec("drop table if exists tb7, tb8") } // TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started. diff --git a/errno/errcode.go b/errno/errcode.go index d6b7c912a15d9..319ddd092bd82 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -923,6 +923,7 @@ const ( ErrTxnTooLarge = 8004 ErrWriteConflictInTiDB = 8005 ErrOptOnTemporaryTable = 8006 + ErrDropTableOnTemporaryTable = 8007 ErrUnsupportedReloadPlugin = 8018 ErrUnsupportedReloadPluginVar = 8019 ErrTableLocked = 8020 diff --git a/errno/errname.go b/errno/errname.go index 6ab983acc3f29..9b40c3c685aaa 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -925,6 +925,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrForUpdateCantRetry: mysql.Message("[%d] can not retry select for update statement", nil), ErrAdminCheckTable: mysql.Message("TiDB admin check table failed.", nil), ErrOptOnTemporaryTable: mysql.Message("`%s` is unsupported on temporary tables.", nil), + ErrDropTableOnTemporaryTable: mysql.Message("`drop global temporary table` applies on global temporary table only.", nil), ErrTxnTooLarge: mysql.Message("Transaction is too large, size: %d", nil), ErrWriteConflictInTiDB: mysql.Message("Write conflict, txnStartTS %d is stale", nil), ErrInvalidPluginID: mysql.Message("Wrong plugin id: %s, valid plugin id is [name]-[version], both name and version should not contain '-'", nil), diff --git a/errors.toml b/errors.toml index 3ef2ba977a86e..0fb9c2977f386 100644 --- a/errors.toml +++ b/errors.toml @@ -981,6 +981,11 @@ error = ''' `%-.192s`.`%-.192s` contains view recursion ''' +["planner:1562"] +error = ''' +Cannot create temporary table with partitions +''' + ["planner:1706"] error = ''' Primary key/partition key update is not allowed since the table is updated both as '%-.192s' and '%-.192s'. @@ -1156,6 +1161,11 @@ error = ''' `%s` is unsupported on temporary tables. ''' +["planner:8007"] +error = ''' +`drop global temporary table` applies on global temporary table only. +''' + ["planner:8108"] error = ''' Unsupported type %T @@ -1386,6 +1396,11 @@ error = ''' Unknown column '%-.192s' in '%-.192s' ''' +["table:1114"] +error = ''' +The table '%-.192s' is full +''' + ["table:1192"] error = ''' Can't execute the given command because you have active locked tables or an active transaction diff --git a/executor/adapter.go b/executor/adapter.go index faadfe267bfdf..77498e35c88dc 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -796,6 +796,8 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { } b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness, a.TxnScope) + b.snapshotTS = a.SnapshotTS + b.explicitStaleness = a.ExplicitStaleness e := b.build(a.Plan) if b.err != nil { return nil, errors.Trace(b.err) diff --git a/executor/builder.go b/executor/builder.go index fb94a1d8d55b2..1af2b34aebc59 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2662,6 +2662,10 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } ts := v.GetTableScan() + if err = b.validCanReadTemporaryTable(ts.Table); err != nil { + return nil, err + } + tbl, _ := b.is.TableByID(ts.Table.ID) isPartition, physicalTableID := ts.IsPartition() if isPartition { @@ -2774,6 +2778,11 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } ts := v.GetTableScan() + if err = b.validCanReadTemporaryTable(ts.Table); err != nil { + b.err = err + return nil + } + ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -2991,13 +3000,18 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea } func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if err := b.validCanReadTemporaryTable(is.Table); err != nil { + b.err = err + return nil + } + ret, err := buildNoRangeIndexReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ret.ranges = is.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O) @@ -3145,13 +3159,18 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn } func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { + is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) + if err := b.validCanReadTemporaryTable(is.Table); err != nil { + b.err = err + return nil + } + ret, err := buildNoRangeIndexLookUpReader(b, v) if err != nil { b.err = err return nil } - is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) ret.ranges = is.Ranges @@ -3255,6 +3274,12 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd } func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { + ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + if err := b.validCanReadTemporaryTable(ts.Table); err != nil { + b.err = err + return nil + } + ret, err := buildNoRangeIndexMergeReader(b, v) if err != nil { b.err = err @@ -3274,7 +3299,6 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg } } } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) executorCounterIndexMergeReaderExecutor.Inc() @@ -4048,6 +4072,11 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model } func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor { + if err := b.validCanReadTemporaryTable(plan.TblInfo); err != nil { + b.err = err + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err @@ -4181,6 +4210,16 @@ func fullRangePartition(idxArr []int) bool { return len(idxArr) == 1 && idxArr[0] == plannercore.FullRange } +type emptySampler struct{} + +func (s *emptySampler) writeChunk(_ *chunk.Chunk) error { + return nil +} + +func (s *emptySampler) finished() bool { + return true +} + func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor { startTS, err := b.getSnapshotTS() if err != nil { @@ -4192,11 +4231,15 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) * table: v.TableInfo, startTS: startTS, } - if v.TableSampleInfo.AstNode.SampleMethod == ast.SampleMethodTypeTiDBRegion { + + if v.TableInfo.Meta().TempTableType != model.TempTableNone { + e.sampler = &emptySampler{} + } else if v.TableSampleInfo.AstNode.SampleMethod == ast.SampleMethodTypeTiDBRegion { e.sampler = newTableRegionSampler( b.ctx, v.TableInfo, startTS, v.TableSampleInfo.Partitions, v.Schema(), v.TableSampleInfo.FullSchema, e.retFieldTypes, v.Desc) } + return e } @@ -4295,3 +4338,20 @@ func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) E chkIdx: 0, } } + +func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error { + if tbl.TempTableType == model.TempTableNone { + return nil + } + + // Some tools like dumpling use history read to dump all table's records and will be fail if we return an error. + // So we do not check SnapshotTS here + + sessionVars := b.ctx.GetSessionVars() + + if sessionVars.TxnCtx.IsStaleness || b.explicitStaleness { + return errors.New("can not stale read temporary table") + } + + return nil +} diff --git a/executor/executor_test.go b/executor/executor_test.go index 0ca710c4a8456..f280f19c9cfbd 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8404,15 +8404,15 @@ func (s testSerialSuite) TestTemporaryTableNoNetwork(c *C) { tk.MustExec("set tidb_enable_global_temporary_table=true") tk.MustExec("create global temporary table tmp_t (id int, a int, index(a)) on commit delete rows") - tk.MustExec("begin") - tk.MustExec("insert into tmp_t values (1, 1)") - tk.MustExec("insert into tmp_t values (2, 2)") - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy", "return(true)"), IsNil) defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcServerBusy"), IsNil) }() + tk.MustExec("begin") + tk.MustExec("insert into tmp_t values (1, 1)") + tk.MustExec("insert into tmp_t values (2, 2)") + // Make sure the fail point works. // With that failpoint, all requests to the TiKV is discard. rs, err := tk1.Exec("select * from normal") @@ -8559,3 +8559,139 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) { c.Assert(checkCnt > 0, IsTrue, commentf) } } + +func (s *testStaleTxnSuite) TestInvalidReadTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + + tk.MustExec("set @@tidb_enable_global_temporary_table=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists tmp1") + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + + // sleep 1us to make test stale + time.Sleep(time.Microsecond) + + queries := []struct { + sql string + }{ + { + sql: "select * from tmp1 where id=1", + }, + { + sql: "select * from tmp1 where code=1", + }, + { + sql: "select * from tmp1 where id in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where code in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where id > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1", + }, + { + sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2", + }, + } + + addStaleReadToSQL := func(sql string) string { + idx := strings.Index(sql, " where ") + if idx < 0 { + return "" + } + return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:] + } + + for _, query := range queries { + sql := addStaleReadToSQL(query.sql) + if sql != "" { + tk.MustGetErrMsg(sql, "can not stale read temporary table") + } + } + + tk.MustExec("start transaction read only as of timestamp NOW(6)") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + tk.MustExec("set transaction read only as of timestamp NOW(6)") + tk.MustExec("start transaction") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + tk.MustExec("set @@tidb_snapshot=NOW(6)") + for _, query := range queries { + // Will success here for compatibility with some tools like dumping + rs := tk.MustQuery(query.sql) + rs.Check(testkit.Rows()) + } +} + +func (s *testSuite) TestEmptyTableSampleTemporaryTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + + tk.MustExec("set @@tidb_enable_global_temporary_table=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists tmp1") + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + + // sleep 1us to make test stale + time.Sleep(time.Microsecond) + + // test tablesample return empty + rs := tk.MustQuery("select * from tmp1 tablesample regions()") + rs.Check(testkit.Rows()) + + tk.MustExec("begin") + tk.MustExec("insert into tmp1 values (1, 1, 1)") + rs = tk.MustQuery("select * from tmp1 tablesample regions()") + rs.Check(testkit.Rows()) + tk.MustExec("commit") + + // tablesample should not return error for compatibility of tools like dumpling + tk.MustExec("set @@tidb_snapshot=NOW(6)") + rs = tk.MustQuery("select * from tmp1 tablesample regions()") + rs.Check(testkit.Rows()) + + tk.MustExec("begin") + rs = tk.MustQuery("select * from tmp1 tablesample regions()") + rs.Check(testkit.Rows()) + tk.MustExec("commit") +} diff --git a/executor/point_get.go b/executor/point_get.go index 65200df9e965a..384a8b67722bc 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -42,6 +42,11 @@ import ( ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { + if err := b.validCanReadTemporaryTable(p.TblInfo); err != nil { + b.err = err + return nil + } + startTS, err := b.getSnapshotTS() if err != nil { b.err = err diff --git a/executor/set_test.go b/executor/set_test.go index d0a7967cf1c3b..e0dbb14935521 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -884,9 +884,9 @@ func (s *testSuite5) TestValidateSetVar(c *C) { result = tk.MustQuery("select @@tmp_table_size;") result.Check(testkit.Rows("167772161")) - tk.MustExec("set @@tmp_table_size=18446744073709551615") + tk.MustExec("set @@tmp_table_size=9223372036854775807") result = tk.MustQuery("select @@tmp_table_size;") - result.Check(testkit.Rows("18446744073709551615")) + result.Check(testkit.Rows("9223372036854775807")) _, err = tk.Exec("set @@tmp_table_size=18446744073709551616") c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 6ca8aac96f0f5..1f313edade59d 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -15,6 +15,7 @@ package executor_test import ( "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -861,79 +862,88 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) { c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3) } -func (s *testStaleTxnSuite) TestStaleSelect(c *C) { +func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) { tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - defer tk.MustExec("drop table if exists t") - tk.MustExec("create table t (id int)") - - tolerance := 50 * time.Millisecond - - tk.MustExec("insert into t values (1)") - time.Sleep(tolerance) - time1 := time.Now() - - tk.MustExec("insert into t values (2)") - time.Sleep(tolerance) - time2 := time.Now() - - tk.MustExec("insert into t values (3)") - time.Sleep(tolerance) - - staleRows := testkit.Rows("1") - staleSQL := fmt.Sprintf(`select * from t as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000")) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) - // test normal stale select - tk.MustQuery(staleSQL).Check(staleRows) + tk.MustExec("set @@tidb_enable_global_temporary_table=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists tmp1") + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + time.Sleep(time.Second) + tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table") + + queries := []struct { + sql string + }{ + { + sql: "select * from tmp1 where id=1", + }, + { + sql: "select * from tmp1 where code=1", + }, + { + sql: "select * from tmp1 where id in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where code in (1, 2, 3)", + }, + { + sql: "select * from tmp1 where id > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1", + }, + { + sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1", + }, + { + sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2", + }, + } - // test stale select in txn - tk.MustExec("begin") - c.Assert(tk.ExecToErr(staleSQL), NotNil) - tk.MustExec("commit") + addStaleReadToSQL := func(sql string) string { + idx := strings.Index(sql, " where ") + if idx < 0 { + return "" + } + return sql[0:idx] + " as of timestamp NOW()" + sql[idx:] + } - // test prepared stale select - tk.MustExec(fmt.Sprintf(`prepare s from "%s"`, staleSQL)) - tk.MustQuery("execute s") + for _, query := range queries { + sql := addStaleReadToSQL(query.sql) + if sql != "" { + tk.MustGetErrMsg(sql, "can not stale read temporary table") + } + } - // test prepared stale select in txn - tk.MustExec("begin") - c.Assert(tk.ExecToErr(staleSQL), NotNil) + tk.MustExec("start transaction read only as of timestamp NOW()") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } tk.MustExec("commit") - // test stale select in stale txn - tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000"))) - c.Assert(tk.ExecToErr(staleSQL), NotNil) - tk.MustExec("commit") + for _, query := range queries { + tk.MustExec(query.sql) + } - // test prepared stale select in stale txn - tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000"))) - c.Assert(tk.ExecToErr(staleSQL), NotNil) + tk.MustExec("set transaction read only as of timestamp NOW()") + tk.MustExec("start transaction") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read temporary table") + } tk.MustExec("commit") - // test prepared stale select with schema change - tk.MustExec("alter table t add column c int") - tk.MustExec("insert into t values (4, 5)") - time.Sleep(10 * time.Millisecond) - tk.MustQuery("execute s").Check(staleRows) - - // test dynamic timestamp stale select - time3 := time.Now() - tk.MustExec("alter table t add column d int") - tk.MustExec("insert into t values (4, 4, 4)") - time.Sleep(tolerance) - time4 := time.Now() - staleRows = testkit.Rows("1 ", "2 ", "3 ", "4 5") - tk.MustQuery(fmt.Sprintf("select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND", time4.Sub(time3).Microseconds())).Check(staleRows) - - // test prepared dynamic timestamp stale select - time5 := time.Now() - tk.MustExec(fmt.Sprintf(`prepare v from "select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND"`, time5.Sub(time3).Microseconds())) - tk.MustQuery("execute v").Check(staleRows) - - // test point get - time6 := time.Now() - tk.MustExec("insert into t values (5, 5, 5)") - time.Sleep(tolerance) - tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 ")) + for _, query := range queries { + tk.MustExec(query.sql) + } } diff --git a/kv/option.go b/kv/option.go index de5a1d8834c40..6b1e830e12984 100644 --- a/kv/option.go +++ b/kv/option.go @@ -61,6 +61,8 @@ const ( MatchStoreLabels // ResourceGroupTag indicates the resource group of the kv request. ResourceGroupTag + // KVFilter indicates the filter to ignore key-values in the transaction's memory buffer. + KVFilter ) // ReplicaReadType is the type of replica to read data from diff --git a/planner/core/errors.go b/planner/core/errors.go index 04fa1797d95bd..e501af7676c5f 100644 --- a/planner/core/errors.go +++ b/planner/core/errors.go @@ -94,9 +94,12 @@ var ( ErrCTERecursiveForbiddenJoinOrder = dbterror.ClassOptimizer.NewStd(mysql.ErrCTERecursiveForbiddenJoinOrder) ErrInvalidRequiresSingleReference = dbterror.ClassOptimizer.NewStd(mysql.ErrInvalidRequiresSingleReference) // Since we cannot know if user logged in with a password, use message of ErrAccessDeniedNoPassword instead - ErrAccessDenied = dbterror.ClassOptimizer.NewStdErr(mysql.ErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDeniedNoPassword]) - ErrBadNull = dbterror.ClassOptimizer.NewStd(mysql.ErrBadNull) - ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem) - ErrAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrAsOf) - ErrOptOnTemporaryTable = dbterror.ClassOptimizer.NewStd(mysql.ErrOptOnTemporaryTable) + ErrAccessDenied = dbterror.ClassOptimizer.NewStdErr(mysql.ErrAccessDenied, mysql.MySQLErrName[mysql.ErrAccessDeniedNoPassword]) + ErrBadNull = dbterror.ClassOptimizer.NewStd(mysql.ErrBadNull) + ErrNotSupportedWithSem = dbterror.ClassOptimizer.NewStd(mysql.ErrNotSupportedWithSem) + ErrAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrAsOf) + ErrOptOnTemporaryTable = dbterror.ClassOptimizer.NewStd(mysql.ErrOptOnTemporaryTable) + ErrDropTableOnTemporaryTable = dbterror.ClassOptimizer.NewStd(mysql.ErrDropTableOnTemporaryTable) + // ErrPartitionNoTemporary returns when partition at temporary mode + ErrPartitionNoTemporary = dbterror.ClassOptimizer.NewStd(mysql.ErrPartitionNoTemporary) ) diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 4dca6ddb50170..5f7190d166d3c 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/parser/format" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" @@ -33,6 +34,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util" @@ -53,7 +55,7 @@ func InTxnRetry(p *preprocessor) { p.flag |= inTxnRetry } -// WithPreprocessorReturn returns a PreprocessOpt to initialize the PreprocesorReturn. +// WithPreprocessorReturn returns a PreprocessOpt to initialize the PreprocessorReturn. func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt { return func(p *preprocessor) { p.PreprocessorReturn = ret @@ -91,7 +93,7 @@ func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode { } // Preprocess resolves table names of the node, and checks some statements validation. -// prepreocssReturn used to extract the infoschema for the tableName and the timestamp from the asof clause. +// preprocessReturn used to extract the infoschema for the tableName and the timestamp from the asof clause. func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...PreprocessOpt) error { v := preprocessor{ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0), withName: make(map[string]interface{})} for _, optFn := range preprocessOpt { @@ -339,6 +341,37 @@ func bindableStmtType(node ast.StmtNode) byte { return TypeInvalid } +func (p *preprocessor) tableByName(tn *ast.TableName) (table.Table, error) { + currentDB := p.ctx.GetSessionVars().CurrentDB + if tn.Schema.String() != "" { + currentDB = tn.Schema.L + } + if currentDB == "" { + return nil, errors.Trace(ErrNoDB) + } + sName := model.NewCIStr(currentDB) + tbl, err := p.ensureInfoSchema().TableByName(sName, tn.Name) + if err != nil { + // We should never leak that the table doesn't exist (i.e. attach ErrTableNotExists) + // unless we know that the user has permissions to it, should it exist. + // By checking here, this makes all SELECT/SHOW/INSERT/UPDATE/DELETE statements safe. + currentUser, activeRoles := p.ctx.GetSessionVars().User, p.ctx.GetSessionVars().ActiveRoles + if pm := privilege.GetPrivilegeManager(p.ctx); pm != nil { + if !pm.RequestVerification(activeRoles, sName.L, tn.Name.O, "", mysql.AllPrivMask) { + u := currentUser.Username + h := currentUser.Hostname + if currentUser.AuthHostname != "" { + u = currentUser.AuthUsername + h = currentUser.AuthHostname + } + return nil, ErrTableaccessDenied.GenWithStackByArgs(p.stmtType(), u, h, tn.Name.O) + } + } + return nil, err + } + return tbl, err +} + func (p *preprocessor) checkBindGrammar(originNode, hintedNode ast.StmtNode, defaultDB string) { origTp := bindableStmtType(originNode) hintedTp := bindableStmtType(hintedNode) @@ -357,6 +390,39 @@ func (p *preprocessor) checkBindGrammar(originNode, hintedNode ast.StmtNode, def return } } + + // Check the bind operation is not on any temporary table. + var resNode ast.ResultSetNode + switch n := originNode.(type) { + case *ast.SelectStmt: + resNode = n.From.TableRefs + case *ast.DeleteStmt: + resNode = n.TableRefs.TableRefs + case *ast.UpdateStmt: + resNode = n.TableRefs.TableRefs + case *ast.InsertStmt: + resNode = n.Table.TableRefs + } + if resNode != nil { + tblNames := extractTableList(resNode, nil, false) + for _, tn := range tblNames { + tbl, err := p.tableByName(tn) + if err != nil { + // If the operation is order is: drop table -> drop binding + // The table doesn't exist, it is not an error. + if terror.ErrorEqual(err, infoschema.ErrTableNotExists) { + continue + } + p.err = err + return + } + if tbl.Meta().TempTableType != model.TempTableNone { + p.err = ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("create binding") + return + } + } + } + originSQL := parser.Normalize(utilparser.RestoreWithDefaultDB(originNode, defaultDB, originNode.Text())) hintedSQL := parser.Normalize(utilparser.RestoreWithDefaultDB(hintedNode, defaultDB, hintedNode.Text())) if originSQL != hintedSQL { @@ -600,17 +666,7 @@ func (p *preprocessor) checkDropDatabaseGrammar(stmt *ast.DropDatabaseStmt) { func (p *preprocessor) checkAdminCheckTableGrammar(stmt *ast.AdminStmt) { for _, table := range stmt.Tables { - currentDB := p.ctx.GetSessionVars().CurrentDB - if table.Schema.String() != "" { - currentDB = table.Schema.L - } - if currentDB == "" { - p.err = errors.Trace(ErrNoDB) - return - } - sName := model.NewCIStr(currentDB) - tName := table.Name - tableInfo, err := p.ensureInfoSchema().TableByName(sName, tName) + tableInfo, err := p.tableByName(table) if err != nil { p.err = err return @@ -639,10 +695,19 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) { p.err = err return } - if tableInfo.Meta().TempTableType != model.TempTableNone { + tableMetaInfo := tableInfo.Meta() + if tableMetaInfo.TempTableType != model.TempTableNone { p.err = ErrOptOnTemporaryTable.GenWithStackByArgs("create table like") return } + if stmt.TemporaryKeyword != ast.TemporaryNone { + err := checkReferInfoForTemporaryTable(tableMetaInfo) + if err != nil { + p.err = err + return + } + + } } if stmt.TemporaryKeyword != ast.TemporaryNone { for _, opt := range stmt.Options { @@ -774,6 +839,29 @@ func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) { p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("DROP TEMPORARY TABLE") return } + if stmt.TemporaryKeyword == ast.TemporaryNone { + return + } + currentDB := model.NewCIStr(p.ctx.GetSessionVars().CurrentDB) + for _, t := range stmt.Tables { + if isIncorrectName(t.Name.String()) { + p.err = ddl.ErrWrongTableName.GenWithStackByArgs(t.Name.String()) + return + } + if t.Schema.String() != "" { + currentDB = t.Schema + } + tableInfo, err := p.ensureInfoSchema().TableByName(currentDB, t.Name) + if err != nil { + p.err = err + return + } + if tableInfo.Meta().TempTableType != model.TempTableGlobal { + p.err = ErrDropTableOnTemporaryTable + return + } + } + } func (p *preprocessor) checkDropTableNames(tables []*ast.TableName) { @@ -1023,6 +1111,23 @@ func checkTableEngine(engineName string) error { return nil } +func checkReferInfoForTemporaryTable(tableMetaInfo *model.TableInfo) error { + if tableMetaInfo.AutoRandomBits != 0 { + return ErrOptOnTemporaryTable.GenWithStackByArgs("auto_random") + } + if tableMetaInfo.PreSplitRegions != 0 { + return ErrOptOnTemporaryTable.GenWithStackByArgs("pre split regions") + } + if tableMetaInfo.Partition != nil { + return ErrPartitionNoTemporary + } + if tableMetaInfo.ShardRowIDBits != 0 { + return ErrOptOnTemporaryTable.GenWithStackByArgs("shard_row_id_bits") + } + + return nil +} + // checkColumn checks if the column definition is valid. // See https://dev.mysql.com/doc/refman/5.7/en/storage-requirements.html func checkColumn(colDef *ast.ColumnDef) error { @@ -1246,27 +1351,12 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) { return } - table, err := p.ensureInfoSchema().TableByName(tn.Schema, tn.Name) + table, err := p.tableByName(tn) if err != nil { - // We should never leak that the table doesn't exist (i.e. attach ErrTableNotExists) - // unless we know that the user has permissions to it, should it exist. - // By checking here, this makes all SELECT/SHOW/INSERT/UPDATE/DELETE statements safe. - currentUser, activeRoles := p.ctx.GetSessionVars().User, p.ctx.GetSessionVars().ActiveRoles - if pm := privilege.GetPrivilegeManager(p.ctx); pm != nil { - if !pm.RequestVerification(activeRoles, tn.Schema.L, tn.Name.O, "", mysql.AllPrivMask) { - u := currentUser.Username - h := currentUser.Hostname - if currentUser.AuthHostname != "" { - u = currentUser.AuthUsername - h = currentUser.AuthHostname - } - p.err = ErrTableaccessDenied.GenWithStackByArgs(p.stmtType(), u, h, tn.Name.O) - return - } - } p.err = err return } + tableInfo := table.Meta() dbInfo, _ := p.ensureInfoSchema().SchemaByName(tn.Schema) // tableName should be checked as sequence object. @@ -1450,6 +1540,7 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { if p.err != nil { return } + p.ExplicitStaleness = true } p.initedLastSnapshotTS = true } diff --git a/planner/core/preprocess_test.go b/planner/core/preprocess_test.go index d9f053f509e92..e32148c572728 100644 --- a/planner/core/preprocess_test.go +++ b/planner/core/preprocess_test.go @@ -337,3 +337,29 @@ func (s *testValidatorSuite) TestForeignKey(c *C) { s.runSQL(c, "ALTER TABLE test.t1 ADD CONSTRAINT fk FOREIGN KEY (c) REFERENCES test2.t (e)", false, nil) } + +func (s *testValidatorSuite) TestDropGlobalTempTable(c *C) { + defer testleak.AfterTest(c)() + defer func() { + s.dom.Close() + s.store.Close() + }() + + ctx := context.Background() + execSQLList := []string{ + "use test", + "set tidb_enable_global_temporary_table=true", + "create table tb(id int);", + "create global temporary table temp(id int) on commit delete rows;", + "create global temporary table temp1(id int) on commit delete rows;", + } + for _, execSQL := range execSQLList { + _, err := s.se.Execute(ctx, execSQL) + c.Assert(err, IsNil) + } + s.is = s.dom.InfoSchema() + s.runSQL(c, "drop global temporary table tb;", false, core.ErrDropTableOnTemporaryTable) + s.runSQL(c, "drop global temporary table temp", false, nil) + s.runSQL(c, "drop global temporary table test.tb;", false, core.ErrDropTableOnTemporaryTable) + s.runSQL(c, "drop global temporary table test.temp1", false, nil) +} diff --git a/session/session.go b/session/session.go index 880fa13a6671d..7ef59197fb3ca 100644 --- a/session/session.go +++ b/session/session.go @@ -70,6 +70,7 @@ import ( "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/tikv" + tikvstore "github.com/pingcap/tidb/store/tikv/kv" tikvutil "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" @@ -83,6 +84,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" ) @@ -474,10 +476,6 @@ func (s *session) doCommit(ctx context.Context) error { if err != nil { return err } - if err = s.removeTempTableFromBuffer(); err != nil { - return err - } - // mockCommitError and mockGetTSErrorInRetry use to test PR #8743. failpoint.Inject("mockCommitError", func(val failpoint.Value) { if val.(bool) && tikv.IsMockCommitErrorEnable() { @@ -535,41 +533,23 @@ func (s *session) doCommit(ctx context.Context) error { s.txn.SetOption(kv.GuaranteeLinearizability, s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability) } + if tables := s.GetSessionVars().TxnCtx.GlobalTemporaryTables; len(tables) > 0 { + s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) + } return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID)) } -// removeTempTableFromBuffer filters out the temporary table key-values. -func (s *session) removeTempTableFromBuffer() error { - tables := s.GetSessionVars().TxnCtx.GlobalTemporaryTables - if len(tables) == 0 { - return nil - } - memBuffer := s.txn.GetMemBuffer() - // Reset and new an empty stage buffer. - defer func() { - s.txn.cleanup() - }() - for tid := range tables { - seekKey := tablecodec.EncodeTablePrefix(tid) - endKey := tablecodec.EncodeTablePrefix(tid + 1) - iter, err := memBuffer.Iter(seekKey, endKey) - if err != nil { - return err - } - for iter.Valid() && iter.Key().HasPrefix(seekKey) { - if err = memBuffer.Delete(iter.Key()); err != nil { - return err - } - s.txn.UpdateEntriesCountAndSize() - if err = iter.Next(); err != nil { - return err - } - } +type temporaryTableKVFilter map[int64]tableutil.TempTable + +func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { + tid := tablecodec.DecodeTableID(key) + if _, ok := m[tid]; ok { + return true } - // Flush to the root membuffer. - s.txn.flushStmtBuf() - return nil + + // This is the default filter for all tables. + return tablecodec.IsUntouchedIndexKValue(key, value) } // errIsNoisy is used to filter DUPLCATE KEY errors. diff --git a/session/session_test.go b/session/session_test.go index 69fc02bcf803a..6cd608f7b5e59 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" @@ -4593,6 +4594,36 @@ func (s *testSessionSuite) TestInTxnPSProtoPointGet(c *C) { tk.MustExec("commit") } +func (s *testSessionSuite) TestTMPTableSize(c *C) { + // Test the @@tmp_table_size system variable. + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_global_temporary_table=on") + tk.MustExec("create global temporary table t (c1 int, c2 varchar(512)) on commit delete rows") + + tk.MustQuery("select @@global.tmp_table_size").Check(testkit.Rows(strconv.Itoa(variable.DefTMPTableSize))) + c.Assert(tk.Se.GetSessionVars().TMPTableSize, Equals, int64(variable.DefTMPTableSize)) + + // Min value 1024, so the result is change to 1024, with a warning. + tk.MustExec("set @@global.tmp_table_size = 123") + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1292 Truncated incorrect tmp_table_size value: '123'")) + + // Change the session scope value. + tk.MustExec("set @@session.tmp_table_size = 2097152") + c.Assert(tk.Se.GetSessionVars().TMPTableSize, Equals, int64(2097152)) + + // Check in another sessin, change session scope value does not affect the global scope. + tk1 := testkit.NewTestKit(c, s.store) + tk1.MustQuery("select @@global.tmp_table_size").Check(testkit.Rows("1024")) + + // The value is now 1024, check the error when table size exceed it. + tk.MustExec("set @@session.tmp_table_size = 1024") + tk.MustExec("begin") + tk.MustExec("insert into t values (1, repeat('x', 512))") + tk.MustExec("insert into t values (1, repeat('x', 512))") + tk.MustGetErrCode("insert into t values (1, repeat('x', 512))", errno.ErrRecordFileFull) +} + func (s *testSessionSuite) TestTiDBEnableGlobalTemporaryTable(c *C) { // Test the @@tidb_enable_global_temporary_table system variable. tk := testkit.NewTestKit(c, s.store) diff --git a/sessionctx/variable/noop.go b/sessionctx/variable/noop.go index 93b3c1c9ce084..fe3af2d33605d 100644 --- a/sessionctx/variable/noop.go +++ b/sessionctx/variable/noop.go @@ -83,7 +83,6 @@ var noopSysVars = []*SysVar{ {Scope: ScopeNone, Name: "performance_schema_max_statement_classes", Value: "168"}, {Scope: ScopeGlobal, Name: "server_id", Value: "0"}, {Scope: ScopeGlobal, Name: "innodb_flushing_avg_loops", Value: "30"}, - {Scope: ScopeGlobal | ScopeSession, Name: TmpTableSize, Value: "16777216", Type: TypeUnsigned, MinValue: 1024, MaxValue: math.MaxUint64, AutoConvertOutOfRange: true, IsHintUpdatable: true}, {Scope: ScopeGlobal, Name: "innodb_max_purge_lag", Value: "0"}, {Scope: ScopeGlobal | ScopeSession, Name: "preload_buffer_size", Value: "32768"}, {Scope: ScopeGlobal, Name: CheckProxyUsers, Value: Off, Type: TypeBool}, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 813841c6bf051..069297eb837fc 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -844,6 +844,12 @@ type SessionVars struct { // see https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_cte_max_recursion_depth CTEMaxRecursionDepth int + // The temporary table size threshold + // In MySQL, when a temporary table exceed this size, it spills to disk. + // In TiDB, as we do not support spill to disk for now, an error is reported. + // See https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_tmp_table_size + TMPTableSize int64 + // EnableGlobalTemporaryTable indicates whether to enable global temporary table EnableGlobalTemporaryTable bool } @@ -1058,6 +1064,7 @@ func NewSessionVars() *SessionVars { EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin, AllowFallbackToTiKV: make(map[kv.StoreType]struct{}), CTEMaxRecursionDepth: DefCTEMaxRecursionDepth, + TMPTableSize: DefTMPTableSize, EnableGlobalTemporaryTable: DefTiDBEnableGlobalTemporaryTable, } vars.KVVars = tikvstore.NewVariables(&vars.Killed) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index e9abf05d1633f..6db3bba0fc146 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1692,8 +1692,14 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal, Name: TiDBGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiDBGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, + {Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "PHYSICAL", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}}, {Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "LEGACY", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}}, + // See https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_tmp_table_size + {Scope: ScopeGlobal | ScopeSession, Name: TMPTableSize, Value: strconv.Itoa(DefTMPTableSize), Type: TypeUnsigned, MinValue: 1024, MaxValue: math.MaxInt64, AutoConvertOutOfRange: true, IsHintUpdatable: true, AllowEmpty: true, SetSession: func(s *SessionVars, val string) error { + s.TMPTableSize = tidbOptInt64(val, DefTMPTableSize) + return nil + }}, // variable for top SQL feature. {Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(DefTiDBTopSQLEnable), Type: TypeBool, Hidden: true, AllowEmpty: true, GetSession: func(s *SessionVars) (string, error) { return BoolToOnOff(TopSQLVariable.Enable.Load()), nil @@ -1859,8 +1865,8 @@ const ( MaxConnectErrors = "max_connect_errors" // TableDefinitionCache is the name for 'table_definition_cache' system variable. TableDefinitionCache = "table_definition_cache" - // TmpTableSize is the name for 'tmp_table_size' system variable. - TmpTableSize = "tmp_table_size" + // TMPTableSize is the name for 'tmp_table_size' system variable. + TMPTableSize = "tmp_table_size" // Timestamp is the name for 'timestamp' system variable. Timestamp = "timestamp" // ConnectTimeout is the name for 'connect_timeout' system variable. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 62be28a7d31ef..7d626686728db 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -720,6 +720,7 @@ const ( DefTiDBTopSQLReportIntervalSeconds = 60 DefTiDBEnableGlobalTemporaryTable = false DefTiDBEnableLocalTxn = false + DefTMPTableSize = 16777216 ) // Process global variables. diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 78a5b5c492378..41f379a9d6353 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -104,6 +104,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.ShardAllocateStep, Equals, int64(DefTiDBShardAllocateStep)) c.Assert(vars.AnalyzeVersion, Equals, DefTiDBAnalyzeVersion) c.Assert(vars.CTEMaxRecursionDepth, Equals, DefCTEMaxRecursionDepth) + c.Assert(vars.TMPTableSize, Equals, int64(DefTMPTableSize)) assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota)) assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.BatchSize)) diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 886b0df39900a..93a4f4a9508df 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -176,6 +176,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetMatchStoreLabels(val.([]*metapb.StoreLabel)) case kv.ResourceGroupTag: txn.KVTxn.SetResourceGroupTag(val.([]byte)) + case kv.KVFilter: + txn.KVTxn.SetKVFilter(val.(tikv.KVFilter)) } } diff --git a/table/table.go b/table/table.go index b39304adcaaa6..f76210a9bdd42 100644 --- a/table/table.go +++ b/table/table.go @@ -97,6 +97,8 @@ var ( ErrSequenceHasRunOut = dbterror.ClassTable.NewStd(mysql.ErrSequenceRunOut) // ErrRowDoesNotMatchGivenPartitionSet returns when the destination partition conflict with the partition selection. ErrRowDoesNotMatchGivenPartitionSet = dbterror.ClassTable.NewStd(mysql.ErrRowDoesNotMatchGivenPartitionSet) + // ErrTempTableFull returns a table is full error, it's used by temporary table now. + ErrTempTableFull = dbterror.ClassTable.NewStd(mysql.ErrRecordFileFull) ) // RecordIterFunc is used for low-level record iteration. diff --git a/table/tables/tables.go b/table/tables/tables.go index 898d5668063c7..326bdd4701213 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -324,7 +324,12 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, defer memBuffer.Cleanup(sh) if m := t.Meta(); m.TempTableType == model.TempTableGlobal { - addTemporaryTable(sctx, m) + if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil { + if tmpTable.GetSize() > sctx.GetSessionVars().TMPTableSize { + return table.ErrTempTableFull.GenWithStackByArgs(m.Name.O) + } + defer handleTempTableSize(tmpTable, txn.Size(), txn) + } } var colIDs, binlogColIDs []int64 @@ -590,9 +595,20 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column { return pkCols } -func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) { +func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) tableutil.TempTable { tempTable := sctx.GetSessionVars().GetTemporaryTable(tblInfo) tempTable.SetModified(true) + return tempTable +} + +// The size of a temporary table is calculated by accumulating the transaction size delta. +func handleTempTableSize(t tableutil.TempTable, txnSizeBefore int, txn kv.Transaction) { + txnSizeNow := txn.Size() + delta := txnSizeNow - txnSizeBefore + + oldSize := t.GetSize() + newSize := oldSize + int64(delta) + t.SetSize(newSize) } // AddRecord implements table.Table AddRecord interface. @@ -608,7 +624,12 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . } if m := t.Meta(); m.TempTableType == model.TempTableGlobal { - addTemporaryTable(sctx, m) + if tmpTable := addTemporaryTable(sctx, m); tmpTable != nil { + if tmpTable.GetSize() > sctx.GetSessionVars().TMPTableSize { + return nil, table.ErrTempTableFull.GenWithStackByArgs(m.Name.O) + } + defer handleTempTableSize(tmpTable, txn.Size(), txn) + } } var ctx context.Context @@ -1009,8 +1030,17 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } + txn, err := ctx.Txn(true) + if err != nil { + return err + } if m := t.Meta(); m.TempTableType == model.TempTableGlobal { - addTemporaryTable(ctx, m) + if tmpTable := addTemporaryTable(ctx, m); tmpTable != nil { + if tmpTable.GetSize() > ctx.GetSessionVars().TMPTableSize { + return table.ErrTempTableFull.GenWithStackByArgs(m.Name.O) + } + defer handleTempTableSize(tmpTable, txn.Size(), txn) + } } // The table has non-public column and this column is doing the operation of "modify/change column". @@ -1795,6 +1825,8 @@ type TemporaryTable struct { stats *statistics.Table // The autoID allocator of this table. autoIDAllocator autoid.Allocator + // Table size. + size int64 } // TempTableFromMeta builds a TempTable from model.TableInfo. @@ -1825,3 +1857,13 @@ func (t *TemporaryTable) GetModified() bool { func (t *TemporaryTable) GetStats() interface{} { return t.stats } + +// GetSize gets the table size. +func (t *TemporaryTable) GetSize() int64 { + return t.size +} + +// SetSize sets the table size. +func (t *TemporaryTable) SetSize(v int64) { + t.size = v +} diff --git a/util/tableutil/tableutil.go b/util/tableutil/tableutil.go index 11cbe626dcc56..bf5d7caac2732 100644 --- a/util/tableutil/tableutil.go +++ b/util/tableutil/tableutil.go @@ -33,6 +33,9 @@ type TempTable interface { // The stats of this table (*statistics.Table). // Define the return type as interface{} here to avoid cycle imports. GetStats() interface{} + + GetSize() int64 + SetSize(int64) } // TempTableFromMeta builds a TempTable from *model.TableInfo.