Skip to content

Commit

Permalink
tikv: refine commit backoff slow log (#11757) (#12335)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored and sre-bot committed Sep 24, 2019
1 parent 99bc1a3 commit 0106711
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 28 deletions.
57 changes: 44 additions & 13 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -95,7 +96,7 @@ type twoPhaseCommitter struct {
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than GC life time.
maxTxnTimeUse uint64
detail *execdetails.CommitDetails
detail unsafe.Pointer
primaryKey []byte
forUpdateTS uint64
pessimisticTTL uint64
Expand Down Expand Up @@ -273,7 +274,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = getTxnPriority(txn)
c.syncLog = getTxnSyncLog(txn)
c.detail = commitDetail
c.setDetail(commitDetail)
return nil
}

Expand Down Expand Up @@ -334,7 +335,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}
}
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
atomic.AddInt32(&c.getDetail().PrewriteRegionNum, int32(len(groups)))
}
// Make sure the group that contains primary key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
Expand Down Expand Up @@ -418,6 +419,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm

batch := batch1
go func() {
var singleBatchBackoffer *Backoffer
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
Expand All @@ -426,12 +428,22 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
singleBatchBackoffer = backoffer.Clone()
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
var singleBatchCancel context.CancelFunc
singleBatchBackoffer, singleBatchCancel = backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
beforeSleep := singleBatchBackoffer.totalSleep
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
commitDetail := c.getDetail()
if commitDetail != nil { // lock operations of pessimistic-txn will let commitDetail be nil
if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 {
atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond))
commitDetail.Mu.Lock()
commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, singleBatchBackoffer.types...)
commitDetail.Mu.Unlock()
}
}
}()
}
Expand Down Expand Up @@ -566,7 +578,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
Expand Down Expand Up @@ -714,6 +726,14 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri {
return pb.CommandPri_Normal
}

func (c *twoPhaseCommitter) setDetail(d *execdetails.CommitDetails) {
atomic.StorePointer(&c.detail, unsafe.Pointer(d))
}

func (c *twoPhaseCommitter) getDetail() *execdetails.CommitDetails {
return (*execdetails.CommitDetails)(atomic.LoadPointer(&c.detail))
}

func (c *twoPhaseCommitter) setUndeterminedErr(err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -906,8 +926,14 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
c.detail.PrewriteTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond
commitDetail := c.getDetail()
commitDetail.PrewriteTime = time.Since(start)
if prewriteBo.totalSleep > 0 {
atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(prewriteBo.totalSleep)*int64(time.Millisecond))
commitDetail.Mu.Lock()
commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, prewriteBo.types...)
commitDetail.Mu.Unlock()
}
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
Expand All @@ -929,7 +955,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
}
c.detail.GetCommitTsTime = time.Since(start)
commitDetail.GetCommitTsTime = time.Since(start)

// check commitTS
if commitTS <= c.startTS {
Expand Down Expand Up @@ -958,8 +984,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
start = time.Now()
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
c.detail.CommitTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
commitDetail.CommitTime = time.Since(start)
if commitBo.totalSleep > 0 {
atomic.AddInt64(&commitDetail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond))
commitDetail.Mu.Lock()
commitDetail.Mu.BackoffTypes = append(commitDetail.Mu.BackoffTypes, commitBo.types...)
commitDetail.Mu.Unlock()
}
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
logutil.Logger(ctx).Error("2PC commit result undetermined",
Expand Down
7 changes: 3 additions & 4 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ type Backoffer struct {
maxSleep int
totalSleep int
errors []error
types []backoffType
types []fmt.Stringer
vars *kv.Variables
noop bool
}
Expand Down Expand Up @@ -290,6 +290,7 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err
default:
}

b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
b.types = append(b.types, typ)
if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) {
errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", typ.String(), b.maxSleep)
Expand All @@ -301,7 +302,7 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err
}
logutil.Logger(context.Background()).Warn(errMsg)
// Use the first backoff type to generate a MySQL error.
return b.types[0].TError()
return b.types[0].(backoffType).TError()
}

backoffCounter, backoffDuration := typ.metric()
Expand Down Expand Up @@ -330,8 +331,6 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", typ),
zap.Reflect("txnStartTS", startTs))

b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
if *commitDetail != nil {
(*commitDetail).TxnRetry += 1
} else {
*commitDetail = committer.detail
*commitDetail = committer.getDetail()
}
}
}()
Expand All @@ -303,9 +303,10 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
// for transactions which need to acquire latches
start = time.Now()
lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
committer.detail.LocalLatchTime = time.Since(start)
if committer.detail.LocalLatchTime > 0 {
metrics.TiKVLocalLatchWaitTimeHistogram.Observe(committer.detail.LocalLatchTime.Seconds())
commitDetail := committer.getDetail()
commitDetail.LocalLatchTime = time.Since(start)
if commitDetail.LocalLatchTime > 0 {
metrics.TiKVLocalLatchWaitTimeHistogram.Observe(commitDetail.LocalLatchTime.Seconds())
}
defer txn.store.txnLatches.UnLock(lock)
if lock.IsStale() {
Expand Down
26 changes: 21 additions & 5 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ type CommitDetails struct {
PrewriteTime time.Duration
CommitTime time.Duration
LocalLatchTime time.Duration
TotalBackoffTime time.Duration
CommitBackoffTime int64
Mu struct {
sync.Mutex
BackoffTypes []fmt.Stringer
}
ResolveLockTime int64
WriteKeys int
WriteSize int
Expand Down Expand Up @@ -102,9 +106,15 @@ func (d ExecDetails) String() string {
if commitDetails.GetCommitTsTime > 0 {
parts = append(parts, fmt.Sprintf("Get_commit_ts_time: %v", commitDetails.GetCommitTsTime.Seconds()))
}
if commitDetails.TotalBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("Total_backoff_time: %v", commitDetails.TotalBackoffTime.Seconds()))
commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime)
if commitBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("Commit_backoff_time: %v", time.Duration(commitBackoffTime).Seconds()))
}
commitDetails.Mu.Lock()
if len(commitDetails.Mu.BackoffTypes) > 0 {
parts = append(parts, fmt.Sprintf("Backoff_types: %v", commitDetails.Mu.BackoffTypes))
}
commitDetails.Mu.Unlock()
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
parts = append(parts, fmt.Sprintf("Resolve_lock_time: %v", time.Duration(resolveLockTime).Seconds()))
Expand Down Expand Up @@ -161,9 +171,15 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) {
if commitDetails.GetCommitTsTime > 0 {
fields = append(fields, zap.String("get_commit_ts_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.GetCommitTsTime.Seconds(), 'f', -1, 64)+"s")))
}
if commitDetails.TotalBackoffTime > 0 {
fields = append(fields, zap.String("total_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.TotalBackoffTime.Seconds(), 'f', -1, 64)+"s")))
commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime)
if commitBackoffTime > 0 {
fields = append(fields, zap.String("commit_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(time.Duration(commitBackoffTime).Seconds(), 'f', -1, 64)+"s")))
}
commitDetails.Mu.Lock()
if len(commitDetails.Mu.BackoffTypes) > 0 {
fields = append(fields, zap.String("backoff_types", fmt.Sprintf("%v", commitDetails.Mu.BackoffTypes)))
}
commitDetails.Mu.Unlock()
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
fields = append(fields, zap.String("resolve_lock_time", fmt.Sprintf("%v", strconv.FormatFloat(time.Duration(resolveLockTime).Seconds(), 'f', -1, 64)+"s")))
Expand Down
18 changes: 16 additions & 2 deletions util/execdetails/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
package execdetails

import (
"fmt"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)

Expand All @@ -33,7 +36,18 @@ func TestString(t *testing.T) {
PrewriteTime: time.Second,
CommitTime: time.Second,
LocalLatchTime: time.Second,
TotalBackoffTime: time.Second,
CommitBackoffTime: int64(time.Second),
Mu: struct {
sync.Mutex
BackoffTypes []fmt.Stringer
}{BackoffTypes: []fmt.Stringer{
stringutil.MemoizeStr(func() string {
return "backoff1"
}),
stringutil.MemoizeStr(func() string {
return "backoff2"
}),
}},
ResolveLockTime: 1000000000, // 10^9 ns = 1s
WriteKeys: 1,
WriteSize: 1,
Expand All @@ -42,7 +56,7 @@ func TestString(t *testing.T) {
},
}
expected := "Process_time: 2.005 Wait_time: 1 Backoff_time: 1 Request_count: 1 Total_keys: 100 Process_keys: 10 Prewrite_time: 1 Commit_time: 1 " +
"Get_commit_ts_time: 1 Total_backoff_time: 1 Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1"
"Get_commit_ts_time: 1 Commit_backoff_time: 1 Backoff_types: [backoff1 backoff2] Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1"
if str := detail.String(); str != expected {
t.Errorf("got:\n%s\nexpected:\n%s", str, expected)
}
Expand Down

0 comments on commit 0106711

Please sign in to comment.