From af6cece281c5d396f0cbca9924e1b89b3751528a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 29 Apr 2021 21:49:58 +0800 Subject: [PATCH] *: support read and write operations for the global temporary table (#24196) --- ddl/column_type_change_test.go | 2 +- ddl/ddl_api.go | 8 ++++++++ executor/seqtest/seq_executor_test.go | 2 +- go.mod | 2 +- go.sum | 5 +++-- planner/core/preprocess.go | 4 ++-- session/session.go | 22 ++++++++++++++++++++++ session/session_test.go | 21 +++++++++++++++++++++ sessionctx/variable/session.go | 2 ++ table/tables/tables.go | 21 +++++++++++++++++++++ 10 files changed, 82 insertions(+), 7 deletions(-) diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 402704401eff6..68ee059f47305 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -233,7 +233,7 @@ func (s *testColumnTypeChangeSuite) TestRollbackColumnTypeChangeBetweenInteger(c SQL := "alter table t modify column c2 int not null" _, err := tk.Exec(SQL) c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-none") + c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-queueing") assertRollBackedColUnchanged(c, tk) // Mock roll back at model.StateDeleteOnly. diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 6520bc77dbba9..06e43e905d616 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1751,6 +1751,14 @@ func buildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh if err != nil { return nil, errors.Trace(err) } + switch s.TemporaryKeyword { + case ast.TemporaryGlobal: + tbInfo.TempTableType = model.TempTableGlobal + case ast.TemporaryLocal: + tbInfo.TempTableType = model.TempTableLocal + case ast.TemporaryNone: + tbInfo.TempTableType = model.TempTableNone + } if err = setTableAutoRandomBits(ctx, tbInfo, colDefs); err != nil { return nil, errors.Trace(err) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 7b38a6f6b673e..061e09dcc1315 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -967,7 +967,7 @@ func (s *seqTestSuite) TestBatchInsertDelete(c *C) { atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit) }() // Set the limitation to a small value, make it easier to reach the limitation. - atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5000) + atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5500) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/go.mod b/go.mod index 3fc648111c65d..4ad29e29e6273 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 + github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 diff --git a/go.sum b/go.sum index a4450303c0621..f3396ccd03898 100644 --- a/go.sum +++ b/go.sum @@ -443,8 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 h1:J/NfwCFFPCv7h44ft+2pS3KiMyvOkHprPM5NhDJEoHc= -github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde h1:CcGOCE3kr8aYBy6rRcWWldidL1X5smQxV79nlnzOk+o= +github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= @@ -500,6 +500,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A= diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index d84fede3e5b00..a3719fe4c4b0b 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -564,7 +564,7 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) { return } enableNoopFuncs := p.ctx.GetSessionVars().EnableNoopFuncs - if stmt.IsTemporary && !enableNoopFuncs { + if stmt.TemporaryKeyword == ast.TemporaryLocal && !enableNoopFuncs { p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("CREATE TEMPORARY TABLE") return } @@ -676,7 +676,7 @@ func (p *preprocessor) checkDropSequenceGrammar(stmt *ast.DropSequenceStmt) { func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) { p.checkDropTableNames(stmt.Tables) enableNoopFuncs := p.ctx.GetSessionVars().EnableNoopFuncs - if stmt.IsTemporary && !enableNoopFuncs { + if stmt.TemporaryKeyword == ast.TemporaryLocal && !enableNoopFuncs { p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("DROP TEMPORARY TABLE") return } diff --git a/session/session.go b/session/session.go index 85aabd16a12e8..67354cd15e494 100644 --- a/session/session.go +++ b/session/session.go @@ -68,6 +68,7 @@ import ( tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" tikvutil "github.com/pingcap/tidb/store/tikv/util" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/telemetry" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" @@ -509,6 +510,27 @@ func (s *session) doCommit(ctx context.Context) error { s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability) } + // Filter out the temporary table key-values. + if tables := s.sessionVars.TxnCtx.GlobalTemporaryTables; tables != nil { + memBuffer := s.txn.GetMemBuffer() + for tid := range tables { + seekKey := tablecodec.EncodeTablePrefix(tid) + endKey := tablecodec.EncodeTablePrefix(tid + 1) + iter, err := memBuffer.Iter(seekKey, endKey) + if err != nil { + return err + } + for iter.Valid() && iter.Key().HasPrefix(seekKey) { + if err = memBuffer.Delete(iter.Key()); err != nil { + return errors.Trace(err) + } + if err = iter.Next(); err != nil { + return errors.Trace(err) + } + } + } + } + return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID)) } diff --git a/session/session_test.go b/session/session_test.go index 91dd04abded29..a3a283eb09848 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4243,3 +4243,24 @@ func (s *testSessionSerialSuite) TestParseWithParams(c *C) { c.Assert(err, IsNil) c.Assert(sb.String(), Equals, "SELECT 3") } + +func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create global temporary table g_tmp (a int primary key, b int, c int, index i_b(b)) on commit delete rows") + tk.MustExec("begin") + tk.MustExec("insert into g_tmp values (3, 3, 3)") + tk.MustExec("insert into g_tmp values (4, 7, 9)") + + // Cover table scan. + tk.MustQuery("select * from g_tmp").Check(testkit.Rows("3 3 3", "4 7 9")) + // Cover index reader. + tk.MustQuery("select b from g_tmp where b > 3").Check(testkit.Rows("7")) + // Cover index lookup. + tk.MustQuery("select c from g_tmp where b = 3").Check(testkit.Rows("3")) + // Cover point get. + tk.MustQuery("select * from g_tmp where a = 3").Check(testkit.Rows("3 3 3")) + tk.MustExec("commit") + + // The global temporary table data is discard after the transaction commit. + tk.MustQuery("select * from g_tmp").Check(testkit.Rows()) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 536f446b16d68..a154d3d451b06 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -172,6 +172,8 @@ type TransactionContext struct { // TableDeltaMap lock to prevent potential data race tdmLock sync.Mutex + + GlobalTemporaryTables map[int64]struct{} } // GetShard returns the shard prefix for the next `count` rowids. diff --git a/table/tables/tables.go b/table/tables/tables.go index e725130834583..d1ae7d804fb58 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -322,6 +322,10 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, sh := memBuffer.Staging() defer memBuffer.Cleanup(sh) + if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal { + addTemporaryTableID(sctx, meta.ID) + } + var colIDs, binlogColIDs []int64 var row, binlogOldRow, binlogNewRow []types.Datum numColsCap := len(newData) + 1 // +1 for the extra handle column that we may need to append. @@ -584,6 +588,14 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column { return pkCols } +func addTemporaryTableID(sctx sessionctx.Context, id int64) { + txnCtx := sctx.GetSessionVars().TxnCtx + if txnCtx.GlobalTemporaryTables == nil { + txnCtx.GlobalTemporaryTables = make(map[int64]struct{}) + } + txnCtx.GlobalTemporaryTables[id] = struct{}{} +} + // AddRecord implements table.Table AddRecord interface. func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { txn, err := sctx.Txn(true) @@ -596,6 +608,10 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . fn.ApplyOn(&opt) } + if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal { + addTemporaryTableID(sctx, meta.ID) + } + var ctx context.Context if opt.Ctx != nil { ctx = opt.Ctx @@ -992,6 +1008,11 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type if err != nil { return err } + + if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal { + addTemporaryTableID(ctx, meta.ID) + } + // The table has non-public column and this column is doing the operation of "modify/change column". if len(t.Columns) > len(r) && t.Columns[len(r)].ChangeStateInfo != nil { r = append(r, r[t.Columns[len(r)].ChangeStateInfo.DependencyColumnOffset])