diff --git a/distsql/distsql.go b/distsql/distsql.go
index 83236ce4dc5f4..fa556536dd380 100644
--- a/distsql/distsql.go
+++ b/distsql/distsql.go
@@ -74,7 +74,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
 		kvReq.Streaming = false
 	}
 	enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
-	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
+	diagInfo := kv.DiagnosticInfo{Stmt: sctx.GetSessionVars().StmtCtx.OriginalSQL}
+	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, diagInfo)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return nil, err
@@ -136,7 +137,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq
 
 // Analyze do a analyze request.
 func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables, isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
-	resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false)
+	resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, kv.DiagnosticInfo{Stmt: "/* analyze */"})
 	if resp == nil {
 		return nil, errors.New("client returns nil response")
 	}
@@ -159,7 +160,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
 func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables) (SelectResult, error) {
 	// FIXME: As BR have dependency of `Checksum` and TiDB also introduced BR as dependency, Currently we can't edit
 	// Checksum function signature. The two-way dependence should be removed in future.
-	resp := client.Send(ctx, kvReq, vars, nil, false)
+	resp := client.Send(ctx, kvReq, vars, nil, false, kv.DiagnosticInfo{Stmt: "/* checksum */"})
 	if resp == nil {
 		return nil, errors.New("client returns nil response")
 	}
diff --git a/executor/simple.go b/executor/simple.go
index f0d3135d21e6e..921f2664889d6 100644
--- a/executor/simple.go
+++ b/executor/simple.go
@@ -1286,7 +1286,8 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
 		return err
 	}
 
-	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false)
+	diagInfo := kv.DiagnosticInfo{Stmt: sctx.GetSessionVars().StmtCtx.OriginalSQL}
+	resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false, diagInfo)
 	if resp == nil {
 		err := errors.New("client returns nil response")
 		return err
diff --git a/kv/kv.go b/kv/kv.go
index bcebff808ca1f..6d97b47ed3c47 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -326,10 +326,15 @@ type ReturnedValue struct {
 	AlreadyLocked bool
 }
 
+// DiagnosticInfo is used to pass information to lower layers for diagnostic purposes.
+type DiagnosticInfo struct {
+	Stmt string
+}
+
 // Client is used to send request to KV layer.
 type Client interface {
 	// Send sends request to KV layer, returns a Response.
-	Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
+	Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, diagInfo DiagnosticInfo) Response
 
 	// IsRequestTypeSupported checks if reqType and subType is supported.
 	IsRequestTypeSupported(reqType, subType int64) bool
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 90427eed8d217..36bd9b073bbed 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -62,14 +62,14 @@ type CopClient struct {
 }
 
 // Send builds the request and gets the coprocessor iterator response.
-func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
+func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, diagInfo kv.DiagnosticInfo) kv.Response {
 	if req.StoreType == kv.TiFlash && req.BatchCop {
 		logutil.BgLogger().Debug("send batch requests")
 		return c.sendBatch(ctx, req, vars)
 	}
 	ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
 	bo := tikv.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
-	tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req)
+	tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req, diagInfo)
 	if err != nil {
 		return copErrorResponse{err}
 	}
@@ -128,6 +128,8 @@ type copTask struct {
 	storeAddr string
 	cmdType   tikvrpc.CmdType
 	storeType kv.StoreType
+
+	diagInfo kv.DiagnosticInfo
 }
 
 func (r *copTask) String() string {
@@ -138,7 +140,7 @@ func (r *copTask) String() string {
 // rangesPerTask limits the length of the ranges slice sent in one copTask.
 const rangesPerTask = 25000
 
-func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
+func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request, diagInfo kv.DiagnosticInfo) ([]*copTask, error) {
 	start := time.Now()
 	cmdType := tikvrpc.CmdCop
 	if req.Streaming {
@@ -165,6 +167,7 @@ func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
 			respChan:  make(chan *copResponse, 2),
 			cmdType:   cmdType,
 			storeType: req.StoreType,
+			diagInfo:  diagInfo,
 		})
 		i = nextI
 	}
@@ -861,11 +864,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 			return nil, errors.Trace(err)
 		}
 		// We may meet RegionError at the first packet, but not during visiting the stream.
-		return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req)
+		return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.diagInfo)
 	}
 	if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
-		logutil.BgLogger().Debug("coprocessor encounters",
-			zap.Stringer("lock", lockErr))
+		logutil.BgLogger().Debug("coprocessor encounters lock",
+			zap.Stringer("lock", lockErr),
+			zap.String("stmt", task.diagInfo.Stmt))
 		msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
 		if err1 != nil {
 			return nil, errors.Trace(err1)
@@ -879,10 +883,11 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 	}
 	if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
 		err := errors.Errorf("other error: %s", otherErr)
-		logutil.BgLogger().Warn("other error",
+		logutil.Logger(bo.GetCtx()).Warn("other error",
 			zap.Uint64("txnStartTS", worker.req.StartTs),
 			zap.Uint64("regionID", task.region.GetID()),
 			zap.String("storeAddr", task.storeAddr),
+			zap.String("stmt", task.diagInfo.Stmt),
 			zap.Error(err))
 		return nil, errors.Trace(err)
 	}
@@ -1009,7 +1014,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *tikv.Backoffer, las
 	if worker.req.Streaming && lastRange != nil {
 		remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
 	}
-	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req)
+	return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.diagInfo)
 }
 
 // calculateRemain splits the input ranges into two, and take one of them according to desc flag.
diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go
index 4a3a020191230..2570515f373de 100644
--- a/store/copr/coprocessor_test.go
+++ b/store/copr/coprocessor_test.go
@@ -41,49 +41,50 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	req := &kv.Request{}
 	flashReq := &kv.Request{}
 	flashReq.StoreType = kv.TiFlash
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
+	diagInfo := kv.DiagnosticInfo{}
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
 	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
@@ -91,7 +92,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
 	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
@@ -99,45 +100,45 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
 	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
 	s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")
 
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
@@ -208,7 +209,8 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
 	bo := tikv.NewBackofferWithVars(context.Background(), 3000, nil)
 
 	req := &kv.Request{}
-	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
+	diagInfo := kv.DiagnosticInfo{}
+	tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
 	s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
@@ -222,7 +224,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
 	cache.InvalidateCachedRegion(tasks[1].region)
 
 	req.Desc = true
-	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
+	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, diagInfo)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 3)
 	s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
diff --git a/util/mock/client.go b/util/mock/client.go
index 56ec53336d2e4..9e34711f5569e 100644
--- a/util/mock/client.go
+++ b/util/mock/client.go
@@ -28,6 +28,6 @@ type Client struct {
 }
 
 // Send implement kv.Client interface.
-func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response {
+func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool, diagInfo kv.DiagnosticInfo) kv.Response {
 	return c.MockResponse
 }
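For reviewers unfamiliar with the plumbing, below is a minimal, self-contained sketch of the pattern this patch introduces. It is not TiDB code; every name in it is a hypothetical stand-in for the corresponding identifier in the diff (DiagnosticInfo for kv.DiagnosticInfo, task for copTask, buildTasks for buildCopTasks, handleResponse for handleCopResponse). The idea: a small value carrying the originating statement is handed to Send, copied into every task built from the request, and only read again on failure paths so the log line can name the statement.

```go
package main

import (
	"context"
	"errors"
	"log"
)

// DiagnosticInfo mirrors the struct added to kv/kv.go: opaque, read-only
// context that is never used for routing, only for diagnostics.
type DiagnosticInfo struct {
	Stmt string
}

// task stands in for copTask; the info is copied into each task so that
// retried or rebuilt tasks (cf. buildCopTasksFromRemain) keep it.
type task struct {
	region   uint64
	diagInfo DiagnosticInfo
}

// buildTasks stands in for buildCopTasks: every task inherits the same info.
func buildTasks(regions []uint64, diagInfo DiagnosticInfo) []*task {
	tasks := make([]*task, 0, len(regions))
	for _, r := range regions {
		tasks = append(tasks, &task{region: r, diagInfo: diagInfo})
	}
	return tasks
}

// handleResponse stands in for handleCopResponse: on failure, the log line
// carries the statement that issued the request, which is the point of the change.
func handleResponse(_ context.Context, t *task, err error) {
	if err != nil {
		log.Printf("coprocessor error: region=%d stmt=%q err=%v", t.region, t.diagInfo.Stmt, err)
	}
}

func main() {
	diag := DiagnosticInfo{Stmt: "SELECT * FROM t WHERE id = 1"}
	for _, t := range buildTasks([]uint64{1, 2}, diag) {
		handleResponse(context.Background(), t, errors.New("region unavailable"))
	}
}
```

Passing the value explicitly through Send and storing it per task keeps the hot-path cost to a string copy and makes it visible at every call site which statement a request belongs to; callers that have no originating statement, such as Analyze and Checksum above, pass a fixed marker instead.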