Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: make SET TRANSACTION ISOLATION LEVEL READ COMMITTED take effect #3619

Merged
merged 9 commits into from
Jul 17, 2017
8 changes: 8 additions & 0 deletions ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ var (
_ Node = &VariableAssignment{}
)

// Isolation level constants.
const (
ReadCommitted = "READ-COMMITTED"
ReadUncommitted = "READ-UNCOMMITTED"
Serializable = "SERIALIZABLE"
RepeatableRead = "REPEATABLE-READ"
)

// TypeOpt is used for parsing data type option from SQL.
type TypeOpt struct {
IsUnsigned bool
Expand Down
26 changes: 14 additions & 12 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (pr *partialResult) Close() error {
// concurrency: The max concurrency for underlying coprocessor request.
// keepOrder: If the result should returned in key order. For example if we need keep data in order by
// scan index, we should set keepOrder to true.
func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool) (SelectResult, error) {
func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, isolationLevel kv.IsoLevel) (SelectResult, error) {
var err error
defer func() {
// Add metrics
Expand All @@ -196,7 +196,7 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan
}()

// Convert tipb.*Request to kv.Request.
kvReq, err1 := composeRequest(req, keyRanges, concurrency, keepOrder)
kvReq, err1 := composeRequest(req, keyRanges, concurrency, keepOrder, isolationLevel)
if err1 != nil {
err = errors.Trace(err1)
return nil, err
Expand Down Expand Up @@ -229,7 +229,7 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan
// concurrency: The max concurrency for underlying coprocessor request.
// keepOrder: If the result should returned in key order. For example if we need keep data in order by
// scan index, we should set keepOrder to true.
func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, desc bool) (SelectResult, error) {
func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, desc bool, isolationLevel kv.IsoLevel) (SelectResult, error) {
var err error
defer func() {
// Add metrics.
Expand All @@ -241,11 +241,12 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan
}()

kvReq := &kv.Request{
Tp: kv.ReqTypeDAG,
Concurrency: concurrency,
KeepOrder: keepOrder,
KeyRanges: keyRanges,
Desc: desc,
Tp: kv.ReqTypeDAG,
Concurrency: concurrency,
KeepOrder: keepOrder,
KeyRanges: keyRanges,
Desc: desc,
IsolationLevel: isolationLevel,
}
kvReq.Data, err = dag.Marshal()
if err != nil {
Expand All @@ -267,11 +268,12 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan
}

// Convert tipb.Request to kv.Request.
func composeRequest(req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool) (*kv.Request, error) {
func composeRequest(req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, isolationLevel kv.IsoLevel) (*kv.Request, error) {
kvReq := &kv.Request{
Concurrency: concurrency,
KeepOrder: keepOrder,
KeyRanges: keyRanges,
Concurrency: concurrency,
KeepOrder: keepOrder,
KeyRanges: keyRanges,
IsolationLevel: isolationLevel,
}
if req.IndexInfo != nil {
kvReq.Tp = kv.ReqTypeIndex
Expand Down
17 changes: 13 additions & 4 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/distsql/xeval"
Expand Down Expand Up @@ -661,12 +662,20 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
sc := e.ctx.GetSessionVars().StmtCtx
sv := e.ctx.GetSessionVars()
sc := sv.StmtCtx
keyRanges, err := indexRangesToKVRanges(sc, e.table.Meta().ID, e.index.ID, e.ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.outOfOrder)
return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.outOfOrder, getIsolationLevel(sv))
}

func getIsolationLevel(sv *variable.SessionVars) kv.IsoLevel {
if sv.Systems[variable.TxnIsolation] == ast.ReadCommitted {
return kv.RC
}
return kv.SI
}

func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
Expand Down Expand Up @@ -811,7 +820,7 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult
keyRanges := tableHandlesToKVRanges(e.table.Meta().ID, handles)
// Use the table scan concurrency variable to do table request.
concurrency := e.ctx.GetSessionVars().DistSQLScanConcurrency
resp, err := distsql.Select(e.ctx.GetClient(), goctx.Background(), selTableReq, keyRanges, concurrency, false)
resp, err := distsql.Select(e.ctx.GetClient(), goctx.Background(), selTableReq, keyRanges, concurrency, false, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -891,7 +900,7 @@ func (e *XSelectTableExec) doRequest() error {
selReq.GroupBy = e.byItems

kvRanges := tableRangesToKVRanges(e.table.Meta().ID, e.ranges)
e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder)
e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions executor/new_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (e *TableReaderExecutor) Next() (*Row, error) {
func (e *TableReaderExecutor) Open() error {
kvRanges := tableRangesToKVRanges(e.tableID, e.ranges)
var err error
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goctx.Background(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goctx.Background(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -126,7 +126,7 @@ func (e *TableReaderExecutor) Open() error {
func (e *TableReaderExecutor) doRequestForHandles(handles []int64, goCtx goctx.Context) error {
kvRanges := tableHandlesToKVRanges(e.tableID, handles)
var err error
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goCtx, e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goCtx, e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (e *IndexReaderExecutor) Open() error {
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -240,7 +240,7 @@ func (e *IndexReaderExecutor) doRequestForDatums(values [][]types.Datum, goCtx g
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func (e *IndexLookUpExecutor) Open() error {
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -303,7 +303,7 @@ func (e *IndexLookUpExecutor) doRequestForDatums(values [][]types.Datum, goCtx g
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc)
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()))
if err != nil {
return errors.Trace(err)
}
Expand Down
7 changes: 7 additions & 0 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessionctx/varsutil"
Expand Down Expand Up @@ -143,6 +144,12 @@ func (e *SetExecutor) executeSet() error {
valStr, _ := value.ToString()
log.Infof("[%d] set system variable %s = %s", sessionVars.ConnectionID, name, valStr)
}

if name == variable.TxnIsolation {
if sessionVars.Systems[variable.TxnIsolation] == ast.ReadCommitted {
e.ctx.Txn().SetOption(kv.IsolationLevel, kv.RC)
}
}
}
return nil
}
Expand Down
16 changes: 16 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustExec("set @@tidb_skip_constraint_check = '0'")
c.Assert(vars.SkipConstraintCheck, IsFalse)

// Test set transaction isolation level, which is equivalent to setting variable "tx_isolation".
tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-COMMITTED"))
tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-UNCOMMITTED"))
tk.MustExec("SET GLOBAL TRANSACTION ISOLATION LEVEL SERIALIZABLE")
tk.MustQuery("select @@global.tx_isolation").Check(testkit.Rows("SERIALIZABLE"))

// Even the transaction fail, set session variable would success.
tk.MustExec("BEGIN")
tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED")
_, err = tk.Se.Execute(`INSERT INTO t VALUES ("sdfsdf")`)
c.Assert(err, NotNil)
tk.MustExec("COMMIT")
tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-COMMITTED"))

tk.MustExec("set global avoid_temporal_upgrade = on")
tk.MustQuery(`select @@global.avoid_temporal_upgrade;`).Check(testkit.Rows("ON"))
tk.MustExec("set @@global.avoid_temporal_upgrade = off")
Expand Down
53 changes: 49 additions & 4 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,8 @@ import (
TableOptionListOpt "create table option list opt"
TableRef "table reference"
TableRefs "table references"
TransactionChar "Transaction characteristic"
TransactionChars "Transaction characteristic list"
TrimDirection "Trim string direction"
TruncateTableStmt "TRANSACTION TABLE statement"
UnionOpt "Union Option(empty/ALL/DISTINCT)"
Expand Down Expand Up @@ -842,8 +844,6 @@ import (
OuterOpt "optional OUTER clause"
CrossOpt "Cross join option"
TablesTerminalSym "{TABLE|TABLES}"
TransactionChar "Transaction characteristic"
TransactionChars "Transaction characteristic list"
IsolationLevel "Isolation level"
ShowIndexKwd "Show index/indexs/key keyword"
FromOrIn "From or In"
Expand Down Expand Up @@ -4886,27 +4886,72 @@ SetStmt:
}
| "SET" "GLOBAL" "TRANSACTION" TransactionChars
{
// Parsed but ignored
vars := $4.([]*ast.VariableAssignment)
for _, v := range vars {
v.IsGlobal = true
}
$$ = &ast.SetStmt{Variables: vars}
}
| "SET" "SESSION" "TRANSACTION" TransactionChars
{
// Parsed but ignored
$$ = &ast.SetStmt{Variables: $4.([]*ast.VariableAssignment)}
}

TransactionChars:
TransactionChar
{
if $1 != nil {
$$ = []*ast.VariableAssignment{$1.(*ast.VariableAssignment)}
} else {
$$ = []*ast.VariableAssignment{}
}
}
| TransactionChars ',' TransactionChar
{
if $3 != nil {
$$ = append($1.([]*ast.VariableAssignment), $3.(*ast.VariableAssignment))
} else {
$$ = $1
}
}

TransactionChar:
"ISOLATION" "LEVEL" IsolationLevel
{
tp := types.NewFieldType(mysql.TypeString)
tp.Charset, tp.Collate = parser.charset, parser.collation
expr := ast.NewValueExpr($3)
expr.SetType(tp)
$$ = &ast.VariableAssignment{Name: "tx_isolation", Value: expr, IsSystem: true}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a const for tx_isolation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tx_isolation is a session variable name.
I don't want to put it in other places except variable package.
I also don't like parser package to import variable package.
So use the string "tx_isolation" is the best compromise.

}
| "READ" "WRITE"
{
// Parsed but ignored
$$ = nil
}
| "READ" "ONLY"
{
// Parsed but ignored
$$ = nil
}

IsolationLevel:
"REPEATABLE" "READ"
{
$$ = ast.RepeatableRead
}
| "READ" "COMMITTED"
{
$$ = ast.ReadCommitted
}
| "READ" "UNCOMMITTED"
{
$$ = ast.ReadUncommitted
}
| "SERIALIZABLE"
{
$$ = ast.Serializable
}

SetExpr:
"ON"
Expand Down
33 changes: 33 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1792,3 +1792,36 @@ func (s *testParserSuite) TestGeneratedColumn(c *C) {
}

}

func (s *testParserSuite) TestSetTransaction(c *C) {
defer testleak.AfterTest(c)()
// Set transaction is equivalent to setting the global or session value of tx_isolation.
// For example:
// SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED
// SET SESSION tx_isolation='READ-COMMITTED'
tests := []struct {
input string
isGlobal bool
value string
}{
{
"SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED",
false, "READ-COMMITTED",
},
{
"SET GLOBAL TRANSACTION ISOLATION LEVEL REPEATABLE READ",
true, "REPEATABLE-READ",
},
}
parser := New()
for _, t := range tests {
stmt1, err := parser.ParseOneStmt(t.input, "", "")
c.Assert(err, IsNil)
setStmt := stmt1.(*ast.SetStmt)
vars := setStmt.Variables[0]
c.Assert(vars.Name, Equals, "tx_isolation")
c.Assert(vars.IsGlobal, Equals, t.isGlobal)
c.Assert(vars.IsSystem, Equals, true)
c.Assert(vars.Value.GetValue(), Equals, t.value)
}
}
3 changes: 3 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,9 @@ func (s *session) ActivePendingTxn() error {
if err != nil {
return errors.Trace(err)
}
if s.sessionVars.Systems[variable.TxnIsolation] == ast.ReadCommitted {
txn.SetOption(kv.IsolationLevel, kv.RC)
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ const (
CharacterSetResults = "character_set_results"
MaxAllowedPacket = "max_allowed_packet"
TimeZone = "time_zone"
TxnIsolation = "tx_isolation"
)

// TableDelta stands for the changed count for one table.
Expand Down