From 1b139498333cc7cf73794185c68f6112f18a36dd Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 28 Jul 2020 14:20:36 +0800 Subject: [PATCH 1/3] cherry pick #18666 to release-4.0 Signed-off-by: ti-srebot --- executor/builder.go | 4 -- executor/executor.go | 12 ++++- executor/point_get.go | 61 ++++++++++++++++++++++ kv/kv.go | 7 +++ planner/core/integration_test.go | 89 ++++++++++++++++++++++++++++++++ store/tikv/coprocessor.go | 7 +++ store/tikv/region_request.go | 47 +++++++++++++++++ store/tikv/snapshot.go | 85 +++++++++++++++++++++++++++++- store/tikv/snapshot_test.go | 18 +++++++ 9 files changed, 324 insertions(+), 6 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 63d1a2d5059eb..43c662a4136ab 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor/aggfuncs" @@ -98,9 +97,6 @@ type MockPhysicalPlan interface { } func (b *executorBuilder) build(p plannercore.Plan) Executor { - if config.GetGlobalConfig().EnableCollectExecutionInfo && b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil { - b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() - } switch v := p.(type) { case nil: return nil diff --git a/executor/executor.go b/executor/executor.go index 1421315314ef0..df1d4620f4090 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1543,10 +1543,16 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1), TaskID: stmtctx.AllocateTaskID(), } +<<<<<<< HEAD if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil { +======= + sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker) + globalConfig := config.GetGlobalConfig() + if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil { +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker) } - switch config.GetGlobalConfig().OOMAction { + switch globalConfig.OOMAction { case config.OOMActionCancel: action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) @@ -1670,6 +1676,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } else if vars.StmtCtx.InSelectStmt { sc.PrevAffectedRows = -1 } + if globalConfig.EnableCollectExecutionInfo { + sc.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() + } + sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool) errCount, warnCount := vars.StmtCtx.NumErrorWarnings() vars.SysErrorCount = errCount diff --git a/executor/point_get.go b/executor/point_get.go index a12b2633ee3aa..356d51b54a753 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -21,12 +21,18 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" +<<<<<<< HEAD +======= + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/tikv" +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/rowcodec" ) @@ -80,6 +86,8 @@ type PointGetExecutor struct { // virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns. virtualColumnRetFieldTypes []*types.FieldType + + stats *pointGetRuntimeStats } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field @@ -146,10 +154,41 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return err } } + if e.runtimeStats != nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &pointGetRuntimeStats{ + BasicRuntimeStats: e.runtimeStats, + SnapshotRuntimeStats: snapshotStats, + } + e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats) + } if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) +<<<<<<< HEAD +======= + return nil +} + +// Close implements the Executor interface. +func (e *PointGetExecutor) Close() error { + if e.runtimeStats != nil { + e.snapshot.DelOption(kv.CollectRuntimeStats) + } + return nil +} + +// Next implements the Executor interface. +func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { + req.Reset() + if e.done { + return nil + } + e.done = true + +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) var tblID int64 if e.partInfo != nil { tblID = e.partInfo.ID @@ -390,3 +429,25 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo { } return nil } + +type pointGetRuntimeStats struct { + *execdetails.BasicRuntimeStats + *tikv.SnapshotRuntimeStats +} + +func (e *pointGetRuntimeStats) String() string { + var basic, rpcStatsStr string + if e.BasicRuntimeStats != nil { + basic = e.BasicRuntimeStats.String() + } + if e.SnapshotRuntimeStats != nil { + rpcStatsStr = e.SnapshotRuntimeStats.String() + } + if rpcStatsStr == "" { + return basic + } + if basic == "" { + return rpcStatsStr + } + return basic + ", " + rpcStatsStr +} diff --git a/kv/kv.go b/kv/kv.go index 025464fb2ff7a..22ad1adecf32c 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -56,6 +56,13 @@ const ( ReplicaRead // Set task ID TaskID +<<<<<<< HEAD +======= + // InfoSchema is schema version used by txn startTS. + InfoSchema + // CollectRuntimeStats is used to enable collect runtime stats. + CollectRuntimeStats +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) ) // Priority value for transaction priority. diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index de08a3b84176c..74869db49ace9 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -14,7 +14,9 @@ package core_test import ( + "bytes" "fmt" + "strings" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -1177,3 +1179,90 @@ func (s *testIntegrationSuite) TestIssue16935(c *C) { tk.MustQuery("SELECT * FROM t0 LEFT JOIN v0 ON TRUE WHERE v0.c0 IS NULL;") } +<<<<<<< HEAD +======= + +func (s *testIntegrationSuite) TestAccessPathOnClusterIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_enable_clustered_index = 1") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int, b varchar(20), c decimal(40,10), d int, primary key(a,b), key(c))") + tk.MustExec(`insert into t1 values (1,"111",1.1,11), (2,"222",2.2,12), (3,"333",3.3,13)`) + tk.MustExec("analyze table t1") + + var input []string + var output []struct { + SQL string + Plan []string + Res []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows()) + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows()) + }) + tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...)) + tk.MustQuery(tt).Sort().Check(testkit.Rows(output[i].Res...)) + } +} + +func (s *testIntegrationSuite) TestClusterIndexUniqueDoubleRead(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("create database cluster_idx_unique_double_read;") + tk.MustExec("use cluster_idx_unique_double_read;") + defer tk.MustExec("drop database cluster_idx_unique_double_read;") + tk.MustExec("set @@tidb_enable_clustered_index = 1") + tk.MustExec("drop table if exists t") + + tk.MustExec("create table t (a varchar(64), b varchar(64), uk int, v int, primary key(a, b), unique key uuk(uk));") + tk.MustExec("insert t values ('a', 'a1', 1, 11), ('b', 'b1', 2, 22), ('c', 'c1', 3, 33);") + tk.MustQuery("select * from t use index (uuk);").Check(testkit.Rows("a a1 1 11", "b b1 2 22", "c c1 3 33")) +} + +func (s *testIntegrationSuite) TestIndexJoinOnClusteredIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_enable_clustered_index = 1") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t (a int, b varchar(20), c decimal(40,10), d int, primary key(a,b), key(c))") + tk.MustExec(`insert into t values (1,"111",1.1,11), (2,"222",2.2,12), (3,"333",3.3,13)`) + tk.MustExec("analyze table t") + + var input []string + var output []struct { + SQL string + Plan []string + Res []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows()) + output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...)) + tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...)) + } +} + +func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int primary key, b varchar(20))") + tk.MustExec("insert into t values (1,1)") + + res := tk.MustQuery("explain analyze select * from t where a=1;") + resBuff := bytes.NewBufferString("") + for _, row := range res.Rows() { + fmt.Fprintf(resBuff, "%s\n", row) + } + explain := resBuff.String() + c.Assert(strings.Contains(explain, "Get:{num_rpc:"), IsTrue, Commentf("%s", explain)) + c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain)) +} +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 58afdf4e9ddde..a848f1a319d92 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -821,6 +821,7 @@ type clientHelper struct { *minCommitTSPushed Client resolveLite bool + stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats } // ResolveLocks wraps the ResolveLocks function and store the resolved result. @@ -828,6 +829,11 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks var err error var resolvedLocks []uint64 var msBeforeTxnExpired int64 + if ch.stats != nil { + defer func(start time.Time) { + recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start)) + }(time.Now()) + } if ch.resolveLite { msBeforeTxnExpired, resolvedLocks, err = ch.LockResolver.resolveLocksLite(bo, callerStartTS, locks) } else { @@ -849,6 +855,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID if len(directStoreAddr) > 0 { sender.storeAddr = directStoreAddr } + sender.stats = ch.stats req.Context.ResolvedLocks = ch.minCommitTSPushed.Get() resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) return resp, ctx, sender.storeAddr, err diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index ec19577035f48..15832ac527d0a 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -60,6 +60,14 @@ type RegionRequestSender struct { storeAddr string rpcError error failStoreIDs map[uint64]struct{} + stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats +} + +// RegionRequestRuntimeStats records the runtime stats of send region requests. +type RegionRequestRuntimeStats struct { + count int64 + // Send region request consume time. + consume int64 } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. @@ -78,7 +86,16 @@ func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskA if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil { return nil, false, errors.Trace(e) } +<<<<<<< HEAD resp, err = ss.client.SendRequest(bo.ctx, ctx.Addr, req, timout) +======= + if ss.stats != nil { + defer func(start time.Time) { + recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start)) + }(time.Now()) + } + resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout) +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) if err != nil { ss.rpcError = err for _, failedCtx := range ctxs { @@ -93,6 +110,19 @@ func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskA return } +func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { + stat, ok := stats[cmd] + if !ok { + stats[cmd] = &RegionRequestRuntimeStats{ + count: 1, + consume: int64(d), + } + return + } + stat.count++ + stat.consume += int64(d) +} + func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error { // If it failed because the context is cancelled by ourself, don't retry. if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled { @@ -263,8 +293,25 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re } defer s.releaseStoreToken(ctx.Store) } +<<<<<<< HEAD resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout) +======= + + if s.stats != nil { + defer func(start time.Time) { + recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start)) + }(time.Now()) + } + + ctx := bo.ctx + if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { + var cancel context.CancelFunc + ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) + defer cancel() + } + resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout) +>>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) if err != nil { s.rpcError = err if e := s.onSendFail(bo, ctx, err); e != nil { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index ed81e2ef90576..d1286fb0eff44 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -72,6 +72,7 @@ type tikvSnapshot struct { sync.RWMutex cached map[string][]byte } + stats *SnapshotRuntimeStats } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -139,6 +140,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] m[string(k)] = v mu.Unlock() }) + s.recordBackoffInfo(bo) if err != nil { return nil, errors.Trace(err) } @@ -229,6 +231,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll minCommitTSPushed: &s.minCommitTSPushed, Client: s.store.client, } + if s.stats != nil { + cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + defer func() { + s.mergeRegionRequestStats(cli.stats) + }() + } pending := batch.keys for { @@ -306,7 +314,9 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { } ctx = context.WithValue(ctx, txnStartKey, s.version.Ver) - val, err := s.get(NewBackofferWithVars(ctx, getMaxBackoff, s.vars), k) + bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars) + val, err := s.get(bo, k) + s.recordBackoffInfo(bo) if err != nil { return nil, errors.Trace(err) } @@ -345,6 +355,12 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { Client: s.store.client, resolveLite: true, } + if s.stats != nil { + cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + defer func() { + s.mergeRegionRequestStats(cli.stats) + }() + } req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &pb.GetRequest{ @@ -423,6 +439,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { s.priority = kvPriorityToCommandPri(val.(int)) case kv.TaskID: s.taskID = val.(uint64) + case kv.CollectRuntimeStats: + s.stats = val.(*SnapshotRuntimeStats) } } @@ -431,6 +449,8 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) { switch opt { case kv.ReplicaRead: s.replicaRead = kv.ReplicaReadLeader + case kv.CollectRuntimeStats: + s.stats = nil } } @@ -536,3 +556,66 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) { logutil.BgLogger().Error("error", zap.Error(err4)) } } + +func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { + if s.stats == nil || bo.totalSleep == 0 { + return + } + s.mu.Lock() + defer s.mu.Unlock() + if s.stats.backoffSleepMS == nil { + s.stats.backoffSleepMS = bo.backoffSleepMS + s.stats.backoffTimes = bo.backoffTimes + return + } + for k, v := range bo.backoffSleepMS { + s.stats.backoffSleepMS[k] += v + } + for k, v := range bo.backoffTimes { + s.stats.backoffTimes[k] += v + } +} + +func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) { + s.mu.Lock() + defer s.mu.Unlock() + if s.stats.rpcStats == nil { + s.stats.rpcStats = stats + return + } + for k, v := range stats { + stat, ok := s.stats.rpcStats[k] + if !ok { + s.stats.rpcStats[k] = v + continue + } + stat.count += v.count + stat.consume += v.consume + } +} + +// SnapshotRuntimeStats records the runtime stats of snapshot. +type SnapshotRuntimeStats struct { + rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + backoffSleepMS map[backoffType]int + backoffTimes map[backoffType]int +} + +// String implements fmt.Stringer interface. +func (rs *SnapshotRuntimeStats) String() string { + var buf bytes.Buffer + for k, v := range rs.rpcStats { + if buf.Len() > 0 { + buf.WriteByte(',') + } + buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume))) + } + for k, v := range rs.backoffTimes { + if buf.Len() > 0 { + buf.WriteByte(',') + } + ms := rs.backoffSleepMS[k] + buf.WriteString(fmt.Sprintf("%s_backoff:{num:%d, total_time:%d ms}", k.String(), v, ms)) + } + return buf.String() +} diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 45259a7088aa7..dd3c6567393a0 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -300,3 +301,20 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { } wg.Wait() } + +func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { + reqStats := make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Second) + recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Millisecond) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: 0}, 0) + snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) + snapshot.mergeRegionRequestStats(reqStats) + snapshot.mergeRegionRequestStats(reqStats) + bo := NewBackofferWithVars(context.Background(), 2000, nil) + err := bo.BackoffWithMaxSleep(boTxnLockFast, 30, errors.New("test")) + c.Assert(err, IsNil) + snapshot.recordBackoffInfo(bo) + snapshot.recordBackoffInfo(bo) + expect := "Get:{num_rpc:4, total_time:2.002s},txnLockFast_backoff:{num:2, total_time:60 ms}" + c.Assert(snapshot.stats.String(), Equals, expect) +} From 8ab89b4ddc67db395420563c0307b9d4d6c68a33 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 29 Jul 2020 10:17:33 +0800 Subject: [PATCH 2/3] fix conflict Signed-off-by: crazycs520 --- executor/executor.go | 5 --- executor/point_get.go | 29 ++----------- kv/kv.go | 5 --- planner/core/integration_test.go | 70 -------------------------------- store/tikv/region_request.go | 20 +-------- store/tikv/snapshot.go | 1 + 6 files changed, 6 insertions(+), 124 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 9fa4c57db1cd5..c69a6d21dacc3 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1544,13 +1544,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1), TaskID: stmtctx.AllocateTaskID(), } -<<<<<<< HEAD - if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil { -======= - sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker) globalConfig := config.GetGlobalConfig() if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil { ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker) } switch globalConfig.OOMAction { diff --git a/executor/point_get.go b/executor/point_get.go index 356d51b54a753..27fb01efcd938 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -21,11 +21,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" -<<<<<<< HEAD -======= - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/store/tikv" ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -125,6 +121,9 @@ func (e *PointGetExecutor) Open(context.Context) error { // Close implements the Executor interface. func (e *PointGetExecutor) Close() error { + if e.runtimeStats != nil { + e.snapshot.DelOption(kv.CollectRuntimeStats) + } return nil } @@ -167,28 +166,6 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) -<<<<<<< HEAD -======= - return nil -} - -// Close implements the Executor interface. -func (e *PointGetExecutor) Close() error { - if e.runtimeStats != nil { - e.snapshot.DelOption(kv.CollectRuntimeStats) - } - return nil -} - -// Next implements the Executor interface. -func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { - req.Reset() - if e.done { - return nil - } - e.done = true - ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) var tblID int64 if e.partInfo != nil { tblID = e.partInfo.ID diff --git a/kv/kv.go b/kv/kv.go index 22ad1adecf32c..71e002e493f58 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -56,13 +56,8 @@ const ( ReplicaRead // Set task ID TaskID -<<<<<<< HEAD -======= - // InfoSchema is schema version used by txn startTS. - InfoSchema // CollectRuntimeStats is used to enable collect runtime stats. CollectRuntimeStats ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) ) // Priority value for transaction priority. diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 74869db49ace9..3cccacc820ea3 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -1179,75 +1179,6 @@ func (s *testIntegrationSuite) TestIssue16935(c *C) { tk.MustQuery("SELECT * FROM t0 LEFT JOIN v0 ON TRUE WHERE v0.c0 IS NULL;") } -<<<<<<< HEAD -======= - -func (s *testIntegrationSuite) TestAccessPathOnClusterIndex(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("set @@tidb_enable_clustered_index = 1") - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (a int, b varchar(20), c decimal(40,10), d int, primary key(a,b), key(c))") - tk.MustExec(`insert into t1 values (1,"111",1.1,11), (2,"222",2.2,12), (3,"333",3.3,13)`) - tk.MustExec("analyze table t1") - - var input []string - var output []struct { - SQL string - Plan []string - Res []string - } - s.testData.GetTestCases(c, &input, &output) - for i, tt := range input { - s.testData.OnRecord(func() { - output[i].SQL = tt - output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows()) - output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows()) - }) - tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...)) - tk.MustQuery(tt).Sort().Check(testkit.Rows(output[i].Res...)) - } -} - -func (s *testIntegrationSuite) TestClusterIndexUniqueDoubleRead(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("create database cluster_idx_unique_double_read;") - tk.MustExec("use cluster_idx_unique_double_read;") - defer tk.MustExec("drop database cluster_idx_unique_double_read;") - tk.MustExec("set @@tidb_enable_clustered_index = 1") - tk.MustExec("drop table if exists t") - - tk.MustExec("create table t (a varchar(64), b varchar(64), uk int, v int, primary key(a, b), unique key uuk(uk));") - tk.MustExec("insert t values ('a', 'a1', 1, 11), ('b', 'b1', 2, 22), ('c', 'c1', 3, 33);") - tk.MustQuery("select * from t use index (uuk);").Check(testkit.Rows("a a1 1 11", "b b1 2 22", "c c1 3 33")) -} - -func (s *testIntegrationSuite) TestIndexJoinOnClusteredIndex(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("set @@tidb_enable_clustered_index = 1") - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t (a int, b varchar(20), c decimal(40,10), d int, primary key(a,b), key(c))") - tk.MustExec(`insert into t values (1,"111",1.1,11), (2,"222",2.2,12), (3,"333",3.3,13)`) - tk.MustExec("analyze table t") - - var input []string - var output []struct { - SQL string - Plan []string - Res []string - } - s.testData.GetTestCases(c, &input, &output) - for i, tt := range input { - s.testData.OnRecord(func() { - output[i].SQL = tt - output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows()) - output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) - }) - tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...)) - tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...)) - } -} func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) { tk := testkit.NewTestKit(c, s.store) @@ -1265,4 +1196,3 @@ func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) { c.Assert(strings.Contains(explain, "Get:{num_rpc:"), IsTrue, Commentf("%s", explain)) c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain)) } ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 15832ac527d0a..27f0f454a716a 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -86,16 +86,12 @@ func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskA if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil { return nil, false, errors.Trace(e) } -<<<<<<< HEAD - resp, err = ss.client.SendRequest(bo.ctx, ctx.Addr, req, timout) -======= if ss.stats != nil { defer func(start time.Time) { recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start)) }(time.Now()) } - resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout) ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) + resp, err = ss.client.SendRequest(bo.ctx, ctx.Addr, req, timout) if err != nil { ss.rpcError = err for _, failedCtx := range ctxs { @@ -293,25 +289,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re } defer s.releaseStoreToken(ctx.Store) } -<<<<<<< HEAD - resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout) - -======= - if s.stats != nil { defer func(start time.Time) { recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start)) }(time.Now()) } - ctx := bo.ctx - if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { - var cancel context.CancelFunc - ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) - defer cancel() - } - resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout) ->>>>>>> 8b19d67... executor: add runtime information for point-get executor (#18666) + resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout) if err != nil { s.rpcError = err if e := s.onSendFail(bo, ctx, err); e != nil { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index d1286fb0eff44..23b58e70d49da 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "sync" + "time" "unsafe" "github.com/opentracing/opentracing-go" From c7ae101db6b5de37b8951c185fba7169423b52d6 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Wed, 29 Jul 2020 11:10:51 +0800 Subject: [PATCH 3/3] fix panic Signed-off-by: crazycs520 --- executor/point_get.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/point_get.go b/executor/point_get.go index 27fb01efcd938..0719552eb11a8 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -121,7 +121,7 @@ func (e *PointGetExecutor) Open(context.Context) error { // Close implements the Executor interface. func (e *PointGetExecutor) Close() error { - if e.runtimeStats != nil { + if e.runtimeStats != nil && e.snapshot != nil { e.snapshot.DelOption(kv.CollectRuntimeStats) } return nil