Skip to content

Commit

Permalink
*: support read and write operations for the global temporary table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Apr 29, 2021
1 parent 6b0b74e commit af6cece
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *testColumnTypeChangeSuite) TestRollbackColumnTypeChangeBetweenInteger(c
SQL := "alter table t modify column c2 int not null"
_, err := tk.Exec(SQL)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-none")
c.Assert(err.Error(), Equals, "[ddl:1]MockRollingBackInCallBack-queueing")
assertRollBackedColUnchanged(c, tk)

// Mock roll back at model.StateDeleteOnly.
Expand Down
8 changes: 8 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,14 @@ func buildTableInfoWithStmt(ctx sessionctx.Context, s *ast.CreateTableStmt, dbCh
if err != nil {
return nil, errors.Trace(err)
}
switch s.TemporaryKeyword {
case ast.TemporaryGlobal:
tbInfo.TempTableType = model.TempTableGlobal
case ast.TemporaryLocal:
tbInfo.TempTableType = model.TempTableLocal
case ast.TemporaryNone:
tbInfo.TempTableType = model.TempTableNone
}

if err = setTableAutoRandomBits(ctx, tbInfo, colDefs); err != nil {
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ func (s *seqTestSuite) TestBatchInsertDelete(c *C) {
atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit)
}()
// Set the limitation to a small value, make it easier to reach the limitation.
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5000)
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5500)

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55
github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55 h1:J/NfwCFFPCv7h44ft+2pS3KiMyvOkHprPM5NhDJEoHc=
github.com/pingcap/parser v0.0.0-20210421190254-588138d35e55/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde h1:CcGOCE3kr8aYBy6rRcWWldidL1X5smQxV79nlnzOk+o=
github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
Expand Down Expand Up @@ -500,6 +500,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A=
Expand Down
4 changes: 2 additions & 2 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (p *preprocessor) checkCreateTableGrammar(stmt *ast.CreateTableStmt) {
return
}
enableNoopFuncs := p.ctx.GetSessionVars().EnableNoopFuncs
if stmt.IsTemporary && !enableNoopFuncs {
if stmt.TemporaryKeyword == ast.TemporaryLocal && !enableNoopFuncs {
p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("CREATE TEMPORARY TABLE")
return
}
Expand Down Expand Up @@ -676,7 +676,7 @@ func (p *preprocessor) checkDropSequenceGrammar(stmt *ast.DropSequenceStmt) {
func (p *preprocessor) checkDropTableGrammar(stmt *ast.DropTableStmt) {
p.checkDropTableNames(stmt.Tables)
enableNoopFuncs := p.ctx.GetSessionVars().EnableNoopFuncs
if stmt.IsTemporary && !enableNoopFuncs {
if stmt.TemporaryKeyword == ast.TemporaryLocal && !enableNoopFuncs {
p.err = expression.ErrFunctionsNoopImpl.GenWithStackByArgs("DROP TEMPORARY TABLE")
return
}
Expand Down
22 changes: 22 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -509,6 +510,27 @@ func (s *session) doCommit(ctx context.Context) error {
s.GetSessionVars().TxnCtx.IsExplicit && s.GetSessionVars().GuaranteeLinearizability)
}

// Filter out the temporary table key-values.
if tables := s.sessionVars.TxnCtx.GlobalTemporaryTables; tables != nil {
memBuffer := s.txn.GetMemBuffer()
for tid := range tables {
seekKey := tablecodec.EncodeTablePrefix(tid)
endKey := tablecodec.EncodeTablePrefix(tid + 1)
iter, err := memBuffer.Iter(seekKey, endKey)
if err != nil {
return err
}
for iter.Valid() && iter.Key().HasPrefix(seekKey) {
if err = memBuffer.Delete(iter.Key()); err != nil {
return errors.Trace(err)
}
if err = iter.Next(); err != nil {
return errors.Trace(err)
}
}
}
}

return s.txn.Commit(tikvutil.SetSessionID(ctx, s.GetSessionVars().ConnectionID))
}

Expand Down
21 changes: 21 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4243,3 +4243,24 @@ func (s *testSessionSerialSuite) TestParseWithParams(c *C) {
c.Assert(err, IsNil)
c.Assert(sb.String(), Equals, "SELECT 3")
}

func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create global temporary table g_tmp (a int primary key, b int, c int, index i_b(b)) on commit delete rows")
tk.MustExec("begin")
tk.MustExec("insert into g_tmp values (3, 3, 3)")
tk.MustExec("insert into g_tmp values (4, 7, 9)")

// Cover table scan.
tk.MustQuery("select * from g_tmp").Check(testkit.Rows("3 3 3", "4 7 9"))
// Cover index reader.
tk.MustQuery("select b from g_tmp where b > 3").Check(testkit.Rows("7"))
// Cover index lookup.
tk.MustQuery("select c from g_tmp where b = 3").Check(testkit.Rows("3"))
// Cover point get.
tk.MustQuery("select * from g_tmp where a = 3").Check(testkit.Rows("3 3 3"))
tk.MustExec("commit")

// The global temporary table data is discard after the transaction commit.
tk.MustQuery("select * from g_tmp").Check(testkit.Rows())
}
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ type TransactionContext struct {

// TableDeltaMap lock to prevent potential data race
tdmLock sync.Mutex

GlobalTemporaryTables map[int64]struct{}
}

// GetShard returns the shard prefix for the next `count` rowids.
Expand Down
21 changes: 21 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(sctx, meta.ID)
}

var colIDs, binlogColIDs []int64
var row, binlogOldRow, binlogNewRow []types.Datum
numColsCap := len(newData) + 1 // +1 for the extra handle column that we may need to append.
Expand Down Expand Up @@ -584,6 +588,14 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column {
return pkCols
}

func addTemporaryTableID(sctx sessionctx.Context, id int64) {
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.GlobalTemporaryTables == nil {
txnCtx.GlobalTemporaryTables = make(map[int64]struct{})
}
txnCtx.GlobalTemporaryTables[id] = struct{}{}
}

// AddRecord implements table.Table AddRecord interface.
func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
txn, err := sctx.Txn(true)
Expand All @@ -596,6 +608,10 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
fn.ApplyOn(&opt)
}

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(sctx, meta.ID)
}

var ctx context.Context
if opt.Ctx != nil {
ctx = opt.Ctx
Expand Down Expand Up @@ -992,6 +1008,11 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
if err != nil {
return err
}

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(ctx, meta.ID)
}

// The table has non-public column and this column is doing the operation of "modify/change column".
if len(t.Columns) > len(r) && t.Columns[len(r)].ChangeStateInfo != nil {
r = append(r, r[t.Columns[len(r)].ChangeStateInfo.DependencyColumnOffset])
Expand Down

0 comments on commit af6cece

Please sign in to comment.