Skip to content

Commit

Permalink
tikv: refine commit backoff slow log
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu committed Aug 16, 2019
1 parent db6c36c commit f05a27d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
23 changes: 17 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm

batch := batch1
go func() {
var singleBatchBackoffer *Backoffer
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
Expand All @@ -418,12 +419,18 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
singleBatchBackoffer = backoffer.Clone()
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
var singleBatchCancel context.CancelFunc
singleBatchBackoffer, singleBatchCancel = backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
beforeSleep := singleBatchBackoffer.totalSleep
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
if c.detail != nil {
if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 {
atomic.AddInt64(&c.detail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond))
}
}
}()
}
Expand Down Expand Up @@ -873,7 +880,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
c.detail.PrewriteTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond
if prewriteBo.totalSleep > 0 {
atomic.AddInt64(&c.detail.CommitBackoffTime, int64(prewriteBo.totalSleep)*int64(time.Millisecond))
}
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
Expand Down Expand Up @@ -928,7 +937,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
c.detail.CommitTime = time.Since(start)
c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond
if commitBo.totalSleep > 0 {
atomic.AddInt64(&c.detail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond))
}
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
logutil.Logger(ctx).Error("2PC commit result undetermined",
Expand Down
12 changes: 7 additions & 5 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type CommitDetails struct {
PrewriteTime time.Duration
CommitTime time.Duration
LocalLatchTime time.Duration
TotalBackoffTime time.Duration
CommitBackoffTime int64
ResolveLockTime int64
WriteKeys int
WriteSize int
Expand Down Expand Up @@ -104,8 +104,9 @@ func (d ExecDetails) String() string {
if commitDetails.GetCommitTsTime > 0 {
parts = append(parts, fmt.Sprintf("Get_commit_ts_time: %v", commitDetails.GetCommitTsTime.Seconds()))
}
if commitDetails.TotalBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("Total_backoff_time: %v", commitDetails.TotalBackoffTime.Seconds()))
commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime)
if commitBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("Commit_backoff_time: %v", time.Duration(commitBackoffTime).Seconds()))
}
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
Expand Down Expand Up @@ -163,8 +164,9 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) {
if commitDetails.GetCommitTsTime > 0 {
fields = append(fields, zap.String("get_commit_ts_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.GetCommitTsTime.Seconds(), 'f', -1, 64)+"s")))
}
if commitDetails.TotalBackoffTime > 0 {
fields = append(fields, zap.String("total_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.TotalBackoffTime.Seconds(), 'f', -1, 64)+"s")))
commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime)
if commitBackoffTime > 0 {
fields = append(fields, zap.String("commit_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(time.Duration(commitBackoffTime).Seconds(), 'f', -1, 64)+"s")))
}
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
Expand Down
4 changes: 2 additions & 2 deletions util/execdetails/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestString(t *testing.T) {
PrewriteTime: time.Second,
CommitTime: time.Second,
LocalLatchTime: time.Second,
TotalBackoffTime: time.Second,
CommitBackoffTime: int64(time.Second),
ResolveLockTime: 1000000000, // 10^9 ns = 1s
WriteKeys: 1,
WriteSize: 1,
Expand All @@ -42,7 +42,7 @@ func TestString(t *testing.T) {
},
}
expected := "Process_time: 2.005 Wait_time: 1 Backoff_time: 1 Request_count: 1 Total_keys: 100 Process_keys: 10 Prewrite_time: 1 Commit_time: 1 " +
"Get_commit_ts_time: 1 Total_backoff_time: 1 Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1"
"Get_commit_ts_time: 1 Commit_backoff_time: 1 Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1"
if str := detail.String(); str != expected {
t.Errorf("got:\n%s\nexpected:\n%s", str, expected)
}
Expand Down

0 comments on commit f05a27d

Please sign in to comment.