Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Log SQL statement when coprocessor encounteres lock #27737

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
kvReq.Streaming = false
}
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
diagInfo := kv.DiagnosticInfo{Stmt: sctx.GetSessionVars().StmtCtx.OriginalSQL}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, diagInfo)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down Expand Up @@ -136,7 +137,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq
// Analyze do a analyze request.
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables,
isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false)
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, kv.DiagnosticInfo{Stmt: "/* analyze */"})
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand All @@ -159,7 +160,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables) (SelectResult, error) {
// FIXME: As BR have dependency of `Checksum` and TiDB also introduced BR as dependency, Currently we can't edit
// Checksum function signature. The two-way dependence should be removed in future.
resp := client.Send(ctx, kvReq, vars, nil, false)
resp := client.Send(ctx, kvReq, vars, nil, false, kv.DiagnosticInfo{Stmt: "/* checksum */"})
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
3 changes: 2 additions & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,8 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
return err
}

resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false)
diagInfo := kv.DiagnosticInfo{Stmt: sctx.GetSessionVars().StmtCtx.OriginalSQL}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false, diagInfo)
if resp == nil {
err := errors.New("client returns nil response")
return err
Expand Down
7 changes: 6 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,15 @@ type ReturnedValue struct {
AlreadyLocked bool
}

// DiagnosticInfo is used to pass information to lower layers for diagnostic purposes.
type DiagnosticInfo struct {
Stmt string
}

// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, diagInfo DiagnosticInfo) Response

// IsRequestTypeSupported checks if reqType and subType is supported.
IsRequestTypeSupported(reqType, subType int64) bool
Expand Down
21 changes: 13 additions & 8 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ type CopClient struct {
}

// Send builds the request and gets the coprocessor iterator response.
func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, diagInfo kv.DiagnosticInfo) kv.Response {
if req.StoreType == kv.TiFlash && req.BatchCop {
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
bo := tikv.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), tikv.NewKeyRanges(req.KeyRanges), req, diagInfo)
if err != nil {
return copErrorResponse{err}
}
Expand Down Expand Up @@ -128,6 +128,8 @@ type copTask struct {
storeAddr string
cmdType tikvrpc.CmdType
storeType kv.StoreType

diagInfo kv.DiagnosticInfo
}

func (r *copTask) String() string {
Expand All @@ -138,7 +140,7 @@ func (r *copTask) String() string {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request, diagInfo kv.DiagnosticInfo) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
Expand All @@ -165,6 +167,7 @@ func buildCopTasks(bo *tikv.Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
diagInfo: diagInfo,
})
i = nextI
}
Expand Down Expand Up @@ -861,11 +864,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req)
return buildCopTasks(bo, worker.store.GetRegionCache(), task.ranges, worker.req, task.diagInfo)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
logutil.BgLogger().Debug("coprocessor encounters lock",
zap.Stringer("lock", lockErr),
zap.String("stmt", task.diagInfo.Stmt))
msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*tikv.Lock{tikv.NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
Expand All @@ -879,10 +883,11 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
logutil.BgLogger().Warn("other error",
logutil.Logger(bo.GetCtx()).Warn("other error",
zap.Uint64("txnStartTS", worker.req.StartTs),
zap.Uint64("regionID", task.region.GetID()),
zap.String("storeAddr", task.storeAddr),
zap.String("stmt", task.diagInfo.Stmt),
zap.Error(err))
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1009,7 +1014,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *tikv.Backoffer, las
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
}
return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req)
return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.diagInfo)
}

// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
Expand Down
42 changes: 22 additions & 20 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,103 +41,104 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
req := &kv.Request{}
flashReq := &kv.Request{}
flashReq.StoreType = kv.TiFlash
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
diagInfo := kv.DiagnosticInfo{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
Expand Down Expand Up @@ -208,7 +209,8 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
bo := tikv.NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
diagInfo := kv.DiagnosticInfo{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
Expand All @@ -222,7 +224,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
cache.InvalidateCachedRegion(tasks[1].region)

req.Desc = true
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, diagInfo)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 3)
s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
Expand Down
2 changes: 1 addition & 1 deletion util/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ type Client struct {
}

// Send implement kv.Client interface.
func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response {
func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool, diagInfo kv.DiagnosticInfo) kv.Response {
return c.MockResponse
}