Skip to content

Commit

Permalink
txn: support pessimistic transaction amend for specific ddls (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored Sep 18, 2020
1 parent 29a6406 commit 0f49dd0
Show file tree
Hide file tree
Showing 14 changed files with 1,363 additions and 62 deletions.
193 changes: 193 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -4693,6 +4694,198 @@ func (s *testSerialDBSuite) TestAddIndexFailOnCaseWhenCanExit(c *C) {
tk.MustExec("drop table if exists t")
}

func (s *testSerialDBSuite) TestCommitTxnWithIndexChange(c *C) {
// Prepare work.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists test_db")
tk.MustExec("create database test_db")
tk.MustExec("use test_db")
tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
tk.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)")
tk.MustExec("alter table t1 add index k2(c2)")
tk.MustExec("alter table t1 drop index k2")
tk.MustExec("alter table t1 add index k2(c2)")
tk.MustExec("alter table t1 drop index k2")
tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test_db")

// tkSQLs are the sql statements for the pessimistic transaction.
// tk2DDL are the ddl statements executed before the pessimistic transaction.
// idxDDL is the DDL statement executed between pessimistic transaction begin and commit.
// failCommit means the pessimistic transaction commit should fail not.
type caseUnit struct {
tkSQLs []string
tk2DDL []string
idxDDL string
checkSQLs []string
rowsExps [][]string
failCommit bool
stateEnd model.SchemaState
}

cases := []caseUnit{
// Test secondary index
{[]string{"insert into t1 values(3, 30, 300)",
"insert into t2 values(11, 11, 11)"},
[]string{"alter table t1 add index k2(c2)",
"alter table t1 drop index k2",
"alter table t1 add index kk2(c2, c1)",
"alter table t1 add index k2(c2)",
"alter table t1 drop index k2"},
"alter table t1 add index k2(c2)",
[]string{"select c3, c2 from t1 use index(k2) where c2 = 20",
"select c3, c2 from t1 use index(k2) where c2 = 10",
"select * from t1",
"select * from t2 where c1 = 11"},
[][]string{{"200 20"},
{"100 10"},
{"1 10 100", "2 20 200", "3 30 300"},
{"11 11 11"}},
false,
model.StateNone},
// Test secondary index
{[]string{"insert into t2 values(5, 50, 500)",
"insert into t2 values(11, 11, 11)",
"delete from t2 where c2 = 11",
"update t2 set c2 = 110 where c1 = 11"},
//"update t2 set c1 = 10 where c3 = 100"},
[]string{"alter table t1 add index k2(c2)",
"alter table t1 drop index k2",
"alter table t1 add index kk2(c2, c1)",
"alter table t1 add index k2(c2)",
"alter table t1 drop index k2"},
"alter table t1 add index k2(c2)",
[]string{"select c3, c2 from t1 use index(k2) where c2 = 20",
"select c3, c2 from t1 use index(k2) where c2 = 10",
"select * from t1",
"select * from t2 where c1 = 11",
"select * from t2 where c3 = 100"},
[][]string{{"200 20"},
{"100 10"},
{"1 10 100", "2 20 200"},
{},
{"1 10 100"}},
false,
model.StateNone},
// Test unique index
/* TODO unique index is not supported now.
{[]string{"insert into t1 values(3, 30, 300)",
"insert into t1 values(4, 40, 400)",
"insert into t2 values(11, 11, 11)",
"insert into t2 values(12, 12, 11)"},
[]string{"alter table t1 add unique index uk3(c3)",
"alter table t1 drop index uk3",
"alter table t2 add unique index ukc1c3(c1, c3)",
"alter table t2 add unique index ukc3(c3)",
"alter table t2 drop index ukc1c3",
"alter table t2 drop index ukc3",
"alter table t2 add index kc3(c3)"},
"alter table t1 add unique index uk3(c3)",
[]string{"select c3, c2 from t1 use index(uk3) where c3 = 200",
"select c3, c2 from t1 use index(uk3) where c3 = 300",
"select c3, c2 from t1 use index(uk3) where c3 = 400",
"select * from t1",
"select * from t2"},
[][]string{{"200 20"},
{"300 30"},
{"400 40"},
{"1 10 100", "2 20 200", "3 30 300", "4 40 400"},
{"1 10 100", "2 20 200", "11 11 11", "12 12 11"}},
false, model.StateNone},
// Test unique index fail to commit, this case needs the new index could be inserted
{[]string{"insert into t1 values(3, 30, 300)",
"insert into t1 values(4, 40, 300)",
"insert into t2 values(11, 11, 11)",
"insert into t2 values(12, 11, 12)"},
//[]string{"alter table t1 add unique index uk3(c3)", "alter table t1 drop index uk3"},
[]string{},
"alter table t1 add unique index uk3(c3)",
[]string{"select c3, c2 from t1 use index(uk3) where c3 = 200",
"select c3, c2 from t1 use index(uk3) where c3 = 300",
"select c3, c2 from t1 where c1 = 4",
"select * from t1",
"select * from t2"},
[][]string{{"200 20"},
{},
{},
{"1 10 100", "2 20 200"},
{"1 10 100", "2 20 200"}},
true,
model.StateWriteOnly},
*/
}
tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200"))

// Test add index state change
do := s.dom.DDL()
startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly}
for _, startState := range startStates {
endStatMap := session.ConstOpAddIndex[startState]
var endStates []model.SchemaState
for st := range endStatMap {
endStates = append(endStates, st)
}
sort.Slice(endStates, func(i, j int) bool { return endStates[i] < endStates[j] })
for _, endState := range endStates {
for _, curCase := range cases {
if endState < curCase.stateEnd {
break
}
tk2.MustExec("drop table if exists t1")
tk2.MustExec("drop table if exists t2")
tk2.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
tk2.MustExec("create table t2 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
tk2.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)")
tk2.MustExec("insert t2 values (1, 10, 100), (2, 20, 200)")
tk2.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200"))
tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200"))
tk.MustQuery("select * from t2;").Check(testkit.Rows("1 10 100", "2 20 200"))

for _, DDLSQL := range curCase.tk2DDL {
tk2.MustExec(DDLSQL)
}
hook := &ddl.TestDDLCallback{}
prepared := false
committed := false
hook.OnJobUpdatedExported = func(job *model.Job) {
if job.SchemaState == startState {
if !prepared {
tk.MustExec("begin pessimistic")
for _, tkSQL := range curCase.tkSQLs {
tk.MustExec(tkSQL)
}
prepared = true
}
} else if job.SchemaState == endState {
if !committed {
if curCase.failCommit {
_, err := tk.Exec("commit")
c.Assert(err, NotNil)
} else {
tk.MustExec("commit")
}
}
committed = true
}
}
originalCallback := do.GetHook()
do.(ddl.DDLForTest).SetHook(hook)
tk2.MustExec(curCase.idxDDL)
do.(ddl.DDLForTest).SetHook(originalCallback)
tk2.MustExec("admin check table t1")
for i, checkSQL := range curCase.checkSQLs {
if len(curCase.rowsExps[i]) > 0 {
tk2.MustQuery(checkSQL).Check(testkit.Rows(curCase.rowsExps[i]...))
} else {
tk2.MustQuery(checkSQL).Check(nil)
}
}
}
}
}
tk.MustExec("admin check table t1")
}

func init() {
// Make sure it will only be executed once.
domain.SchemaOutOfDateRetryInterval = int64(50 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for !req.IsFull() && e.index < len(e.values) {
handle, val := e.handles[e.index], e.values[e.index]
err := decodeRowValToChunk(e.base(), e.tblInfo, handle, val, req, e.rowDecoder)
err := DecodeRowValToChunk(e.base().ctx, e.schema, e.tblInfo, handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2984,7 +2984,8 @@ func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor
return e
}

func newRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model.TableInfo) *rowcodec.ChunkDecoder {
// NewRowDecoder creates a chunk decoder for new row format row value decode.
func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model.TableInfo) *rowcodec.ChunkDecoder {
getColInfoByID := func(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
Expand Down Expand Up @@ -3040,7 +3041,7 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.err = err
return nil
}
decoder := newRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
tblInfo: plan.TblInfo,
Expand Down
25 changes: 15 additions & 10 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -87,7 +89,7 @@ type PointGetExecutor struct {

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, startTs uint64) {
decoder := newRowDecoder(e.ctx, p.Schema(), p.TblInfo)
decoder := NewRowDecoder(e.ctx, p.Schema(), p.TblInfo)
e.tblInfo = p.TblInfo
e.handle = p.Handle
e.idxInfo = p.IndexInfo
Expand Down Expand Up @@ -227,7 +229,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
err = decodeRowValToChunk(e.base(), e.tblInfo, e.handle, val, req, e.rowDecoder)
err = DecodeRowValToChunk(e.base().ctx, e.schema, e.tblInfo, e.handle, val, req, e.rowDecoder)
if err != nil {
return err
}
Expand Down Expand Up @@ -352,16 +354,19 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In
return tablecodec.EncodeIndexSeekKey(tID, idxInfo.ID, encodedIdxVals), hasNull, nil
}

func decodeRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk, rd *rowcodec.ChunkDecoder) error {
// DecodeRowValToChunk decodes row value into chunk checking row format used.
func DecodeRowValToChunk(sctx sessionctx.Context, schema *expression.Schema, tblInfo *model.TableInfo,
handle int64, rowVal []byte, chk *chunk.Chunk, rd *rowcodec.ChunkDecoder) error {
if rowcodec.IsNewFormat(rowVal) {
return rd.DecodeToChunk(rowVal, handle, chk)
}
return decodeOldRowValToChunk(e, tblInfo, handle, rowVal, chk)
return decodeOldRowValToChunk(sctx, schema, tblInfo, handle, rowVal, chk)
}

func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle int64, rowVal []byte, chk *chunk.Chunk) error {
colID2CutPos := make(map[int64]int, e.schema.Len())
for _, col := range e.schema.Columns {
func decodeOldRowValToChunk(sctx sessionctx.Context, schema *expression.Schema, tblInfo *model.TableInfo, handle int64,
rowVal []byte, chk *chunk.Chunk) error {
colID2CutPos := make(map[int64]int, schema.Len())
for _, col := range schema.Columns {
if _, ok := colID2CutPos[col.ID]; !ok {
colID2CutPos[col.ID] = len(colID2CutPos)
}
Expand All @@ -373,8 +378,8 @@ func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle in
if cutVals == nil {
cutVals = make([][]byte, len(colID2CutPos))
}
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
for i, col := range e.schema.Columns {
decoder := codec.NewDecoder(chk, sctx.GetSessionVars().Location())
for i, col := range schema.Columns {
// fill the virtual column value after row calculation
if col.VirtualExpr != nil {
chk.AppendNull(i)
Expand All @@ -391,7 +396,7 @@ func decodeOldRowValToChunk(e *baseExecutor, tblInfo *model.TableInfo, handle in
cutPos := colID2CutPos[col.ID]
if len(cutVals[cutPos]) == 0 {
colInfo := getColInfoByID(tblInfo, col.ID)
d, err1 := table.GetColOriginDefaultValue(e.ctx, colInfo)
d, err1 := table.GetColOriginDefaultValue(sctx, colInfo)
if err1 != nil {
return err1
}
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
CheckExists
// InfoSchema is schema version used by txn startTS.
InfoSchema
// SchemaAmender is used to amend mutations for pessimistic transactions
SchemaAmender
)

// Priority value for transaction priority.
Expand Down
26 changes: 26 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,3 +1547,29 @@ func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) {
tk.MustExec("commit")
tk.MustExec("admin check table t")
}

func (s *testPessimisticSuite) TestPessimisticTxnWithDDLAddDropColumn(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 int primary key, c2 int)")
tk.MustExec("insert t1 values (1, 77), (2, 88)")
tk.MustExec("alter table t1 add index k2(c2)")
tk.MustExec("alter table t1 drop index k2")

// tk2 starts a pessimistic transaction and make some changes on table t1.
// tk executes some ddl statements add/drop column on table t1.
tk.MustExec("begin pessimistic")
tk.MustExec("update t1 set c2 = c1 * 10")
tk2.MustExec("alter table t1 add column c3 int after c1")
tk.MustExec("commit")
tk.MustExec("admin check table t1")
tk.MustQuery("select * from t1").Check(testkit.Rows("1 <nil> 10", "2 <nil> 20"))

tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values(5, 5, 5)")
tk2.MustExec("alter table t1 drop column c3")
tk2.MustExec("alter table t1 drop column c2")
tk.MustExec("commit")
tk.MustQuery("select * from t1").Check(testkit.Rows("1", "2", "5"))
}
Loading

0 comments on commit 0f49dd0

Please sign in to comment.