diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index 1eb7e1478b2f9..22b60187f0a77 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv/mockstore/cluster" "github.com/pingcap/tidb/util" @@ -70,6 +71,10 @@ type mockSessionManager struct { PS []*util.ProcessInfo } +func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) for _, item := range msm.PS { diff --git a/domain/domain_test.go b/domain/domain_test.go index 7c9d9ff633bc5..a4432b0fb1fe6 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/tikv" @@ -241,6 +242,10 @@ type mockSessionManager struct { PS []*util.ProcessInfo } +func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) for _, item := range msm.PS { diff --git a/errno/errname.go b/errno/errname.go index 5afdbbb91c4c0..62662ce5ac934 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1020,7 +1020,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrBuildExecutor: mysql.Message("Failed to build executor", nil), ErrBatchInsertFail: mysql.Message("Batch insert failed, please clean the table and try again.", nil), ErrGetStartTS: mysql.Message("Can not get start ts", nil), - ErrPrivilegeCheckFail: mysql.Message("privilege check fail", nil), // this error message should begin lowercased to be compatible with the test + ErrPrivilegeCheckFail: mysql.Message("privilege check for '%s' fail", nil), // this error message should begin lowercased to be compatible with the test ErrInvalidWildCard: mysql.Message("Wildcard fields without any table name appears in wrong place", nil), ErrMixOfGroupFuncAndFieldsIncompatible: mysql.Message("In aggregated query without GROUP BY, expression #%d of SELECT list contains nonaggregated column '%s'; this is incompatible with sql_mode=only_full_group_by", nil), ErrUnsupportedSecondArgumentType: mysql.Message("JSON_OBJECTAGG: unsupported second argument type %v", nil), diff --git a/errors.toml b/errors.toml index 458af951629d8..0ce61654373fb 100644 --- a/errors.toml +++ b/errors.toml @@ -1133,7 +1133,7 @@ Schema has changed ["planner:8121"] error = ''' -privilege check fail +privilege check for '%s' fail ''' ["planner:8122"] diff --git a/executor/adapter.go b/executor/adapter.go index 5f5229195c3f9..44d00cd1efa1e 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -55,6 +55,7 @@ import ( "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" + "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -377,6 +378,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { if txn.Valid() { txnStartTS = txn.StartTS() } + return &recordSet{ executor: e, stmt: a, @@ 
-590,6 +592,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
 		}
 		e, err = a.handlePessimisticLockError(ctx, err)
 		if err != nil {
+			// TODO: Report deadlock.
 			if ErrDeadlock.Equal(err) {
 				metrics.StatementDeadlockDetectDuration.Observe(time.Since(startLocking).Seconds())
 			}
diff --git a/executor/builder.go b/executor/builder.go
index 40282d1030b2c..3324e52f894ff 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1531,7 +1531,9 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
 		strings.ToLower(infoschema.TablePlacementPolicy),
 		strings.ToLower(infoschema.TableClientErrorsSummaryGlobal),
 		strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
-		strings.ToLower(infoschema.TableClientErrorsSummaryByHost):
+		strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
+		strings.ToLower(infoschema.TableTiDBTrx),
+		strings.ToLower(infoschema.ClusterTableTiDBTrx):
 		return &MemTableReaderExec{
 			baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
 			table:        v.Table,
diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go
index 7cc5a8a69d66e..5591dcefde54d 100644
--- a/executor/executor_pkg_test.go
+++ b/executor/executor_pkg_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/tidb/executor/aggfuncs"
 	"github.com/pingcap/tidb/expression"
 	plannerutil "github.com/pingcap/tidb/planner/util"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
@@ -60,6 +61,10 @@ type mockSessionManager struct {
 	serverID uint64
 }

+func (msm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
+	panic("unimplemented!")
+}
+
 // ShowProcessList implements the SessionManager.ShowProcessList interface.
 func (msm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
 	ret := make(map[uint64]*util.ProcessInfo)
diff --git a/executor/explainfor_test.go b/executor/explainfor_test.go
index a113200a925d8..e29a7a3e24cee 100644
--- a/executor/explainfor_test.go
+++ b/executor/explainfor_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/parser/auth"
 	"github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
+	txninfo "github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/israce"
@@ -38,6 +39,10 @@ type mockSessionManager1 struct {
 	PS []*util.ProcessInfo
 }

+func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo {
+	return nil
+}
+
 // ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 0ec0c48885ecf..ae338bdd644d2 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -149,6 +149,10 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex infoschema.TableClientErrorsSummaryByUser, infoschema.TableClientErrorsSummaryByHost: err = e.setDataForClientErrorsSummary(sctx, e.table.Name.O) + case infoschema.TableTiDBTrx: + e.setDataForTiDBTrx(sctx) + case infoschema.ClusterTableTiDBTrx: + err = e.setDataForClusterTiDBTrx(sctx) } if err != nil { return nil, err @@ -2011,6 +2015,40 @@ func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context return nil } +func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) { + sm := ctx.GetSessionManager() + if sm == nil { + return + } + + loginUser := ctx.GetSessionVars().User + var hasProcessPriv bool + if pm := privilege.GetPrivilegeManager(ctx); pm != nil { + if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { + hasProcessPriv = true + } + } + infoList := sm.ShowTxnList() + for _, info := range infoList { + // If you have the PROCESS privilege, you can see all running transactions. + // Otherwise, you can see only your own transactions. + if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username { + continue + } + e.rows = append(e.rows, info.ToDatum()) + } +} + +func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) error { + e.setDataForTiDBTrx(ctx) + rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows) + if err != nil { + return err + } + e.rows = rows + return nil +} + type hugeMemTableRetriever struct { dummyCloser table *model.TableInfo diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go index c3e125824873d..e19eb9d9b3064 100644 --- a/executor/infoschema_reader_test.go +++ b/executor/infoschema_reader_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" @@ -728,6 +729,10 @@ type mockSessionManager struct { serverID uint64 } +func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { return sm.processInfoMap } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 64236558af94e..7cfeb613c40f6 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -30,7 +30,7 @@ import ( ) func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool { - if !ctx.GetSessionVars().AllowMPPExecution { + if !ctx.GetSessionVars().IsMPPAllowed() { return false } _, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index 8ce126972ec73..0c9e63129ebf1 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -434,6 +434,72 @@ func (s *partitionTableSuite) TestGlobalStatsAndSQLBinding(c *C) { tk.MustIndexLookup("select * from tlist where a<1") } +func createTable4DynamicPruneModeTestWithExpression(tk *testkit.TestKit) 
{ + tk.MustExec("create table trange(a int) partition by range(a) (partition p0 values less than(3), partition p1 values less than (5), partition p2 values less than(11));") + tk.MustExec("create table thash(a int) partition by hash(a) partitions 4;") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into trange values(1), (1), (1), (2), (3), (4), (5), (6), (7), (7), (10), (NULL), (NULL);") + tk.MustExec("insert into thash values(1), (1), (1), (2), (3), (4), (5), (6), (7), (7), (10), (NULL), (NULL);") + tk.MustExec("insert into t values(1), (1), (1), (2), (3), (4), (5), (6), (7), (7), (10), (NULL), (NULL);") + tk.MustExec("set session tidb_partition_prune_mode='dynamic'") + tk.MustExec("analyze table trange") + tk.MustExec("analyze table thash") + tk.MustExec("analyze table t") +} + +type testData4Expression struct { + sql string + partitions []string +} + +func (s *partitionTableSuite) TestDynamicPruneModeWithEqualExpression(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop database if exists db_equal_expression") + tk.MustExec("create database db_equal_expression") + tk.MustExec("use db_equal_expression") + createTable4DynamicPruneModeTestWithExpression(tk) + + tables := []string{"trange", "thash"} + tests := []testData4Expression{ + { + sql: "select * from %s where a = 2", + partitions: []string{ + "p0", + "p2", + }, + }, + { + sql: "select * from %s where a = 4 or a = 1", + partitions: []string{ + "p0,p1", + "p0,p1", + }, + }, + { + sql: "select * from %s where a = -1", + partitions: []string{ + "p0", + "p1", + }, + }, + { + sql: "select * from %s where a is NULL", + partitions: []string{ + "p0", + "p0", + }, + }, + } + + for _, t := range tests { + for i := range t.partitions { + sql := fmt.Sprintf(t.sql, tables[i]) + c.Assert(tk.MustPartition(sql, t.partitions[i]), IsTrue) + tk.MustQuery(sql).Sort().Check(tk.MustQuery(fmt.Sprintf(t.sql, "t")).Sort().Rows()) + } + } +} + func (s *partitionTableSuite) TestDirectReadingWithAgg(c *C) { if israce.RaceEnabled { c.Skip("exhaustive types test, skip race test") diff --git a/executor/prepared_test.go b/executor/prepared_test.go index 1f8edf79d942e..e0e2c19ee0f22 100644 --- a/executor/prepared_test.go +++ b/executor/prepared_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/domain" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/israce" @@ -135,6 +136,10 @@ type mockSessionManager2 struct { killed bool } +func (sm *mockSessionManager2) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + func (sm *mockSessionManager2) ShowProcessList() map[uint64]*util.ProcessInfo { pl := make(map[uint64]*util.ProcessInfo) if pi, ok := sm.GetProcessInfo(0); ok { diff --git a/executor/revoke.go b/executor/revoke.go index 1477534962fbe..b090f048c62a7 100644 --- a/executor/revoke.go +++ b/executor/revoke.go @@ -88,8 +88,14 @@ func (e *RevokeExec) Next(ctx context.Context, req *chunk.Chunk) error { return err } + sessVars := e.ctx.GetSessionVars() // Revoke for each user. for _, user := range e.Users { + if user.User.CurrentUser { + user.User.Username = sessVars.User.AuthUsername + user.User.Hostname = sessVars.User.AuthHostname + } + // Check if user exists. 
exists, err := userExists(e.ctx, user.User.Username, user.User.Hostname) if err != nil { diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go index 916f218db1f9d..bb8f05e5eff54 100644 --- a/executor/seqtest/prepared_test.go +++ b/executor/seqtest/prepared_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/metrics" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" @@ -796,6 +797,10 @@ type mockSessionManager1 struct { Se session.Session } +func (msm *mockSessionManager1) ShowTxnList() []*txninfo.TxnInfo { + panic("unimplemented!") +} + // ShowProcessList implements the SessionManager.ShowProcessList interface. func (msm *mockSessionManager1) ShowProcessList() map[uint64]*util.ProcessInfo { ret := make(map[uint64]*util.ProcessInfo) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index c68b8a5bfa511..ce5202ae58a75 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -256,65 +256,65 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { defer tk.MustExec(`drop table if exists t`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`) testcases := []struct { - name string - sql string - injectResolveTS uint64 - useResolveTS bool + name string + sql string + injectSafeTS uint64 + useSafeTS bool }{ { - name: "max 20 seconds ago, resolveTS 10 secs ago", + name: "max 20 seconds ago, safeTS 10 secs ago", sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`, - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-10*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: true, + useSafeTS: true, }, { - name: "max 10 seconds ago, resolveTS 20 secs ago", + name: "max 10 seconds ago, safeTS 20 secs ago", sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`, - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-20*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: false, + useSafeTS: false, }, { - name: "max 20 seconds ago, resolveTS 10 secs ago", + name: "max 20 seconds ago, safeTS 10 secs ago", sql: func() string { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05")) }(), - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-10*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: true, + useSafeTS: true, }, { - name: "max 10 seconds ago, resolveTS 20 secs ago", + name: "max 10 seconds ago, safeTS 20 secs ago", sql: func() string { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05")) }(), - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-20*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: false, + useSafeTS: false, }, } for _, testcase := range testcases { c.Log(testcase.name) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectResolveTS", - fmt.Sprintf("return(%v)", testcase.injectResolveTS)), IsNil) + 
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS", + fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) tk.MustExec(testcase.sql) - if testcase.useResolveTS { - c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectResolveTS) + if testcase.useSafeTS { + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS) } else { - c.Assert(oracle.CompareTS(tk.Se.GetSessionVars().TxnCtx.StartTS, testcase.injectResolveTS), Equals, 1) + c.Assert(oracle.CompareTS(tk.Se.GetSessionVars().TxnCtx.StartTS, testcase.injectSafeTS), Equals, 1) } tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectResolveTS") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } } diff --git a/expression/integration_test.go b/expression/integration_test.go index d4b6a031087e4..f15dc5822be15 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -9156,6 +9156,15 @@ func (s *testIntegrationSerialSuite) TestIssue23805(c *C) { tk.MustExec("insert ignore into tbl_5 set col_28 = 'ZmZIdSnq' , col_25 = '18:50:52.00' on duplicate key update col_26 = 'y';\n") } +func (s *testIntegrationSuite) TestIssue24429(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("set @@sql_mode = ANSI_QUOTES;") + tk.MustExec("use test") + tk.MustExec("create table t (a int);") + tk.MustQuery(`select t."a"=10 from t;`).Check(testkit.Rows()) +} + func (s *testIntegrationSuite) TestVitessHash(c *C) { defer s.cleanEnv(c) tk := testkit.NewTestKit(c, s.store) diff --git a/go.mod b/go.mod index bf927f9cc55ce..cd8becf5a757c 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde + github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 diff --git a/go.sum b/go.sum index a3ebad580db64..4e1030039a04f 100644 --- a/go.sum +++ b/go.sum @@ -443,8 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde h1:CcGOCE3kr8aYBy6rRcWWldidL1X5smQxV79nlnzOk+o= -github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c h1:GLFd+wBN7EsV6ad/tVGFCD37taOyzIMVs3SdiWZF18I= +github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= diff --git 
a/infoschema/cluster.go b/infoschema/cluster.go
index f113e90a0f587..2d196fe5b0023 100644
--- a/infoschema/cluster.go
+++ b/infoschema/cluster.go
@@ -37,6 +37,8 @@ const (
 	ClusterTableStatementsSummary = "CLUSTER_STATEMENTS_SUMMARY"
 	// ClusterTableStatementsSummaryHistory is the string constant of cluster statement summary history table.
 	ClusterTableStatementsSummaryHistory = "CLUSTER_STATEMENTS_SUMMARY_HISTORY"
+	// ClusterTableTiDBTrx is the string constant of cluster transaction running table.
+	ClusterTableTiDBTrx = "CLUSTER_TIDB_TRX"
 )

 // memTableToClusterTables means add memory table to cluster table.
@@ -45,6 +47,7 @@ var memTableToClusterTables = map[string]string{
 	TableProcesslist:              ClusterTableProcesslist,
 	TableStatementsSummary:        ClusterTableStatementsSummary,
 	TableStatementsSummaryHistory: ClusterTableStatementsSummaryHistory,
+	TableTiDBTrx:                  ClusterTableTiDBTrx,
 }

 func init() {
diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go
index c3892e6527962..6aa0c5526f467 100644
--- a/infoschema/infoschema_test.go
+++ b/infoschema/infoschema_test.go
@@ -332,6 +332,7 @@ func (*testSuite) TestInfoTables(c *C) {
 		"TABLESPACES",
 		"COLLATION_CHARACTER_SET_APPLICABILITY",
 		"PROCESSLIST",
+		"TIDB_TRX",
 	}
 	for _, t := range infoTables {
 		tb, err1 := is.TableByName(util.InformationSchemaName, model.NewCIStr(t))
diff --git a/infoschema/tables.go b/infoschema/tables.go
index bfca649e89fdd..2d5112ada05c0 100644
--- a/infoschema/tables.go
+++ b/infoschema/tables.go
@@ -31,11 +31,13 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/parser/terror"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
+	"github.com/pingcap/tidb/session/txninfo"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/tikv"
@@ -161,6 +163,8 @@ const (
 	TableClientErrorsSummaryByUser = "CLIENT_ERRORS_SUMMARY_BY_USER"
 	// TableClientErrorsSummaryByHost is the string constant of client errors table.
 	TableClientErrorsSummaryByHost = "CLIENT_ERRORS_SUMMARY_BY_HOST"
+	// TableTiDBTrx is the string constant of the currently running transactions table.
+	TableTiDBTrx = "TIDB_TRX"
 )

 var tableIDMap = map[string]int64{
@@ -233,22 +237,25 @@
 	TableClientErrorsSummaryGlobal: autoid.InformationSchemaDBID + 67,
 	TableClientErrorsSummaryByUser: autoid.InformationSchemaDBID + 68,
 	TableClientErrorsSummaryByHost: autoid.InformationSchemaDBID + 69,
+	TableTiDBTrx:                   autoid.InformationSchemaDBID + 70,
+	ClusterTableTiDBTrx:            autoid.InformationSchemaDBID + 71,
 }

 type columnInfo struct {
-	name    string
-	tp      byte
-	size    int
-	decimal int
-	flag    uint
-	deflt   interface{}
-	comment string
+	name      string
+	tp        byte
+	size      int
+	decimal   int
+	flag      uint
+	deflt     interface{}
+	comment   string
+	enumElems []string
 }

 func buildColumnInfo(col columnInfo) *model.ColumnInfo {
 	mCharset := charset.CharsetBin
 	mCollation := charset.CharsetBin
-	if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob {
+	if col.tp == mysql.TypeVarchar || col.tp == mysql.TypeBlob || col.tp == mysql.TypeLongBlob || col.tp == mysql.TypeEnum {
 		mCharset = charset.CharsetUTF8MB4
 		mCollation = charset.CollationUTF8MB4
 	}
@@ -259,6 +266,7 @@ func buildColumnInfo(col columnInfo) *model.ColumnInfo {
 		Flen:    col.size,
 		Decimal: col.decimal,
 		Flag:    col.flag,
+		Elems:   col.enumElems,
 	}
 	return &model.ColumnInfo{
 		Name: model.NewCIStr(col.name),
@@ -1332,6 +1340,19 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{
 	{name: "LAST_SEEN", tp: mysql.TypeTimestamp, size: 26},
 }

+var tableTiDBTrxCols = []columnInfo{
+	{name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag},
+	{name: "START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the transaction"},
+	{name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL statement the transaction is currently running"},
+	{name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"},
+	{name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the current lock wait"},
+	{name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"},
+	{name: "SIZE", tp: mysql.TypeLonglong, size: 64, comment: "How much memory MemDB uses"},
+	{name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"},
+	{name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who opened this session"},
+	{name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"},
+}
+
 // GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
 // The returned description string may be:
 // - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
@@ -1701,6 +1722,7 @@ var tableNameToColumns = map[string][]columnInfo{
 	TableClientErrorsSummaryGlobal: tableClientErrorsSummaryGlobalCols,
 	TableClientErrorsSummaryByUser: tableClientErrorsSummaryByUserCols,
 	TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols,
+	TableTiDBTrx:                   tableTiDBTrxCols,
 }

 func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go
index f30f25ba6abfa..6cc24300c1be4 100644
--- a/infoschema/tables_test.go
+++ b/infoschema/tables_test.go
@@ -28,6 +28,7 @@ import (
 	.
"github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/fn" + "github.com/pingcap/parser" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -42,6 +43,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util" @@ -121,7 +123,7 @@ func (s *testClusterTableSuite) setUpRPCService(c *C, addr string) (*grpc.Server lis, err := net.Listen("tcp", addr) c.Assert(err, IsNil) // Fix issue 9836 - sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)} + sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "root", @@ -276,7 +278,7 @@ func (s *testTableSuite) TestInfoschemaFieldValue(c *C) { tk1.MustQuery("select distinct(table_schema) from information_schema.tables").Check(testkit.Rows("INFORMATION_SCHEMA")) // Fix issue 9836 - sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1)} + sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 1), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "root", @@ -433,6 +435,11 @@ func (s *testTableSuite) TestCurrentTimestampAsDefault(c *C) { type mockSessionManager struct { processInfoMap map[uint64]*util.ProcessInfo + txnInfo []*txninfo.TxnInfo +} + +func (sm *mockSessionManager) ShowTxnList() []*txninfo.TxnInfo { + return sm.txnInfo } func (sm *mockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo { @@ -459,7 +466,7 @@ func (s *testTableSuite) TestSomeTables(c *C) { c.Assert(err, IsNil) tk := testkit.NewTestKit(c, s.store) tk.Se = se - sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)} + sm := &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "user-1", @@ -516,7 +523,7 @@ func (s *testTableSuite) TestSomeTables(c *C) { fmt.Sprintf("3 user-3 127.0.0.1:12345 test Init DB 9223372036 %s %s", "in transaction", "check port"), )) - sm = &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2)} + sm = &mockSessionManager{make(map[uint64]*util.ProcessInfo, 2), nil} sm.processInfoMap[1] = &util.ProcessInfo{ ID: 1, User: "user-1", @@ -1509,3 +1516,24 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) { err = tk.ExecToErr("FLUSH CLIENT_ERRORS_SUMMARY") c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the RELOAD privilege(s) for this operation") } + +func (s *testTableSuite) TestTrx(c *C) { + tk := s.newTestKitWithRoot(c) + _, digest := parser.NormalizeDigest("select * from trx for update;") + sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)} + sm.txnInfo[0] = &txninfo.TxnInfo{ + StartTS: 424768545227014155, + CurrentSQLDigest: digest, + State: txninfo.TxnRunningNormal, + BlockStartTime: nil, + EntriesCount: 1, + EntriesSize: 19, + ConnectionID: 2, + Username: "root", + CurrentDB: "test", + } + tk.Se.SetSessionManager(sm) + tk.MustQuery("select * from information_schema.TIDB_TRX;").Check( + testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest + " Normal 1 19 2 root test"), + ) +} diff --git a/kv/kv.go b/kv/kv.go index a6a23a88df01d..1fad79d641009 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -154,6 +154,7 @@ type Transaction interface { // String implements fmt.Stringer interface. 
String() string // LockKeys tries to lock the entries with the keys in KV store. + // Will block until all keys are locked successfully or an error occurs. LockKeys(ctx context.Context, lockCtx *LockCtx, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index d4f2923b6220a..cd227657a75d9 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1662,7 +1662,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } joins := make([]PhysicalPlan, 0, 8) canPushToTiFlash := p.canPushToCop(kv.TiFlash) - if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash { + if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash { if p.shouldUseMPPBCJ() { mppJoins := p.tryToGetMppHashJoin(prop, true) if (p.preferJoinType & preferBCJoin) > 0 { @@ -1965,7 +1965,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl if !lt.limitHints.preferLimitToCop { allTaskTypes = append(allTaskTypes, property.RootTaskType) } - if lt.ctx.GetSessionVars().AllowMPPExecution { + if lt.ctx.GetSessionVars().IsMPPAllowed() { allTaskTypes = append(allTaskTypes, property.MppTaskType) } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) @@ -2355,7 +2355,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) } canPushDownToTiFlash := la.canPushToCop(kv.TiFlash) - canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash + canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash if la.HasDistinct() { // TODO: remove after the cost estimation of distinct pushdown is implemented. 
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown { diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 2abb4c353b304..9e8eaa9204af9 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3597,6 +3597,23 @@ func (s *testIntegrationSuite) TestIssue24095(c *C) { } } +func (s *testIntegrationSuite) TestIssue24281(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists member, agent, deposit, view_member_agents") + tk.MustExec("create table member(login varchar(50) NOT NULL, agent_login varchar(100) DEFAULT NULL, PRIMARY KEY(login))") + tk.MustExec("create table agent(login varchar(50) NOT NULL, data varchar(100) DEFAULT NULL, share_login varchar(50) NOT NULL, PRIMARY KEY(login))") + tk.MustExec("create table deposit(id varchar(50) NOT NULL, member_login varchar(50) NOT NULL, transfer_amount int NOT NULL, PRIMARY KEY(id), KEY midx(member_login, transfer_amount))") + tk.MustExec("create definer='root'@'localhost' view view_member_agents (member, share_login) as select m.login as member, a.share_login AS share_login from member as m join agent as a on m.agent_login = a.login") + + tk.MustExec(" select s.member_login as v1, SUM(s.transfer_amount) AS v2 " + + "FROM deposit AS s " + + "JOIN view_member_agents AS v ON s.member_login = v.member " + + "WHERE 1 = 1 AND v.share_login = 'somevalue' " + + "GROUP BY s.member_login " + + "UNION select 1 as v1, 2 as v2") +} + func (s *testIntegrationSuite) TestConflictReadFromStorage(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -3690,6 +3707,132 @@ func (s *testIntegrationSerialSuite) TestMergeContinuousSelections(c *C) { } } +func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { + tk := testkit.NewTestKit(c, s.store) + + // test value limit of tidb_opt_tiflash_concurrency_factor + err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`) + + tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1") + tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1")) + tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24") + tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24")) + + // test set tidb_allow_mpp + tk.MustExec("set @@session.tidb_allow_mpp = 0") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + tk.MustExec("set @@session.tidb_allow_mpp = 1") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + tk.MustExec("set @@session.tidb_allow_mpp = 2") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + tk.MustExec("set @@session.tidb_allow_mpp = off") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + tk.MustExec("set @@session.tidb_allow_mpp = oN") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + tk.MustExec("set @@session.tidb_allow_mpp = enForcE") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + tk.MustExec("set @@global.tidb_allow_mpp = faLsE") + tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF")) + tk.MustExec("set @@global.tidb_allow_mpp = True") + tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON")) + + err = tk.ExecToErr("set @@global.tidb_allow_mpp = 
enforceWithTypo") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`) + + // test query + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("create index idx on t(a)") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + // ban mpp + tk.MustExec("set @@session.tidb_allow_mpp = 0") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF")) + + // read from tiflash, batch cop. + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "StreamAgg_20 1.00 285050.00 root funcs:count(Column#5)->Column#3", + "└─TableReader_21 1.00 19003.88 root data:StreamAgg_9", + " └─StreamAgg_9 1.00 19006.88 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_19 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_18 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // open mpp + tk.MustExec("set @@session.tidb_allow_mpp = 1") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON")) + + // should use tikv to index read + tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_30 1.00 485.00 root funcs:count(Column#6)->Column#3", + "└─IndexReader_31 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#6", + " └─IndexRangeScan_29 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tikv, indexRead + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tiflash, mpp with large cost + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "HashAgg_21 1.00 11910.68 root funcs:count(Column#5)->Column#3", + "└─TableReader_23 1.00 11877.08 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // enforce mpp + tk.MustExec("set @@session.tidb_allow_mpp = 2") + tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE")) + + // should use mpp + tk.MustQuery("explain format='verbose' select count(*) from t where a=1;").Check(testkit.Rows( + "HashAgg_24 1.00 33.60 root funcs:count(Column#5)->Column#3", + "└─TableReader_26 1.00 0.00 root data:ExchangeSender_25", + " └─ExchangeSender_25 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " 
└─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_23 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_22 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) + + // read from tikv, indexRead + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows( + "StreamAgg_18 1.00 485.00 root funcs:count(Column#5)->Column#3", + "└─IndexReader_19 1.00 32.88 root index:StreamAgg_10", + " └─StreamAgg_10 1.00 35.88 cop[tikv] funcs:count(1)->Column#5", + " └─IndexRangeScan_17 10.00 455.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo")) + + // read from tiflash, mpp with little cost + tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( + "HashAgg_21 1.00 33.60 root funcs:count(Column#5)->Column#3", + "└─TableReader_23 1.00 0.00 root data:ExchangeSender_22", + " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", + " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", + " └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo")) +} + func (s *testIntegrationSuite) TestEliminateLockForTemporaryTable(c *C) { tk := testkit.NewTestKit(c, s.store) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 56c6e34ebd972..0c1c4a668d3c8 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -104,13 +104,13 @@ func CheckPrivilege(activeRoles []*auth.RoleIdentity, pm privilege.Manager, vs [ if v.privilege == mysql.ExtendedPriv { if !pm.RequestDynamicVerification(activeRoles, v.dynamicPriv, v.dynamicWithGrant) { if v.err == nil { - return ErrPrivilegeCheckFail + return ErrPrivilegeCheckFail.GenWithStackByArgs(v.dynamicPriv) } return v.err } } else if !pm.RequestVerification(activeRoles, v.db, v.table, v.column, v.privilege) { if v.err == nil { - return ErrPrivilegeCheckFail + return ErrPrivilegeCheckFail.GenWithStackByArgs(v.privilege.String()) } return v.err } diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 7813d65d92104..fbc0bf9333a29 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -1000,7 +1000,7 @@ func checkFastPlanPrivilege(ctx sessionctx.Context, dbName, tableName string, ch var visitInfos []visitInfo for _, checkType := range checkTypes { if pm != nil && !pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, dbName, tableName, "", checkType) { - return errors.New("privilege check fail") + return ErrPrivilegeCheckFail.GenWithStackByArgs(checkType.String()) } // This visitInfo is only for table lock check, so we do not need column field, // just fill it empty string. diff --git a/planner/core/property_cols_prune.go b/planner/core/property_cols_prune.go index 91a9f34fb9017..9cd83adc412de 100644 --- a/planner/core/property_cols_prune.go +++ b/planner/core/property_cols_prune.go @@ -148,21 +148,22 @@ func (p *LogicalProjection) PreparePossibleProperties(schema *expression.Schema, } } tmpSchema := expression.NewSchema(oldCols...) 
- for i := len(childProperties) - 1; i >= 0; i-- { - for j, col := range childProperties[i] { + newProperties := make([][]*expression.Column, 0, len(childProperties)) + for _, childProperty := range childProperties { + newChildProperty := make([]*expression.Column, 0, len(childProperty)) + for _, col := range childProperty { pos := tmpSchema.ColumnIndex(col) if pos >= 0 { - childProperties[i][j] = newCols[pos] + newChildProperty = append(newChildProperty, newCols[pos]) } else { - childProperties[i] = childProperties[i][:j] break } } - if len(childProperties[i]) == 0 { - childProperties = append(childProperties[:i], childProperties[i+1:]...) + if len(newChildProperty) != 0 { + newProperties = append(newProperties, newChildProperty) } } - return childProperties + return newProperties } func clonePossibleProperties(props [][]*expression.Column) [][]*expression.Column { diff --git a/planner/core/task.go b/planner/core/task.go index 205f5eb77b08a..fa6855503dd0e 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -2026,11 +2026,15 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { StoreType: kv.TiFlash, }.Init(ctx, t.p.SelectBlockOffset()) p.stats = t.p.statsInfo() + + p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor + if p.ctx.GetSessionVars().IsMPPEnforced() { + p.cost = 0 + } rt := &rootTask{ p: p, - cst: t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor, + cst: p.cost, } - p.cost = rt.cost() return rt } diff --git a/privilege/privileges/privileges_test.go b/privilege/privileges/privileges_test.go index 2efb565b3ed2b..3038aad397076 100644 --- a/privilege/privileges/privileges_test.go +++ b/privilege/privileges/privileges_test.go @@ -843,6 +843,12 @@ func (s *testPrivilegeSuite) TestRevokePrivileges(c *C) { c.Assert(se.Auth(&auth.UserIdentity{Username: "hasgrant", Hostname: "localhost", AuthUsername: "hasgrant", AuthHostname: "%"}, nil, nil), IsTrue) mustExec(c, se, "REVOKE SELECT ON mysql.* FROM 'withoutgrant'") mustExec(c, se, "REVOKE ALL ON mysql.* FROM withoutgrant") + + // For issue https://github.com/pingcap/tidb/issues/23850 + mustExec(c, se, "CREATE USER u4") + mustExec(c, se, "GRANT ALL ON *.* TO u4 WITH GRANT OPTION") + c.Assert(se.Auth(&auth.UserIdentity{Username: "u4", Hostname: "localhost", AuthUsername: "u4", AuthHostname: "%"}, nil, nil), IsTrue) + mustExec(c, se, "REVOKE ALL ON *.* FROM CURRENT_USER()") } func (s *testPrivilegeSuite) TestSetGlobal(c *C) { @@ -1006,14 +1012,14 @@ func (s *testPrivilegeSuite) TestSystemSchema(c *C) { _, err = se.ExecuteInternal(context.Background(), "drop table information_schema.tables") c.Assert(strings.Contains(err.Error(), "denied to user"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "update information_schema.tables set table_name = 'tst' where table_name = 'mysql'") - c.Assert(strings.Contains(err.Error(), "privilege check fail"), IsTrue) + c.Assert(strings.Contains(err.Error(), "privilege check"), IsTrue) // Test performance_schema. 
mustExec(c, se, `select * from performance_schema.events_statements_summary_by_digest`) _, err = se.ExecuteInternal(context.Background(), "drop table performance_schema.events_statements_summary_by_digest") c.Assert(strings.Contains(err.Error(), "denied to user"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "update performance_schema.events_statements_summary_by_digest set schema_name = 'tst'") - c.Assert(strings.Contains(err.Error(), "privilege check fail"), IsTrue) + c.Assert(strings.Contains(err.Error(), "privilege check"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "delete from performance_schema.events_statements_summary_by_digest") c.Assert(strings.Contains(err.Error(), "DELETE command denied to user"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "create table performance_schema.t(a int)") @@ -1025,7 +1031,7 @@ func (s *testPrivilegeSuite) TestSystemSchema(c *C) { _, err = se.ExecuteInternal(context.Background(), "drop table metrics_schema.tidb_query_duration") c.Assert(strings.Contains(err.Error(), "denied to user"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "update metrics_schema.tidb_query_duration set instance = 'tst'") - c.Assert(strings.Contains(err.Error(), "privilege check fail"), IsTrue) + c.Assert(strings.Contains(err.Error(), "privilege check"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "delete from metrics_schema.tidb_query_duration") c.Assert(strings.Contains(err.Error(), "DELETE command denied to user"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "create table metric_schema.t(a int)") @@ -1041,9 +1047,9 @@ func (s *testPrivilegeSuite) TestAdminCommand(c *C) { c.Assert(se.Auth(&auth.UserIdentity{Username: "test_admin", Hostname: "localhost"}, nil, nil), IsTrue) _, err := se.ExecuteInternal(context.Background(), "ADMIN SHOW DDL JOBS") - c.Assert(strings.Contains(err.Error(), "privilege check fail"), IsTrue) + c.Assert(strings.Contains(err.Error(), "privilege check"), IsTrue) _, err = se.ExecuteInternal(context.Background(), "ADMIN CHECK TABLE t") - c.Assert(strings.Contains(err.Error(), "privilege check fail"), IsTrue) + c.Assert(strings.Contains(err.Error(), "privilege check"), IsTrue) c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil), IsTrue) _, err = se.ExecuteInternal(context.Background(), "ADMIN SHOW DDL JOBS") diff --git a/server/server.go b/server/server.go index f7a6021a11221..29f5307895cc2 100644 --- a/server/server.go +++ b/server/server.go @@ -37,7 +37,6 @@ import ( "math/rand" "net" "net/http" - "unsafe" // For pprof _ "net/http/pprof" @@ -46,6 +45,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/blacktear23/go-proxyprotocol" "github.com/pingcap/errors" @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/plugin" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util" @@ -557,6 +558,22 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo { return rs } +// ShowTxnList shows all txn info for displaying in `TIDB_TRX` +func (s *Server) ShowTxnList() []*txninfo.TxnInfo { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + rs := make([]*txninfo.TxnInfo, 0, len(s.clients)) + for _, client := range s.clients { + if client.ctx.Session != nil { + info := client.ctx.Session.TxnInfo() + if info != nil { + rs = append(rs, info) + } + } + } + return rs 
+} + // GetProcessInfo implements the SessionManager interface. func (s *Server) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) { s.rwlock.RLock() diff --git a/session/clustered_index_test.go b/session/clustered_index_test.go index fd40cfd567f11..0f79b1b13fc2e 100644 --- a/session/clustered_index_test.go +++ b/session/clustered_index_test.go @@ -14,11 +14,16 @@ package session_test import ( + "fmt" + "math/rand" + "strings" + . "github.com/pingcap/check" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/israce" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testutil" ) @@ -578,6 +583,50 @@ func (s *testClusteredSerialSuite) TestPrefixClusteredIndexAddIndexAndRecover(c tk1.MustExec("admin check table t") } +func (s *testClusteredSerialSuite) TestPartitionTable(c *C) { + if israce.RaceEnabled { + c.Skip("exhaustive types test, skip race test") + } + + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create database test_view") + tk.MustExec("use test_view") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + + tk.MustExec(`create table thash (a int, b int, c varchar(32), primary key(a, b) clustered) partition by hash(a) partitions 4`) + tk.MustExec(`create table trange (a int, b int, c varchar(32), primary key(a, b) clustered) partition by range columns(a) ( + partition p0 values less than (3000), + partition p1 values less than (6000), + partition p2 values less than (9000), + partition p3 values less than (10000))`) + tk.MustExec(`create table tnormal (a int, b int, c varchar(32), primary key(a, b))`) + + vals := make([]string, 0, 4000) + existedPK := make(map[string]struct{}, 4000) + for i := 0; i < 4000; { + a := rand.Intn(10000) + b := rand.Intn(10000) + pk := fmt.Sprintf("%v, %v", a, b) + if _, ok := existedPK[pk]; ok { + continue + } + existedPK[pk] = struct{}{} + i++ + vals = append(vals, fmt.Sprintf(`(%v, %v, '%v')`, a, b, rand.Intn(10000))) + } + + tk.MustExec("insert into thash values " + strings.Join(vals, ", ")) + tk.MustExec("insert into trange values " + strings.Join(vals, ", ")) + tk.MustExec("insert into tnormal values " + strings.Join(vals, ", ")) + + for i := 0; i < 200; i++ { + cond := fmt.Sprintf("where a in (%v, %v, %v) and b < %v", rand.Intn(10000), rand.Intn(10000), rand.Intn(10000), rand.Intn(10000)) + result := tk.MustQuery("select * from tnormal " + cond).Sort().Rows() + tk.MustQuery("select * from thash use index(primary) " + cond).Sort().Check(result) + tk.MustQuery("select * from trange use index(primary) " + cond).Sort().Check(result) + } +} + // https://github.com/pingcap/tidb/issues/23106 func (s *testClusteredSerialSuite) TestClusteredIndexDecodeRestoredDataV5(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 8fdd635b51bc1..83f0057384aea 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" txndriver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" - tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -612,7 +611,7 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) { _, err := tk2.Exec("update test_kill set c = c + 1 where id = 1") wg.Done() c.Assert(err, 
NotNil) - c.Assert(terror.ErrorEqual(err, tikverr.ErrQueryInterrupted), IsTrue) + c.Assert(terror.ErrorEqual(err, txndriver.ErrQueryInterrupted), IsTrue) tk.MustExec("rollback") } @@ -1132,11 +1131,11 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) { tk1.MustExec("begin pessimistic") err := tk1.ExecToErr("select * from t where k = 2 for update nowait") - c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) + c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) err = tk1.ExecToErr("select * from t where k = 4 for update nowait") - c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) + c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) err = tk1.ExecToErr("select * from t where k = 7 for update nowait") - c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) + c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) tk.MustExec("rollback") tk1.MustExec("rollback") @@ -1148,9 +1147,9 @@ func (s *testPessimisticSuite) TestPessimisticLockNonExistsKey(c *C) { tk1.MustExec("begin pessimistic") err = tk1.ExecToErr("select * from t where k = 2 for update nowait") - c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) + c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) err = tk1.ExecToErr("select * from t where k = 6 for update nowait") - c.Check(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) + c.Check(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) tk.MustExec("rollback") tk1.MustExec("rollback") } @@ -1283,7 +1282,7 @@ func (s *testPessimisticSuite) TestBatchPointGetLockIndex(c *C) { c.Assert(txndriver.ErrLockWaitTimeout.Equal(err), IsTrue) err = tk2.ExecToErr("select * from t1 where c2 = 3 for update nowait") c.Assert(err, NotNil) - c.Assert(tikverr.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) + c.Assert(txndriver.ErrLockAcquireFailAndNoWaitSet.Equal(err), IsTrue) tk.MustExec("rollback") tk2.MustExec("rollback") } @@ -1430,12 +1429,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) { tk2.MustExec("begin pessimistic") err := tk2.ExecToErr("select * from tu where z = 3 for update nowait") c.Assert(err, NotNil) - c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue) + c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue) tk.MustExec("begin pessimistic") tk.MustExec("insert into tu(x, y) values(2, 2);") err = tk2.ExecToErr("select * from tu where z = 4 for update nowait") c.Assert(err, NotNil) - c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue) + c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue) // test batch point get lock tk.MustExec("begin pessimistic") @@ -1444,12 +1443,12 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) { tk2.MustExec("begin pessimistic") err = tk2.ExecToErr("select x from tu where z in (3, 7, 9) for update nowait") c.Assert(err, NotNil) - c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue) + c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue) tk.MustExec("begin pessimistic") tk.MustExec("insert into tu(x, y) values(5, 6);") err = tk2.ExecToErr("select * from tu where z = 11 for update nowait") c.Assert(err, NotNil) - c.Assert(terror.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet), IsTrue) + c.Assert(terror.ErrorEqual(err, txndriver.ErrLockAcquireFailAndNoWaitSet), IsTrue) 
tk.MustExec("commit") tk2.MustExec("commit") diff --git a/session/session.go b/session/session.go index 2f842f92e183a..0b4cb309f434b 100644 --- a/session/session.go +++ b/session/session.go @@ -41,6 +41,9 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tipb/go-binlog" + "go.uber.org/zap" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -58,6 +61,7 @@ import ( "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/privilege/privileges" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -81,8 +85,6 @@ import ( "github.com/pingcap/tidb/util/sli" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/timeutil" - "github.com/pingcap/tipb/go-binlog" - "go.uber.org/zap" ) var ( @@ -145,6 +147,8 @@ type Session interface { Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool AuthWithoutVerification(user *auth.UserIdentity) bool ShowProcess() *util.ProcessInfo + // Return the information of the txn current running + TxnInfo() *txninfo.TxnInfo // PrepareTxnCtx is exported for test. PrepareTxnCtx(context.Context) // FieldList returns fields list of a table. @@ -183,7 +187,7 @@ func (h *StmtHistory) Count() int { type session struct { // processInfo is used by ShowProcess(), and should be modified atomically. processInfo atomic.Value - txn TxnState + txn LazyTxn mu struct { sync.RWMutex @@ -442,6 +446,19 @@ func (s *session) FieldList(tableName string) ([]*ast.ResultField, error) { return fields, nil } +func (s *session) TxnInfo() *txninfo.TxnInfo { + txnInfo := s.txn.Info() + if txnInfo == nil { + return nil + } + processInfo := s.ShowProcess() + txnInfo.CurrentSQLDigest = processInfo.Digest + txnInfo.ConnectionID = processInfo.ID + txnInfo.Username = processInfo.User + txnInfo.CurrentDB = processInfo.DB + return txnInfo +} + func (s *session) doCommit(ctx context.Context) error { if !s.txn.Valid() { return nil @@ -524,6 +541,7 @@ func (s *session) doCommit(ctx context.Context) error { if err = memBuffer.Delete(iter.Key()); err != nil { return errors.Trace(err) } + s.txn.UpdateEntriesCountAndSize() if err = iter.Next(); err != nil { return errors.Trace(err) } diff --git a/session/session_test.go b/session/session_test.go index 9ed2f9759243b..3baee4f0ef6f1 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -42,6 +42,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/session" + txninfo "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" @@ -83,6 +84,7 @@ var _ = SerialSuites(&testSessionSerialSuite{}) var _ = SerialSuites(&testBackupRestoreSuite{}) var _ = Suite(&testClusteredSuite{}) var _ = SerialSuites(&testClusteredSerialSuite{}) +var _ = SerialSuites(&testTxnStateSuite{}) type testSessionSuiteBase struct { cluster cluster.Cluster @@ -2892,7 +2894,7 @@ func (s *testSessionSuite2) TestUpdatePrivilege(c *C) { _, err := tk1.Exec("update t2 set id = 666 where id = 1;") c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "privilege check fail"), IsTrue) + c.Assert(strings.Contains(err.Error(), "privilege check"), IsTrue) // Cover a bug 
that t1 and t2 both require update privilege. // In fact, the privilege check for t1 should be update, and for t2 should be select. @@ -4303,3 +4305,103 @@ func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) { // The global temporary table data is discarded after the transaction commit. tk.MustQuery("select * from g_tmp").Check(testkit.Rows()) } + +type testTxnStateSuite struct { + testSessionSuiteBase +} + +func (s *testTxnStateSuite) TestBasic(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1);") + info := tk.Se.TxnInfo() + c.Assert(info, IsNil) + tk.MustExec("begin pessimistic;") + tk.MustExec("select * from t for update;") + info = tk.Se.TxnInfo() + _, expectedDigest := parser.NormalizeDigest("select * from t for update;") + c.Assert(info.CurrentSQLDigest, Equals, expectedDigest) + c.Assert(info.State, Equals, txninfo.TxnRunningNormal) + c.Assert(info.BlockStartTime, IsNil) + // len and size will be covered in TestEntriesCountAndSize + c.Assert(info.ConnectionID, Equals, tk.Se.GetSessionVars().ConnectionID) + c.Assert(info.Username, Equals, "") + c.Assert(info.CurrentDB, Equals, "test") + tk.MustExec("commit;") + info = tk.Se.TxnInfo() + c.Assert(info, IsNil) +} + +func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("begin pessimistic;") + tk.MustExec("insert into t(a) values (1);") + info := tk.Se.TxnInfo() + c.Assert(info.EntriesCount, Equals, uint64(1)) + c.Assert(info.EntriesSize, Equals, uint64(29)) + tk.MustExec("insert into t(a) values (2);") + info = tk.Se.TxnInfo() + c.Assert(info.EntriesCount, Equals, uint64(2)) + c.Assert(info.EntriesSize, Equals, uint64(58)) + tk.MustExec("commit;") +} + +func (s *testTxnStateSuite) TestBlocked(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1);") + tk.MustExec("begin pessimistic;") + tk.MustExec("select * from t where a = 1 for update;") + go func() { + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from t where a = 1 for update;") + tk2.MustExec("commit;") + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk2.Se.TxnInfo().State, Equals, txninfo.TxnLockWaiting) + c.Assert(tk2.Se.TxnInfo().BlockStartTime, NotNil) + tk.MustExec("commit;") +} + +func (s *testTxnStateSuite) TestCommitting(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1), (2);") + tk.MustExec("begin pessimistic;") + tk.MustExec("select * from t where a = 1 for update;") + ch := make(chan struct{}) + go func() { + tk2.MustExec("begin pessimistic") + c.Assert(tk2.Se.TxnInfo(), NotNil) + tk2.MustExec("select * from t where a = 2 for update;") + failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)") + defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit") + tk2.MustExec("commit;") + ch <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk2.Se.TxnInfo().State, Equals, txninfo.TxnCommitting) + tk.MustExec("commit;") + <-ch +} + +func (s *testTxnStateSuite) TestRollbacking(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int);") + tk.MustExec("insert into t(a) values (1), (2);") + ch := make(chan struct{}) + go
func() { + tk.MustExec("begin pessimistic") + tk.MustExec("insert into t(a) values (3);") + failpoint.Enable("github.com/pingcap/tidb/session/mockSlowRollback", "sleep(200)") + defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowRollback") + tk.MustExec("rollback;") + ch <- struct{}{} + }() + time.Sleep(100 * time.Millisecond) + c.Assert(tk.Se.TxnInfo().State, Equals, txninfo.TxnRollingBack) + <-ch +} diff --git a/session/txn.go b/session/txn.go index aebed7ed920b2..133cafb976aae 100644 --- a/session/txn.go +++ b/session/txn.go @@ -20,6 +20,8 @@ import ( "runtime/trace" "strings" "sync/atomic" + "time" + "unsafe" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" @@ -28,6 +30,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" tikvstore "github.com/pingcap/tidb/store/tikv/kv" @@ -39,12 +42,12 @@ import ( "go.uber.org/zap" ) -// TxnState wraps kv.Transaction to provide a new kv.Transaction. +// LazyTxn wraps kv.Transaction to provide a new kv.Transaction. // 1. It holds all statement related modification in the buffer before flush to the txn, // so if the statement execution meets an error, the txn won't be made dirty. // 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really needed. -type TxnState struct { - // States of a TxnState should be one of the followings: +type LazyTxn struct { + // States of a LazyTxn should be one of the followings: // Invalid: kv.Transaction == nil && txnFuture == nil // Pending: kv.Transaction == nil && txnFuture != nil // Valid: kv.Transaction != nil && txnFuture == nil @@ -55,23 +58,40 @@ type TxnState struct { stagingHandle kv.StagingHandle mutations map[int64]*binlog.TableMutation writeSLI sli.TxnWriteThroughputSLI + + // The following atomic fields are used for filling TxnInfo. + // We need these fields because kv.Transaction provides no thread-safety promise, + // while getting TxnInfo must be a thread-safe operation. + + infoStartTS uint64 + // the current execution state + State txninfo.TxnRunningState + // the time when the transaction most recently started blocking on a lock + blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil + // how many entries there are in the memBuffer, should be equal to self.(kv.Transaction).Len() + EntriesCount uint64 + // how much memory the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() + EntriesSize uint64 } // GetTableInfo returns the cached index name. -func (txn *TxnState) GetTableInfo(id int64) *model.TableInfo { +func (txn *LazyTxn) GetTableInfo(id int64) *model.TableInfo { return txn.Transaction.GetTableInfo(id) } // CacheTableInfo caches the index name. -func (txn *TxnState) CacheTableInfo(id int64, info *model.TableInfo) { +func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { txn.Transaction.CacheTableInfo(id, info) } -func (txn *TxnState) init() { +func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.EntriesCount, 0) + atomic.StoreUint64(&txn.EntriesSize, 0) } -func (txn *TxnState) initStmtBuf() { +func (txn *LazyTxn) initStmtBuf() { if txn.Transaction == nil { return } @@ -81,14 +101,14 @@ func (txn *TxnState) initStmtBuf() { } // countHint is estimated count of mutations.
-func (txn *TxnState) countHint() int { +func (txn *LazyTxn) countHint() int { if txn.stagingHandle == kv.InvalidStagingHandle { return 0 } return txn.Transaction.GetMemBuffer().Len() - txn.initCnt } -func (txn *TxnState) flushStmtBuf() { +func (txn *LazyTxn) flushStmtBuf() { if txn.stagingHandle == kv.InvalidStagingHandle { return } @@ -97,17 +117,19 @@ func (txn *TxnState) flushStmtBuf() { txn.initCnt = buf.Len() } -func (txn *TxnState) cleanupStmtBuf() { +func (txn *LazyTxn) cleanupStmtBuf() { if txn.stagingHandle == kv.InvalidStagingHandle { return } buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) } // Size implements the MemBuffer interface. -func (txn *TxnState) Size() int { +func (txn *LazyTxn) Size() int { if txn.Transaction == nil { return 0 } @@ -115,19 +137,19 @@ func (txn *TxnState) Size() int { } // Valid implements the kv.Transaction interface. -func (txn *TxnState) Valid() bool { +func (txn *LazyTxn) Valid() bool { return txn.Transaction != nil && txn.Transaction.Valid() } -func (txn *TxnState) pending() bool { +func (txn *LazyTxn) pending() bool { return txn.Transaction == nil && txn.txnFuture != nil } -func (txn *TxnState) validOrPending() bool { +func (txn *LazyTxn) validOrPending() bool { return txn.txnFuture != nil || txn.Valid() } -func (txn *TxnState) String() string { +func (txn *LazyTxn) String() string { if txn.Transaction != nil { return txn.Transaction.String() } @@ -138,7 +160,7 @@ func (txn *TxnState) String() string { } // GoString implements the "%#v" format for fmt.Printf. -func (txn *TxnState) GoString() string { +func (txn *LazyTxn) GoString() string { var s strings.Builder s.WriteString("Txn{") if txn.pending() { @@ -157,18 +179,25 @@ func (txn *TxnState) GoString() string { return s.String() } -func (txn *TxnState) changeInvalidToValid(kvTxn kv.Transaction) { +func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { txn.Transaction = kvTxn + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.infoStartTS, kvTxn.StartTS()) txn.initStmtBuf() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) txn.txnFuture = nil } -func (txn *TxnState) changeInvalidToPending(future *txnFuture) { +func (txn *LazyTxn) changeInvalidToPending(future *txnFuture) { txn.Transaction = nil txn.txnFuture = future + atomic.StoreUint64(&txn.infoStartTS, 0) + atomic.StoreUint64(&txn.EntriesCount, uint64(0)) + atomic.StoreUint64(&txn.EntriesSize, uint64(0)) } -func (txn *TxnState) changePendingToValid(ctx context.Context) error { +func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { if txn.txnFuture == nil { return errors.New("transaction future is not set") } @@ -183,17 +212,24 @@ func (txn *TxnState) changePendingToValid(ctx context.Context) error { return err } txn.Transaction = t + atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) + atomic.StoreUint64(&txn.infoStartTS, t.StartTS()) txn.initStmtBuf() + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) return nil } -func (txn *TxnState) changeToInvalid() { +func (txn *LazyTxn) changeToInvalid() { if txn.stagingHandle != kv.InvalidStagingHandle { txn.Transaction.GetMemBuffer().Cleanup(txn.stagingHandle) } 
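// besides dropping the staging buffer, reset the atomic info fields below so that a concurrent Info() call observes no active transaction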
txn.stagingHandle = kv.InvalidStagingHandle txn.Transaction = nil txn.txnFuture = nil + atomic.StoreUint64(&txn.infoStartTS, 0) + atomic.StoreUint64(&txn.EntriesCount, 0) + atomic.StoreUint64(&txn.EntriesSize, 0) } var hasMockAutoIncIDRetry = int64(0) @@ -223,7 +259,7 @@ func ResetMockAutoRandIDRetryCount(failTimes int64) { } // Commit overrides the Transaction interface. -func (txn *TxnState) Commit(ctx context.Context) error { +func (txn *LazyTxn) Commit(ctx context.Context) error { defer txn.reset() if len(txn.mutations) != 0 || txn.countHint() != 0 { logutil.BgLogger().Error("the code should never run here", @@ -233,6 +269,10 @@ return errors.Trace(kv.ErrInvalidTxn) } + atomic.StoreInt32(&txn.State, txninfo.TxnCommitting) + + failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) + // mockCommitError8942 is used for PR #8942. failpoint.Inject("mockCommitError8942", func(val failpoint.Value) { if val.(bool) { @@ -259,17 +299,34 @@ } // Rollback overrides the Transaction interface. -func (txn *TxnState) Rollback() error { +func (txn *LazyTxn) Rollback() error { defer txn.reset() + atomic.StoreInt32(&txn.State, txninfo.TxnRollingBack) + // mockSlowRollback is used to mock a rollback which takes a long time + failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() } -func (txn *TxnState) reset() { +// LockKeys wraps the inner transaction's `LockKeys` to record the lock-waiting status +func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { + originState := atomic.LoadInt32(&txn.State) + atomic.StoreInt32(&txn.State, txninfo.TxnLockWaiting) + t := time.Now() + atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(&t)) + err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) + atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txn.State, originState) + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + return err +} + +func (txn *LazyTxn) reset() { txn.cleanup() txn.changeToInvalid() } -func (txn *TxnState) cleanup() { +func (txn *LazyTxn) cleanup() { txn.cleanupStmtBuf() txn.initStmtBuf() for key := range txn.mutations { @@ -278,7 +335,7 @@ } // KeysNeedToLock returns the keys need to be locked. -func (txn *TxnState) KeysNeedToLock() ([]kv.Key, error) { +func (txn *LazyTxn) KeysNeedToLock() ([]kv.Key, error) { if txn.stagingHandle == kv.InvalidStagingHandle { return nil, nil } @@ -316,6 +373,32 @@ func keyNeedToLock(k, v []byte, flags tikvstore.KeyFlags) bool { return !isNonUniqueIndex } +// Info dumps the LazyTxn to a TxnInfo for displaying in `TIDB_TRX` +// This function is supposed to be thread safe +func (txn *LazyTxn) Info() *txninfo.TxnInfo { + startTs := atomic.LoadUint64(&txn.infoStartTS) + if startTs == 0 { + return nil + } + return &txninfo.TxnInfo{ + StartTS: startTs, + State: atomic.LoadInt32(&txn.State), + BlockStartTime: (*time.Time)(atomic.LoadPointer(&txn.blockStartTime)), + EntriesCount: atomic.LoadUint64(&txn.EntriesCount), + EntriesSize: atomic.LoadUint64(&txn.EntriesSize), + } +} + +// UpdateEntriesCountAndSize updates the EntriesCount and EntriesSize +// Note this function is not thread safe, because +// txn.Transaction can be changed during this function's execution if it runs in parallel.
+func (txn *LazyTxn) UpdateEntriesCountAndSize() { + if txn.Valid() { + atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + } +} + func getBinlogMutation(ctx sessionctx.Context, tableID int64) *binlog.TableMutation { bin := binloginfo.GetPrewriteValue(ctx, true) for i := range bin.Mutations { diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go new file mode 100644 index 0000000000000..77a2d8c90cd05 --- /dev/null +++ b/session/txninfo/txn_info.go @@ -0,0 +1,96 @@ +// Copyright 2021 PingCAP, Inc. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package txninfo + +import ( + "time" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/types" +) + +// TxnRunningState is the current state of a transaction +type TxnRunningState = int32 + +const ( + // TxnRunningNormal means the transaction is running normally + TxnRunningNormal TxnRunningState = iota + // TxnLockWaiting means the transaction is blocked on a lock + TxnLockWaiting + // TxnCommitting means the transaction is committing (or at least trying to) + TxnCommitting + // TxnRollingBack means the transaction is rolling back + TxnRollingBack +) + +// TxnRunningStateStrs is the names of the TxnRunningStates +var TxnRunningStateStrs = []string{ + "Normal", "LockWaiting", "Committing", "RollingBack", +} + +// TxnInfo is information about a running transaction +// This is supposed to be the datasource of `TIDB_TRX` in infoschema +type TxnInfo struct { + StartTS uint64 + // digest of the SQL statement currently running + CurrentSQLDigest string + // the current execution state + State TxnRunningState + // the time when the transaction most recently started blocking on a lock + BlockStartTime *time.Time + // How many entries are in MemDB + EntriesCount uint64 + // memory used by the MemDB + EntriesSize uint64 + + // the following fields will be filled in `session` instead of `LazyTxn` + + // Which session this transaction belongs to + ConnectionID uint64 + // The user who opened this session + Username string + // The schema this transaction works on + CurrentDB string +} + +// ToDatum converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table +func (info *TxnInfo) ToDatum() []types.Datum { + humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) + var blockStartTime interface{} + if info.BlockStartTime == nil { + blockStartTime = nil + } else { + blockStartTime = types.NewTime(types.FromGoTime(*info.BlockStartTime), mysql.TypeTimestamp, 0) + } + e, err := types.ParseEnumValue(TxnRunningStateStrs, uint64(info.State+1)) + if err != nil { + panic("this should never happen") + } + state := types.NewMysqlEnumDatum(e) + datums := types.MakeDatums( + info.StartTS, + types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, 0), + info.CurrentSQLDigest, + ) + datums = append(datums, state) + datums = append(datums, types.MakeDatums( + blockStartTime, + info.EntriesCount, + info.EntriesSize, + info.ConnectionID, + info.Username, + info.CurrentDB)...)
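+ // the order of the datums built above must match the column order of the TIDB_TRX table definition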
+ return datums +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 618120b5da6e6..d6bb5763e67d8 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -492,11 +492,12 @@ type SessionVars struct { AllowWriteRowID bool // AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join. - // If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop. + // Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop. AllowBatchCop int - // AllowMPPExecution will prefer using mpp way to execute a query. - AllowMPPExecution bool + // allowMPPExecution means whether we should use the mpp way to execute a query. Default value is "ON", meaning it is determined by the optimizer. + // Value set to "ENFORCE" means to use mpp whenever possible. Value set to "OFF" means never use mpp. + allowMPPExecution string // TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed. AllowAutoRandExplicitInsert bool @@ -845,6 +846,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 { return 1 } +// IsMPPAllowed returns whether mpp execution is allowed. +func (s *SessionVars) IsMPPAllowed() bool { + return s.allowMPPExecution != "OFF" +} + +// IsMPPEnforced returns whether mpp execution is enforced. +func (s *SessionVars) IsMPPEnforced() bool { + return s.allowMPPExecution == "ENFORCE" +} + // CheckAndGetTxnScope will return the transaction scope we should use in the current session. func (s *SessionVars) CheckAndGetTxnScope() string { if s.InRestrictedSQL { @@ -1094,7 +1105,7 @@ func NewSessionVars() *SessionVars { terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming)) vars.AllowBatchCop = DefTiDBAllowBatchCop - vars.AllowMPPExecution = DefTiDBAllowMPPExecution + vars.allowMPPExecution = DefTiDBAllowMPPExecution var enableChunkRPC string if config.GetGlobalConfig().TiKVClient.EnableChunkRPC { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 9a01c19470722..73a8ca0066450 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -720,8 +720,8 @@ var defaultSysVars = []*SysVar{ } return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution), SetSession: func(s *SessionVars, val string) error { - s.AllowMPPExecution = TiDBOptOn(val) + {Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}, SetSession: func(s *SessionVars, val string) error { + s.allowMPPExecution = val return nil }}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error { @@ -793,7 +793,7 @@ var defaultSysVars = []*SysVar{ s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor,
'f', -1, 64), Type: TypeFloat, MinValue: 1, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor) return nil }}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 30d52ac54f386..e416f9a695fc3 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -216,7 +216,6 @@ const ( // A distsql scan task can be a table scan or an index scan, which may be distributed to many TiKV nodes. // Higher concurrency may reduce latency, but with the cost of higher memory usage and system performance impact. // If the query has a LIMIT clause, high concurrency makes the system do much more work than needed. - // tidb_distsql_scan_concurrency is deprecated, use tidb_executor_concurrency instead. TiDBDistSQLScanConcurrency = "tidb_distsql_scan_concurrency" // tidb_opt_insubquery_to_join_and_agg is used to enable/disable the optimizer rule of rewriting IN subquery. @@ -291,6 +290,8 @@ const ( // The default value is 0 TiDBAllowBatchCop = "tidb_allow_batch_cop" + // TiDBAllowMPPExecution means whether we should use the mpp way to execute a query. Default value is 1 (or 'ON'), meaning it is determined by the optimizer. + // Value set to 2 (or 'ENFORCE') means to use mpp whenever possible. Value set to 0 (or 'OFF') means never use mpp. TiDBAllowMPPExecution = "tidb_allow_mpp" // TiDBInitChunkSize is used to control the init chunk size during query execution. @@ -614,7 +615,7 @@ const ( DefBroadcastJoinThresholdCount = 10 * 1024 DefTiDBOptimizerSelectivityLevel = 0 DefTiDBAllowBatchCop = 1 - DefTiDBAllowMPPExecution = true + DefTiDBAllowMPPExecution = "ON" DefTiDBTxnMode = "" DefTiDBRowFormatV1 = 1 DefTiDBRowFormatV2 = 2 diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 3231f95bbc824..af224c59fc38a 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/kv" txndriver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" - tikverr "github.com/pingcap/tidb/store/tikv/error" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" @@ -262,7 +261,7 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe return case <-ticker.C: if atomic.LoadUint32(b.vars.Killed) == 1 { - resp = &batchCopResponse{err: tikverr.ErrQueryInterrupted} + resp = &batchCopResponse{err: txndriver.ErrQueryInterrupted} ok = true return } diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index cb799edb16f70..5e7eab303e84f 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -37,7 +37,6 @@ import ( tidbmetrics "github.com/pingcap/tidb/metrics" txndriver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" - tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -477,7 +476,7 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes return case <-ticker.C: if atomic.LoadUint32(it.vars.Killed) == 1 { - resp = &copResponse{err: tikverr.ErrQueryInterrupted} + resp = &copResponse{err: txndriver.ErrQueryInterrupted} ok = true return } diff --git a/store/copr/mpp.go index 3ea07e744f9b9..10784912faa9b
100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/kv" txndriver "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" - tikverr "github.com/pingcap/tidb/store/tikv/error" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/tikvrpc" "go.uber.org/zap" @@ -406,7 +405,7 @@ func (m *mppIterator) nextImpl(ctx context.Context) (resp *mppResponse, ok bool, return case <-ticker.C: if m.vars != nil && m.vars.Killed != nil && atomic.LoadUint32(m.vars.Killed) == 1 { - err = tikverr.ErrQueryInterrupted + err = txndriver.ErrQueryInterrupted exit = true return } diff --git a/store/driver/txn/error.go b/store/driver/txn/error.go index 5bbdc01024947..4c8e770c44ff7 100644 --- a/store/driver/txn/error.go +++ b/store/driver/txn/error.go @@ -47,9 +47,13 @@ var ( ErrGCTooEarly = dbterror.ClassTiKV.NewStd(errno.ErrGCTooEarly) // ErrTiKVStaleCommand is the error that the command is stale in tikv. ErrTiKVStaleCommand = dbterror.ClassTiKV.NewStd(errno.ErrTiKVStaleCommand) + // ErrQueryInterrupted is the error when the query is interrupted. + ErrQueryInterrupted = dbterror.ClassTiKV.NewStd(errno.ErrQueryInterrupted) // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. ErrTiKVMaxTimestampNotSynced = dbterror.ClassTiKV.NewStd(errno.ErrTiKVMaxTimestampNotSynced) - ErrResolveLockTimeout = dbterror.ClassTiKV.NewStd(errno.ErrResolveLockTimeout) + // ErrLockAcquireFailAndNoWaitSet is the error that acquiring the lock failed while no-wait is set. + ErrLockAcquireFailAndNoWaitSet = dbterror.ClassTiKV.NewStd(errno.ErrLockAcquireFailAndNoWaitSet) + ErrResolveLockTimeout = dbterror.ClassTiKV.NewStd(errno.ErrResolveLockTimeout) // ErrLockWaitTimeout is the error that wait for the lock is timeout. ErrLockWaitTimeout = dbterror.ClassTiKV.NewStd(errno.ErrLockWaitTimeout) // ErrTiKVServerBusy is the error when tikv server is busy. @@ -60,6 +64,8 @@ var ( ErrPDServerTimeout = dbterror.ClassTiKV.NewStd(errno.ErrPDServerTimeout) // ErrRegionUnavailable is the error when region is not available. ErrRegionUnavailable = dbterror.ClassTiKV.NewStd(errno.ErrRegionUnavailable) + // ErrUnknown is the unknown error. + ErrUnknown = dbterror.ClassTiKV.NewStd(errno.ErrUnknown) ) // Registers error returned from TiKV.
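The ToTiDBErr hunk below consumes the plain errors that store/tikv now exports and re-wraps them into the dbterror values defined above, attaching the MySQL error codes (1317 for an interrupted query, 3572 for a failed no-wait lock, per the errcode.go constants deleted later in this patch). A minimal sketch of that mapping's shape, with an illustrative helper name rather than the patch's exact body:

func toTiDBErr(err error) error {
	// each check translates one plain client-side error into its dbterror counterpart
	if errors.ErrorEqual(err, tikverr.ErrQueryInterrupted) {
		return ErrQueryInterrupted // carries MySQL error code 1317
	}
	if errors.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet) {
		return ErrLockAcquireFailAndNoWaitSet // carries MySQL error code 3572
	}
	return errors.Trace(err)
}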
@@ -229,6 +235,10 @@ func ToTiDBErr(err error) error { return ErrTiFlashServerTimeout } + if errors.ErrorEqual(err, tikverr.ErrQueryInterrupted) { + return ErrQueryInterrupted + } + if errors.ErrorEqual(err, tikverr.ErrTiKVServerBusy) { return ErrTiKVServerBusy } @@ -249,6 +259,10 @@ func ToTiDBErr(err error) error { return ErrTiKVMaxTimestampNotSynced } + if errors.ErrorEqual(err, tikverr.ErrLockAcquireFailAndNoWaitSet) { + return ErrLockAcquireFailAndNoWaitSet + } + if errors.ErrorEqual(err, tikverr.ErrResolveLockTimeout) { return ErrResolveLockTimeout } @@ -265,6 +279,10 @@ func ToTiDBErr(err error) error { return ErrTokenLimit.GenWithStackByArgs(e.StoreID) } + if errors.ErrorEqual(err, tikverr.ErrUnknown) { + return ErrUnknown + } + return errors.Trace(originErr) } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 50bba80d2b54e..739b983fd3d99 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -148,8 +148,12 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { txn.KVTxn.GetSnapshot().SetTaskID(val.(uint64)) case tikvstore.InfoSchema: txn.SetSchemaVer(val.(tikv.SchemaVer)) + case tikvstore.SchemaAmender: + txn.SetSchemaAmender(val.(tikv.SchemaAmender)) case tikvstore.CommitHook: txn.SetCommitCallback(val.(func(string, error))) + case tikvstore.EnableAsyncCommit: + txn.SetEnableAsyncCommit(val.(bool)) case tikvstore.Enable1PC: txn.SetEnable1PC(val.(bool)) case tikvstore.TxnScope: diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 8703b1861c65d..ee94eceec166a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -825,12 +825,10 @@ func (c *twoPhaseCommitter) checkAsyncCommit() bool { return false } - enableAsyncCommitOption := c.txn.us.GetOption(kv.EnableAsyncCommit) - enableAsyncCommit := enableAsyncCommitOption != nil && enableAsyncCommitOption.(bool) asyncCommitCfg := config.GetGlobalConfig().TiKVClient.AsyncCommit // TODO the keys limit need more tests, this value makes the unit test pass by now. // Async commit is not compatible with Binlog because of the non unique timestamp issue. - if c.sessionID > 0 && enableAsyncCommit && + if c.sessionID > 0 && c.txn.enableAsyncCommit && uint(c.mutations.Len()) <= asyncCommitCfg.KeysLimit && !c.shouldWriteBinlog() { totalKeySize := uint64(0) diff --git a/store/tikv/error/errcode.go b/store/tikv/error/errcode.go deleted file mode 100644 index 01e8db4d12473..0000000000000 --- a/store/tikv/error/errcode.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package error - -// MySQL error code. -// This value is numeric. It is not portable to other database systems. 
-const ( - CodeUnknown = 1105 - CodeLockWaitTimeout = 1205 - CodeQueryInterrupted = 1317 - CodeLockAcquireFailAndNoWaitSet = 3572 -) diff --git a/store/tikv/error/error.go b/store/tikv/error/error.go index bde97e8b2e5d4..898354cc11a2d 100644 --- a/store/tikv/error/error.go +++ b/store/tikv/error/error.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/tidb/util/dbterror" ) var ( @@ -38,10 +37,14 @@ var ( ErrTiKVServerTimeout = errors.New("tikv server timeout") // ErrTiFlashServerTimeout is the error when tiflash server is timeout. ErrTiFlashServerTimeout = errors.New("tiflash server timeout") + // ErrQueryInterrupted is the error when the query is interrupted. + ErrQueryInterrupted = errors.New("query interrupted") // ErrTiKVStaleCommand is the error that the command is stale in tikv. ErrTiKVStaleCommand = errors.New("tikv stale command") // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced") + // ErrLockAcquireFailAndNoWaitSet is the error that acquiring the lock failed while no-wait is set. + ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquire failed and no-wait is set") // ErrResolveLockTimeout is the error that resolve lock timeout. ErrResolveLockTimeout = errors.New("resolve lock timeout") // ErrLockWaitTimeout is the error that wait for the lock is timeout. @@ -52,18 +55,13 @@ var ( ErrTiFlashServerBusy = errors.New("tiflash server busy") // ErrRegionUnavailable is the error when region is not available. ErrRegionUnavailable = errors.New("region unavailable") + // ErrUnknown is the unknown error. + ErrUnknown = errors.New("unknown") ) // MismatchClusterID represents the message that the cluster ID of the PD client does not match the PD. const MismatchClusterID = "mismatch cluster id" -// error instances. -var ( - ErrQueryInterrupted = dbterror.ClassTiKV.NewStd(CodeQueryInterrupted) - ErrLockAcquireFailAndNoWaitSet = dbterror.ClassTiKV.NewStd(CodeLockAcquireFailAndNoWaitSet) - ErrUnknown = dbterror.ClassTiKV.NewStd(CodeUnknown) -) - // IsErrNotFound checks if err is a kind of NotFound error.
func IsErrNotFound(err error) bool { return errors.ErrorEqual(err, ErrNotExist) diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/extract_start_ts_test.go index 1422e387bfda5..a108a0f7e41cb 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/extract_start_ts_test.go @@ -56,8 +56,8 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { Value: "Some Random Label", }}, } - store.resolveTSMu.resolveTS[2] = 102 - store.resolveTSMu.resolveTS[3] = 101 + store.setSafeTS(2, 102) + store.setSafeTS(3, 101) s.store = store } @@ -105,8 +105,8 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.resolveTSMu.resolveTS[2] = 0x8000000000000002 - s.store.resolveTSMu.resolveTS[3] = 0x8000000000000001 + s.store.setSafeTS(2, 0x8000000000000002) + s.store.setSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []kv.TransactionOption{ diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 5ddca52726a04..a487b0024e3e9 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -82,10 +82,10 @@ type KVStore struct { spMutex sync.RWMutex // this is used to update safePoint and spTime closed chan struct{} // this is used to notify when the store is closed - resolveTSMu struct { - sync.RWMutex - resolveTS map[uint64]uint64 // storeID -> resolveTS - } + // storeID -> safeTS, stored as map[uint64]uint64 + // safeTS here will be used during the Stale Read process; + // it indicates a safe timestamp point that can be used to read consistent, though possibly not the latest, data. + safeTSMap sync.Map replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled } @@ -142,7 +142,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client replicaReadSeed: rand.Uint32(), } store.lockResolver = newLockResolver(store) - store.resolveTSMu.resolveTS = make(map[uint64]uint64) go store.runSafePointChecker() go store.safeTSUpdater() @@ -337,20 +336,30 @@ func (s *KVStore) GetTiKVClient() (client Client) { return s.client } -func (s *KVStore) getMinResolveTSByStores(stores []*Store) uint64 { - failpoint.Inject("injectResolveTS", func(val failpoint.Value) { +func (s *KVStore) getSafeTS(storeID uint64) uint64 { + safeTS, ok := s.safeTSMap.Load(storeID) + if !ok { + return 0 + } + return safeTS.(uint64) +} + +func (s *KVStore) setSafeTS(storeID, safeTS uint64) { + s.safeTSMap.Store(storeID, safeTS) +} + +func (s *KVStore) getMinSafeTSByStores(stores []*Store) uint64 { + failpoint.Inject("injectSafeTS", func(val failpoint.Value) { injectTS := val.(int) failpoint.Return(uint64(injectTS)) }) minSafeTS := uint64(math.MaxUint64) - s.resolveTSMu.RLock() - defer s.resolveTSMu.RUnlock() // when there is no store, return 0 in order to let minStartTS become startTS directly if len(stores) < 1 { return 0 } for _, store := range stores { - safeTS := s.resolveTSMu.resolveTS[store.storeID] + safeTS := s.getSafeTS(store.storeID) if safeTS < minSafeTS { minSafeTS = safeTS } @@ -368,12 +377,12 @@ func (s *KVStore) safeTSUpdater() { case <-s.Closed(): return case <-t.C: - s.updateResolveTS(ctx) + s.updateSafeTS(ctx) } } } -func (s *KVStore) updateResolveTS(ctx context.Context) { +func (s *KVStore) updateSafeTS(ctx context.Context) { stores := s.regionCache.getStoresByType(tikvrpc.TiKV) tikvClient := s.GetTiKVClient() wg := &sync.WaitGroup{} @@ -389,13 +398,11 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { EndKey: []byte(""), }}), ReadTimeoutShort) if err != nil { -
logutil.BgLogger().Debug("update resolveTS failed", zap.Error(err), zap.Uint64("store-id", storeID)) + logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID)) return } safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse) - s.resolveTSMu.Lock() - s.resolveTSMu.resolveTS[storeID] = safeTSResp.GetSafeTs() - s.resolveTSMu.Unlock() + s.setSafeTS(storeID, safeTSResp.GetSafeTs()) }(ctx, wg, storeID, storeAddr) } wg.Wait() diff --git a/store/tikv/tests/2pc_test.go b/store/tikv/tests/2pc_test.go index d1e635f205efa..5589752043b2b 100644 --- a/store/tikv/tests/2pc_test.go +++ b/store/tikv/tests/2pc_test.go @@ -105,7 +105,7 @@ func (s *testCommitterSuite) begin(c *C) tikv.TxnProbe { func (s *testCommitterSuite) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return txn } diff --git a/store/tikv/tests/async_commit_test.go b/store/tikv/tests/async_commit_test.go index 0f4985fa7ab86..381771bfa0836 100644 --- a/store/tikv/tests/async_commit_test.go +++ b/store/tikv/tests/async_commit_test.go @@ -134,7 +134,7 @@ func (s *testAsyncCommitCommon) beginAsyncCommitWithLinearizability(c *C) tikv.T func (s *testAsyncCommitCommon) beginAsyncCommit(c *C) tikv.TxnProbe { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) return tikv.TxnProbe{KVTxn: txn} } @@ -160,7 +160,7 @@ func (s *testAsyncCommitSuite) SetUpTest(c *C) { func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(c *C, keys, values [][]byte, primaryKey, primaryValue []byte, commitPrimary bool) (uint64, uint64) { txn, err := s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) for i, k := range keys { if len(values[i]) > 0 { err = txn.Set(k, values[i]) diff --git a/store/tikv/tests/snapshot_fail_test.go b/store/tikv/tests/snapshot_fail_test.go index 1360841bd743a..aca3c59099cf7 100644 --- a/store/tikv/tests/snapshot_fail_test.go +++ b/store/tikv/tests/snapshot_fail_test.go @@ -152,6 +152,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/twoPCShortLockTTL", "return"), IsNil) @@ -181,7 +182,7 @@ func (s *testSnapshotFailSuite) TestRetryMaxTsPointGetSkipLock(c *C) { // Prewrite k1 and k2 again without committing them txn, err = s.store.Begin() c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetEnableAsyncCommit(true) err = txn.Set([]byte("k1"), []byte("v3")) c.Assert(err, IsNil) err = txn.Set([]byte("k2"), []byte("v4")) @@ -210,7 +211,7 @@ func (s *testSnapshotFailSuite) TestRetryPointGetResolveTS(c *C) { c.Assert(txn.Set([]byte("k1"), []byte("v1")), IsNil) err = txn.Set([]byte("k2"), []byte("v2")) c.Assert(err, IsNil) - txn.SetOption(kv.EnableAsyncCommit, false) + txn.SetEnableAsyncCommit(false) txn.SetEnable1PC(false) txn.SetOption(kv.GuaranteeLinearizability, false) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 0ae2df13c12e9..a8c0f70f8da8d 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -82,6 +82,7 @@ type KVTxn struct { syncLog bool priority Priority isPessimistic bool + enableAsyncCommit bool 
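+ // enableAsyncCommit, like enable1PC below, is now a plain field set through its setter (SetEnableAsyncCommit) rather than through the generic option mechanism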
enable1PC bool scope string kvFilter KVFilter @@ -112,12 +113,12 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error } else { stores = allStores } - resolveTS := store.getMinResolveTSByStores(stores) + safeTS := store.getMinSafeTSByStores(stores) startTs = *options.MinStartTS - // If the resolveTS is larger than the minStartTS, we will use resolveTS as StartTS, otherwise we will use + // If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use // minStartTS directly. - if oracle.CompareTS(startTs, resolveTS) < 0 { - startTs = resolveTS + if oracle.CompareTS(startTs, safeTS) < 0 { + startTs = safeTS } } else if options.MaxPrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil) @@ -223,10 +224,6 @@ func (txn *KVTxn) Delete(k []byte) error { func (txn *KVTxn) SetOption(opt int, val interface{}) { txn.us.SetOption(opt, val) txn.snapshot.SetOption(opt, val) - switch opt { - case tikv.SchemaAmender: - txn.schemaAmender = val.(SchemaAmender) - } } // GetOption returns the option @@ -265,12 +262,22 @@ func (txn *KVTxn) SetPriority(pri Priority) { txn.GetSnapshot().SetPriority(pri) } +// SetSchemaAmender sets an amender to update mutations after schema change. +func (txn *KVTxn) SetSchemaAmender(sa SchemaAmender) { + txn.schemaAmender = sa +} + // SetCommitCallback sets up a function that will be called when the transaction // is finished. func (txn *KVTxn) SetCommitCallback(f func(string, error)) { txn.commitCallback = f } +// SetEnableAsyncCommit indicates if the transaction will try to use async commit. +func (txn *KVTxn) SetEnableAsyncCommit(b bool) { + txn.enableAsyncCommit = b +} + // SetEnable1PC indicates if the transaction will try to use 1 phase commit. func (txn *KVTxn) SetEnable1PC(b bool) { txn.enable1PC = b diff --git a/tidb-server/main.go b/tidb-server/main.go index f070d2eeec48d..3e2351bf7c352 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -572,7 +572,7 @@ func setGlobalVars() { kvcache.GlobalLRUMemUsageTracker.AttachToGlobalTracker(executor.GlobalMemoryUsageTracker) t, err := time.ParseDuration(cfg.TiKVClient.StoreLivenessTimeout) - if err != nil { + if err != nil || t < 0 { logutil.BgLogger().Fatal("invalid duration value for store-liveness-timeout", zap.String("currentValue", cfg.TiKVClient.StoreLivenessTimeout)) } diff --git a/util/processinfo.go b/util/processinfo.go index 29716d914c3de..ebbf17094b80d 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/execdetails" @@ -161,6 +162,7 @@ func serverStatus2Str(state uint16) string { // kill statement rely on this interface. type SessionManager interface { ShowProcessList() map[uint64]*ProcessInfo + ShowTxnList() []*txninfo.TxnInfo GetProcessInfo(id uint64) (*ProcessInfo, bool) Kill(connectionID uint64, query bool) KillAllConnections() diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index 4992e28663b1a..06eb826c56b78 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -255,6 +255,17 @@ func (tk *TestKit) MustNoGlobalStats(table string) bool { return true } +// MustPartition checks if the result execution plan must read specific partitions. 
+func (tk *TestKit) MustPartition(sql string, partitions string, args ...interface{}) bool { + rs := tk.MustQuery("explain "+sql, args...) + for i := range rs.rows { + if strings.Compare(rs.rows[i][3], "partition:"+partitions) == 0 { + return true + } + } + return false +} + // MustUseIndex checks if the result execution plan contains specific index(es). func (tk *TestKit) MustUseIndex(sql string, index string, args ...interface{}) bool { rs := tk.MustQuery("explain "+sql, args...)
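To close the loop on the TIDB_TRX plumbing above: every SessionManager implementation must now provide ShowTxnList, which can be built by collecting TxnInfo from each live session; the memtable reader registered for infoschema.TableTiDBTrx can then turn each entry into a row via ToDatum. A minimal sketch, assuming a server type with a mutex-guarded session map (the Server, rwlock, and sessions identifiers are illustrative, not taken from this patch):

func (s *Server) ShowTxnList() []*txninfo.TxnInfo {
	s.rwlock.RLock()
	defer s.rwlock.RUnlock()
	rows := make([]*txninfo.TxnInfo, 0, len(s.sessions))
	for _, se := range s.sessions {
		// Session.TxnInfo() returns nil when no transaction is active,
		// so idle connections contribute no TIDB_TRX row.
		if info := se.TxnInfo(); info != nil {
			rows = append(rows, info)
		}
	}
	return rows
}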