Skip to content

Commit

Permalink
cherry pick #27735 to release-5.1
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
MyonKeminta committed Sep 23, 2021
1 parent 1881af7 commit b8ba4f9
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 33 deletions.
19 changes: 16 additions & 3 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// Select sends a DAG request, returns SelectResult.
Expand All @@ -46,7 +49,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
kvReq.Streaming = false
}
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction)
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
// Note: Do not assume this callback will be invoked within the same goroutine.
if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
logutil.Logger(ctx).Debug("coprocessor encounters lock",
zap.Uint64("startTS", kvReq.StartTs),
zap.Stringer("lock", copMeetLock.LockInfo),
zap.String("stmt", originalSQL))
}
}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down Expand Up @@ -107,7 +120,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq
// Analyze do a analyze request.
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables,
isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false)
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand All @@ -129,7 +142,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables) (SelectResult, error) {
// FIXME: As BR have dependency of `Checksum` and TiDB also introduced BR as dependency, Currently we can't edit
// Checksum function signature. The two-way dependence should be removed in future.
resp := client.Send(ctx, kvReq, vars, nil, false)
resp := client.Send(ctx, kvReq, vars, nil, false, nil)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
3 changes: 2 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
)

// Transaction options
Expand Down Expand Up @@ -256,7 +257,7 @@ type ReturnedValue struct {
// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response
Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) Response

// IsRequestTypeSupported checks if reqType and subType is supported.
IsRequestTypeSupported(reqType, subType int64) bool
Expand Down
27 changes: 19 additions & 8 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand All @@ -59,14 +60,14 @@ type CopClient struct {
}

// Send builds the request and gets the coprocessor iterator response.
func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response {
func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
if req.StoreType == kv.TiFlash && req.BatchCop {
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, txnStartKey, req.StartTs)
bo := NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req)
tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req, eventCb)
if err != nil {
return copErrorResponse{err}
}
Expand Down Expand Up @@ -125,6 +126,8 @@ type copTask struct {
storeAddr string
cmdType tikvrpc.CmdType
storeType kv.StoreType

eventCb trxevents.EventCallback
}

func (r *copTask) String() string {
Expand Down Expand Up @@ -245,7 +248,7 @@ func (r *copRanges) split(key []byte) (*copRanges, *copRanges) {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
Expand All @@ -272,6 +275,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
eventCb: eventCb,
})
i = nextI
}
Expand Down Expand Up @@ -1101,11 +1105,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req)
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req, task.eventCb)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
logutil.BgLogger().Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
// Be care that we didn't redact the SQL statement because the log is DEBUG level.
if task.eventCb != nil {
task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{
LockInfo: lockErr,
}))
} else {
logutil.Logger(bo.ctx).Debug("coprocessor encounters lock",
zap.Stringer("lock", lockErr))
}
msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
Expand All @@ -1119,7 +1130,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
logutil.BgLogger().Warn("other error",
logutil.Logger(bo.ctx).Warn("other error",
zap.Uint64("txnStartTS", worker.req.StartTs),
zap.Uint64("regionID", task.region.id),
zap.String("storeAddr", task.storeAddr),
Expand Down Expand Up @@ -1248,7 +1259,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
}
return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req)
return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req, task.eventCb)
}

// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
Expand Down
40 changes: 20 additions & 20 deletions store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,103 +42,103 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
req := &kv.Request{}
flashReq := &kv.Request{}
flashReq.StoreType = kv.TiFlash
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req)
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
bo := NewBackofferWithVars(context.Background(), 3000, nil)

req := &kv.Request{}
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "a", "m")
Expand All @@ -223,7 +223,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
cache.InvalidateCachedRegion(tasks[1].region)

req.Desc = true
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req)
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 3)
s.taskEqual(c, tasks[2], regionIDs[0], "a", "m")
Expand Down
3 changes: 2 additions & 1 deletion util/mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
)

// Client implement kv.Client interface, mocked from "CopClient" defined in
Expand All @@ -28,6 +29,6 @@ type Client struct {
}

// Send implement kv.Client interface.
func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response {
func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool, eventCb trxevents.EventCallback) kv.Response {
return c.MockResponse
}
58 changes: 58 additions & 0 deletions util/trxevents/trx_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trxevents

import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// EventType represents the type of a transaction event.
type EventType = int

const (
// EventTypeCopMeetLock stands for the CopMeetLock event type.
EventTypeCopMeetLock = iota
)

// CopMeetLock represents an event that coprocessor reading encounters lock.
type CopMeetLock struct {
LockInfo *kvrpcpb.LockInfo
}

// TransactionEvent represents a transaction event that may belong to any of the possible types.
type TransactionEvent struct {
eventType EventType
inner interface{}
}

// GetCopMeetLock tries to extract the inner CopMeetLock event from a TransactionEvent. Returns nil if it's not a
// CopMeetLock event.
func (e TransactionEvent) GetCopMeetLock() *CopMeetLock {
if e.eventType == EventTypeCopMeetLock {
return e.inner.(*CopMeetLock)
}
return nil
}

// WrapCopMeetLock wraps a CopMeetLock event into a TransactionEvent object.
func WrapCopMeetLock(copMeetLock *CopMeetLock) TransactionEvent {
return TransactionEvent{
eventType: EventTypeCopMeetLock,
inner: copMeetLock,
}
}

// EventCallback is the callback type that handles `TransactionEvent`s.
type EventCallback = func(event TransactionEvent)

0 comments on commit b8ba4f9

Please sign in to comment.