Skip to content

Commit

Permalink
ddl : ddl forbid stale read for cache table and support history read (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
JayLZhou authored Oct 29, 2021
1 parent 0e16830 commit ac2c6f8
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 9 deletions.
11 changes: 11 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5917,6 +5917,17 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) {
// Multiple alter cache is okay
tk.MustExec("alter table t cache")
tk.MustExec("alter table t cache")
// Test a temporary table
tk.MustExec("drop table if exists t")
tk.MustExec("create temporary table t (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("drop table if exists tmp1")
// local temporary table alter is not supported
tk.MustGetErrCode("alter table t cache", errno.ErrUnsupportedDDLOperation)
// test global temporary table
tk.MustExec("create global temporary table tmp1 " +
"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
"on commit delete rows")
tk.MustGetErrMsg("alter table tmp1 cache", ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache").Error())

}

Expand Down
3 changes: 3 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6637,6 +6637,9 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
if t.Meta().TableCacheStatusType == model.TableCacheStatusEnable {
return nil
}
if t.Meta().TempTableType != model.TempTableNone {
return ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache")
}
job := &model.Job{
SchemaID: schema.ID,
SchemaName: schema.Name.L,
Expand Down
4 changes: 3 additions & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,9 @@ func onAlterCacheTable(t *meta.Meta, job *model.Job) (ver int64, err error) {
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
return ver, nil
}

if tbInfo.TempTableType != model.TempTableNone {
return ver, errors.Trace(ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache"))
}
switch tbInfo.TableCacheStatusType {
case model.TableCacheStatusDisable:
// disable -> switching
Expand Down
7 changes: 6 additions & 1 deletion executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (s *testSuite5) TestAdminCheckIndexInLocalTemporaryMode(c *C) {
c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("admin checksum table").Error())
tk.MustExec("drop table if exists local_temporary_admin_checksum_table_with_index_test,local_temporary_admin_checksum_table_without_index_test;")
}

func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand All @@ -137,6 +138,8 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
tk.MustExec("admin check table cache_admin_test;")
tk.MustExec("admin check index cache_admin_test c1;")
tk.MustExec("admin check index cache_admin_test c2;")
tk.MustExec("drop table if exists cache_admin_test;")

tk.MustExec(`drop table if exists check_index_test;`)
tk.MustExec(`create table check_index_test (a int, b varchar(10), index a_b (a, b), index b (b))`)
tk.MustExec(`insert check_index_test values (3, "ab"),(2, "cd"),(1, "ef"),(-1, "hi")`)
Expand All @@ -145,7 +148,8 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
result.Check(testkit.Rows("1 ef 3", "2 cd 2"))
result = tk.MustQuery("admin check index check_index_test a_b (3, 5);")
result.Check(testkit.Rows("-1 hi 4", "1 ef 3"))
tk.MustExec("drop table if exists cache_admin_test;")
tk.MustExec("drop table if exists check_index_test;")

tk.MustExec("drop table if exists cache_admin_table_with_index_test;")
tk.MustExec("drop table if exists cache_admin_table_without_index_test;")
tk.MustExec("create table cache_admin_table_with_index_test (id int, count int, PRIMARY KEY(id), KEY(count))")
Expand All @@ -156,6 +160,7 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) {
tk.MustExec("admin checksum table cache_admin_table_without_index_test;")
tk.MustExec("drop table if exists cache_admin_table_with_index_test,cache_admin_table_without_index_test;")
}

func (s *testSuite5) TestAdminRecoverIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
34 changes: 28 additions & 6 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2848,7 +2848,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, err
}
ts := v.GetTableScan()
if err = b.validCanReadTemporaryTable(ts.Table); err != nil {
if err = b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
return nil, err
}

Expand Down Expand Up @@ -2964,7 +2964,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
}

ts := v.GetTableScan()
if err = b.validCanReadTemporaryTable(ts.Table); err != nil {
if err = b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
b.err = err
return nil
}
Expand Down Expand Up @@ -3187,7 +3187,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea

func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if err := b.validCanReadTemporaryTable(is.Table); err != nil {
if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
b.err = err
return nil
}
Expand Down Expand Up @@ -3346,7 +3346,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if err := b.validCanReadTemporaryTable(is.Table); err != nil {
if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
b.err = err
return nil
}
Expand Down Expand Up @@ -3461,7 +3461,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd

func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if err := b.validCanReadTemporaryTable(ts.Table); err != nil {
if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
b.err = err
return nil
}
Expand Down Expand Up @@ -4266,7 +4266,7 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if err := b.validCanReadTemporaryTable(plan.TblInfo); err != nil {
if err := b.validCanReadTemporaryOrCacheTable(plan.TblInfo); err != nil {
b.err = err
return nil
}
Expand Down Expand Up @@ -4551,6 +4551,28 @@ func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) E
chkIdx: 0,
}
}
func (b *executorBuilder) validCanReadTemporaryOrCacheTable(tbl *model.TableInfo) error {
err := b.validCanReadTemporaryTable(tbl)
if err != nil {
return err
}
return b.validCanReadCacheTable(tbl)
}

func (b *executorBuilder) validCanReadCacheTable(tbl *model.TableInfo) error {
if tbl.TableCacheStatusType == model.TableCacheStatusDisable {
return nil
}

sessionVars := b.ctx.GetSessionVars()

// Temporary table can't switch into cache table. so the following code will not cause confusion
if sessionVars.TxnCtx.IsStaleness || b.isStaleness {
return errors.Trace(errors.New("can not stale read cache table"))
}

return nil
}

func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error {
if tbl.TempTableType == model.TempTableNone {
Expand Down
103 changes: 103 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8990,6 +8990,109 @@ func (s *testStaleTxnSuite) TestInvalidReadTemporaryTable(c *C) {
}
}

func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)
tk.MustExec("use test")
tk.MustExec("drop table if exists cache_tmp1")
tk.MustExec("create table cache_tmp1 " +
"(id int not null primary key, code int not null, value int default null, unique key code(code))")
tk.MustExec("alter table cache_tmp1 cache")
tk.MustExec("drop table if exists cache_tmp2")
tk.MustExec("create table cache_tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));")
tk.MustExec("alter table cache_tmp2 cache")
tk.MustExec("drop table if exists cache_tmp3 , cache_tmp4, cache_tmp5")
tk.MustExec("create table cache_tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));")
tk.MustExec("create table cache_tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));")
tk.MustExec("create table cache_tmp5 (id int primary key);")
// sleep 1us to make test stale
time.Sleep(time.Microsecond)

queries := []struct {
sql string
}{
{
sql: "select * from cache_tmp1 where id=1",
},
{
sql: "select * from cache_tmp1 where code=1",
},
{
sql: "select * from cache_tmp1 where id in (1, 2, 3)",
},
{
sql: "select * from cache_tmp1 where code in (1, 2, 3)",
},
{
sql: "select * from cache_tmp1 where id > 1",
},
{
sql: "select /*+use_index(cache_tmp1, code)*/ * from cache_tmp1 where code > 1",
},
{
sql: "select /*+use_index(cache_tmp1, code)*/ code from cache_tmp1 where code > 1",
},
}

addStaleReadToSQL := func(sql string) string {
idx := strings.Index(sql, " where ")
if idx < 0 {
return ""
}
return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:]
}
for _, query := range queries {
sql := addStaleReadToSQL(query.sql)
if sql != "" {
tk.MustGetErrMsg(sql, "can not stale read cache table")
}
}

tk.MustExec("start transaction read only as of timestamp NOW(6)")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read cache table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

// Test normal table when cache table exits.
tk.MustExec("insert into cache_tmp5 values(1);")
tk.MustExec("set @a=now(6);")
time.Sleep(time.Microsecond)
tk.MustExec("drop table cache_tmp5")
tk.MustExec("create table cache_tmp5 (id int primary key);")
tk.MustQuery("select * from cache_tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1"))
tk.MustQuery("select * from cache_tmp4 as of timestamp(@a), cache_tmp3 as of timestamp(@a) where cache_tmp3.id=1;")
tk.MustGetErrMsg("select * from cache_tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table")
tk.MustExec("set transaction read only as of timestamp NOW(6)")
tk.MustExec("start transaction")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read cache table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

tk.MustExec("set @@tidb_snapshot=NOW(6)")
for _, query := range queries {
// enable historical read cache table
tk.MustExec(query.sql)

}
}

func (s *testSuite) TestTableSampleTemporaryTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
)

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
if err := b.validCanReadTemporaryTable(p.TblInfo); err != nil {
if err := b.validCanReadTemporaryOrCacheTable(p.TblInfo); err != nil {
b.err = err
return nil
}
Expand Down

0 comments on commit ac2c6f8

Please sign in to comment.