From e38a2fc07e40f5b17740353a9cda71cda0e356c8 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 4 Jul 2017 15:53:08 +0800 Subject: [PATCH 1/4] *: make SET TRANSACTION ISOLATION LEVEL READ COMMITTED take effect it is equivalent to setting "tx_isolation" variable. if the value is READ-COMMITTED, set the transaction isolation level option. --- ast/misc.go | 7 +++++ executor/set_test.go | 10 +++++++ parser/parser.y | 53 +++++++++++++++++++++++++++++++--- parser/parser_test.go | 33 +++++++++++++++++++++ session.go | 3 ++ sessionctx/variable/session.go | 1 + 6 files changed, 103 insertions(+), 4 deletions(-) diff --git a/ast/misc.go b/ast/misc.go index 80ba572e80085..1983a7737fce6 100644 --- a/ast/misc.go +++ b/ast/misc.go @@ -46,6 +46,13 @@ var ( _ Node = &VariableAssignment{} ) +const ( + ReadCommitted = "READ-COMMITTED" + ReadUncommitted = "READ-UNCOMMITTED" + Serializable = "SERIALIZABLE " + RepeatableRead = "REPEATABLE-READ" +) + // TypeOpt is used for parsing data type option from SQL. type TypeOpt struct { IsUnsigned bool diff --git a/executor/set_test.go b/executor/set_test.go index bd159da8cf47d..e3bbf4d04055b 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -123,6 +123,16 @@ func (s *testSuite) TestSetVar(c *C) { c.Assert(vars.SkipConstraintCheck, IsTrue) tk.MustExec("set @@tidb_skip_constraint_check = '0'") c.Assert(vars.SkipConstraintCheck, IsFalse) + + // Test set transaction isolation level, which is equivalent to setting variable "tx_isolation". + tk.MustExec("SET GLOBAL TRANSACTION ISOLATION LEVEL SERIALIZABLE") + tk.MustQuery("select @@global.tx_isolation").Check(testkit.Rows("SERIALIZABLE")) + + tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") + tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-UNCOMMITTED")) + + tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED") + tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-COMMITTED")) } func (s *testSuite) TestSetCharset(c *C) { diff --git a/parser/parser.y b/parser/parser.y index e419e4fb15c62..eff5ca73636f3 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -757,6 +757,8 @@ import ( TableOptionListOpt "create table option list opt" TableRef "table reference" TableRefs "table references" + TransactionChar "Transaction characteristic" + TransactionChars "Transaction characteristic list" TrimDirection "Trim string direction" TruncateTableStmt "TRANSACTION TABLE statement" UnionOpt "Union Option(empty/ALL/DISTINCT)" @@ -835,8 +837,6 @@ import ( OuterOpt "optional OUTER clause" CrossOpt "Cross join option" TablesTerminalSym "{TABLE|TABLES}" - TransactionChar "Transaction characteristic" - TransactionChars "Transaction characteristic list" IsolationLevel "Isolation level" ShowIndexKwd "Show index/indexs/key keyword" FromOrIn "From or In" @@ -4836,27 +4836,72 @@ SetStmt: } | "SET" "GLOBAL" "TRANSACTION" TransactionChars { - // Parsed but ignored + vars := $4.([]*ast.VariableAssignment) + for _, v := range vars { + v.IsGlobal = true + } + $$ = &ast.SetStmt{Variables: vars} } | "SET" "SESSION" "TRANSACTION" TransactionChars { - // Parsed but ignored + $$ = &ast.SetStmt{Variables: $4.([]*ast.VariableAssignment)} } TransactionChars: TransactionChar + { + if $1 != nil { + $$ = []*ast.VariableAssignment{$1.(*ast.VariableAssignment)} + } else { + $$ = []*ast.VariableAssignment{} + } + } | TransactionChars ',' TransactionChar + { + if $3 != nil { + $$ = append($1.([]*ast.VariableAssignment), $3.(*ast.VariableAssignment)) + } else { + $$ = $1 + } + } TransactionChar: "ISOLATION" "LEVEL" IsolationLevel + { + tp := types.NewFieldType(mysql.TypeString) + tp.Charset, tp.Collate = parser.charset, parser.collation + expr := ast.NewValueExpr($3) + expr.SetType(tp) + $$ = &ast.VariableAssignment{Name: "tx_isolation", Value: expr, IsSystem: true} + } | "READ" "WRITE" + { + // Parsed but ignored + $$ = nil + } | "READ" "ONLY" + { + // Parsed but ignored + $$ = nil + } IsolationLevel: "REPEATABLE" "READ" + { + $$ = ast.RepeatableRead + } | "READ" "COMMITTED" + { + $$ = ast.ReadCommitted + } | "READ" "UNCOMMITTED" + { + $$ = ast.ReadUncommitted + } | "SERIALIZABLE" + { + $$ = ast.Serializable + } VariableAssignment: Identifier eq Expression diff --git a/parser/parser_test.go b/parser/parser_test.go index 3caead00cde66..31a80e0a84f43 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -1761,3 +1761,36 @@ func (s *testParserSuite) TestGeneratedColumn(c *C) { } } + +func (s *testParserSuite) TestSetTransaction(c *C) { + defer testleak.AfterTest(c)() + // Set transaction is equivalent to setting the global or session value of tx_isolation. + // For example: + // SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED + // SET SESSION tx_isolation='READ-COMMITTED' + tests := []struct { + input string + isGlobal bool + value string + }{ + { + "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED", + false, "READ-COMMITTED", + }, + { + "SET GLOBAL TRANSACTION ISOLATION LEVEL REPEATABLE READ", + true, "REPEATABLE-READ", + }, + } + parser := New() + for _, t := range tests { + stmt1, err := parser.ParseOneStmt(t.input, "", "") + c.Assert(err, IsNil) + setStmt := stmt1.(*ast.SetStmt) + vars := setStmt.Variables[0] + c.Assert(vars.Name, Equals, "tx_isolation") + c.Assert(vars.IsGlobal, Equals, t.isGlobal) + c.Assert(vars.IsSystem, Equals, true) + c.Assert(vars.Value.GetValue(), Equals, t.value) + } +} diff --git a/session.go b/session.go index 15d156da1eed6..37f8fc6552741 100644 --- a/session.go +++ b/session.go @@ -1130,6 +1130,9 @@ func (s *session) ActivePendingTxn() error { if err != nil { return errors.Trace(err) } + if s.sessionVars.Systems[variable.TxnIsolation] == ast.ReadCommitted { + txn.SetOption(kv.IsolationLevel, kv.RC) + } return nil } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 614a2c6180ff4..c7f25e2189c46 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -304,6 +304,7 @@ const ( CharacterSetResults = "character_set_results" MaxAllowedPacket = "max_allowed_packet" TimeZone = "time_zone" + TxnIsolation = "tx_isolation" ) // TableDelta stands for the changed count for one table. From 8fe98539ae11878f0e02bfead8862c7966ec15e0 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 4 Jul 2017 20:11:14 +0800 Subject: [PATCH 2/4] address comment --- ast/misc.go | 2 +- distsql/distsql.go | 26 ++++++++++++++------------ executor/distsql.go | 17 +++++++++++++---- executor/new_distsql.go | 12 ++++++------ 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/ast/misc.go b/ast/misc.go index 1983a7737fce6..1c9db8c48d1de 100644 --- a/ast/misc.go +++ b/ast/misc.go @@ -49,7 +49,7 @@ var ( const ( ReadCommitted = "READ-COMMITTED" ReadUncommitted = "READ-UNCOMMITTED" - Serializable = "SERIALIZABLE " + Serializable = "SERIALIZABLE" RepeatableRead = "REPEATABLE-READ" ) diff --git a/distsql/distsql.go b/distsql/distsql.go index 57e167beb8e27..e63da87974d13 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -184,7 +184,7 @@ func (pr *partialResult) Close() error { // concurrency: The max concurrency for underlying coprocessor request. // keepOrder: If the result should returned in key order. For example if we need keep data in order by // scan index, we should set keepOrder to true. -func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool) (SelectResult, error) { +func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, isolationLevel kv.IsoLevel) (SelectResult, error) { var err error defer func() { // Add metrics @@ -196,7 +196,7 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan }() // Convert tipb.*Request to kv.Request. - kvReq, err1 := composeRequest(req, keyRanges, concurrency, keepOrder) + kvReq, err1 := composeRequest(req, keyRanges, concurrency, keepOrder, isolationLevel) if err1 != nil { err = errors.Trace(err1) return nil, err @@ -229,7 +229,7 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan // concurrency: The max concurrency for underlying coprocessor request. // keepOrder: If the result should returned in key order. For example if we need keep data in order by // scan index, we should set keepOrder to true. -func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, desc bool) (SelectResult, error) { +func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, desc bool, isolationLevel kv.IsoLevel) (SelectResult, error) { var err error defer func() { // Add metrics. @@ -241,11 +241,12 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan }() kvReq := &kv.Request{ - Tp: kv.ReqTypeDAG, - Concurrency: concurrency, - KeepOrder: keepOrder, - KeyRanges: keyRanges, - Desc: desc, + Tp: kv.ReqTypeDAG, + Concurrency: concurrency, + KeepOrder: keepOrder, + KeyRanges: keyRanges, + Desc: desc, + IsolationLevel: isolationLevel, } kvReq.Data, err = dag.Marshal() if err != nil { @@ -267,11 +268,12 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan } // Convert tipb.Request to kv.Request. -func composeRequest(req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool) (*kv.Request, error) { +func composeRequest(req *tipb.SelectRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, isolationLevel kv.IsoLevel) (*kv.Request, error) { kvReq := &kv.Request{ - Concurrency: concurrency, - KeepOrder: keepOrder, - KeyRanges: keyRanges, + Concurrency: concurrency, + KeepOrder: keepOrder, + KeyRanges: keyRanges, + IsolationLevel: isolationLevel, } if req.IndexInfo != nil { kvReq.Tp = kv.ReqTypeIndex diff --git a/executor/distsql.go b/executor/distsql.go index a40c294ea5ed2..70cb0c6ac5302 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -21,6 +21,7 @@ import ( "github.com/juju/errors" "github.com/ngaut/log" + "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/distsql/xeval" @@ -659,12 +660,20 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) { for i, v := range e.index.Columns { fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType) } - sc := e.ctx.GetSessionVars().StmtCtx + sv := e.ctx.GetSessionVars() + sc := sv.StmtCtx keyRanges, err := indexRangesToKVRanges(sc, e.table.Meta().ID, e.index.ID, e.ranges, fieldTypes) if err != nil { return nil, errors.Trace(err) } - return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.outOfOrder) + return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.outOfOrder, getIsolationLevel(sv)) +} + +func getIsolationLevel(sv *variable.SessionVars) kv.IsoLevel { + if sv.Systems[variable.TxnIsolation] == ast.ReadCommitted { + return kv.RC + } + return kv.SI } func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask { @@ -809,7 +818,7 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult keyRanges := tableHandlesToKVRanges(e.table.Meta().ID, handles) // Use the table scan concurrency variable to do table request. concurrency := e.ctx.GetSessionVars().DistSQLScanConcurrency - resp, err := distsql.Select(e.ctx.GetClient(), goctx.Background(), selTableReq, keyRanges, concurrency, false) + resp, err := distsql.Select(e.ctx.GetClient(), goctx.Background(), selTableReq, keyRanges, concurrency, false, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return nil, errors.Trace(err) } @@ -889,7 +898,7 @@ func (e *XSelectTableExec) doRequest() error { selReq.GroupBy = e.byItems kvRanges := tableRangesToKVRanges(e.table.Meta().ID, e.ranges) - e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder) + e.result, err = distsql.Select(e.ctx.GetClient(), goctx.Background(), selReq, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } diff --git a/executor/new_distsql.go b/executor/new_distsql.go index d96d19056c1ae..c964dd6b99956 100644 --- a/executor/new_distsql.go +++ b/executor/new_distsql.go @@ -114,7 +114,7 @@ func (e *TableReaderExecutor) Next() (*Row, error) { func (e *TableReaderExecutor) Open() error { kvRanges := tableRangesToKVRanges(e.tableID, e.ranges) var err error - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goctx.Background(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goctx.Background(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } @@ -126,7 +126,7 @@ func (e *TableReaderExecutor) Open() error { func (e *TableReaderExecutor) doRequestForHandles(handles []int64, goCtx goctx.Context) error { kvRanges := tableHandlesToKVRanges(e.tableID, handles) var err error - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goCtx, e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goCtx, e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } @@ -226,7 +226,7 @@ func (e *IndexReaderExecutor) Open() error { if err != nil { return errors.Trace(err) } - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } @@ -240,7 +240,7 @@ func (e *IndexReaderExecutor) doRequestForDatums(values [][]types.Datum, goCtx g if err != nil { return errors.Trace(err) } - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } @@ -283,7 +283,7 @@ func (e *IndexLookUpExecutor) Open() error { if err != nil { return errors.Trace(err) } - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } @@ -303,7 +303,7 @@ func (e *IndexLookUpExecutor) doRequestForDatums(values [][]types.Datum, goCtx g if err != nil { return errors.Trace(err) } - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc) + e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars())) if err != nil { return errors.Trace(err) } From 3d2e83bf12f0006ca2b513e0c6b1d2fc844fef48 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 4 Jul 2017 20:50:12 +0800 Subject: [PATCH 3/4] make golint happy --- ast/misc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ast/misc.go b/ast/misc.go index 1c9db8c48d1de..88d9b379e6b73 100644 --- a/ast/misc.go +++ b/ast/misc.go @@ -46,6 +46,7 @@ var ( _ Node = &VariableAssignment{} ) +// Isolation level constants. const ( ReadCommitted = "READ-COMMITTED" ReadUncommitted = "READ-UNCOMMITTED" From f371019be58da1eccaa3a59cdc9ce1bc5fee21e4 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 17 Jul 2017 15:43:35 +0800 Subject: [PATCH 4/4] update --- executor/set.go | 7 +++++++ executor/set_test.go | 12 +++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/executor/set.go b/executor/set.go index ca52098519088..e6bfe57a41f3b 100644 --- a/executor/set.go +++ b/executor/set.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/ast" "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessionctx/varsutil" @@ -143,6 +144,12 @@ func (e *SetExecutor) executeSet() error { valStr, _ := value.ToString() log.Infof("[%d] set system variable %s = %s", sessionVars.ConnectionID, name, valStr) } + + if name == variable.TxnIsolation { + if sessionVars.Systems[variable.TxnIsolation] == ast.ReadCommitted { + e.ctx.Txn().SetOption(kv.IsolationLevel, kv.RC) + } + } } return nil } diff --git a/executor/set_test.go b/executor/set_test.go index 46f6e3ac4337b..7f5dfc96c5f83 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -125,13 +125,19 @@ func (s *testSuite) TestSetVar(c *C) { c.Assert(vars.SkipConstraintCheck, IsFalse) // Test set transaction isolation level, which is equivalent to setting variable "tx_isolation". - tk.MustExec("SET GLOBAL TRANSACTION ISOLATION LEVEL SERIALIZABLE") - tk.MustQuery("select @@global.tx_isolation").Check(testkit.Rows("SERIALIZABLE")) - + tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED") + tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-COMMITTED")) tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED") tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-UNCOMMITTED")) + tk.MustExec("SET GLOBAL TRANSACTION ISOLATION LEVEL SERIALIZABLE") + tk.MustQuery("select @@global.tx_isolation").Check(testkit.Rows("SERIALIZABLE")) + // Even the transaction fail, set session variable would success. + tk.MustExec("BEGIN") tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED") + _, err = tk.Se.Execute(`INSERT INTO t VALUES ("sdfsdf")`) + c.Assert(err, NotNil) + tk.MustExec("COMMIT") tk.MustQuery("select @@session.tx_isolation").Check(testkit.Rows("READ-COMMITTED")) tk.MustExec("set global avoid_temporal_upgrade = on")