diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index ff35649eab534..cba0d99dc0236 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -410,6 +410,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm batch := batch1 go func() { + var singleBatchBackoffer *Backoffer if action == actionCommit { // Because the secondary batches of the commit actions are implemented to be // committed asynchronously in background goroutines, we should not @@ -418,12 +419,18 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm // Here we makes a new clone of the original backoffer for this goroutine // exclusively to avoid the data race when using the same backoffer // in concurrent goroutines. - singleBatchBackoffer := backoffer.Clone() - ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + singleBatchBackoffer = backoffer.Clone() } else { - singleBatchBackoffer, singleBatchCancel := backoffer.Fork() + var singleBatchCancel context.CancelFunc + singleBatchBackoffer, singleBatchCancel = backoffer.Fork() defer singleBatchCancel() - ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + } + beforeSleep := singleBatchBackoffer.totalSleep + ch <- singleBatchActionFunc(singleBatchBackoffer, batch) + if c.detail != nil { + if delta := singleBatchBackoffer.totalSleep - beforeSleep; delta > 0 { + atomic.AddInt64(&c.detail.CommitBackoffTime, int64(singleBatchBackoffer.totalSleep-beforeSleep)*int64(time.Millisecond)) + } } }() } @@ -873,7 +880,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { start := time.Now() err := c.prewriteKeys(prewriteBo, c.keys) c.detail.PrewriteTime = time.Since(start) - c.detail.TotalBackoffTime += time.Duration(prewriteBo.totalSleep) * time.Millisecond + if prewriteBo.totalSleep > 0 { + atomic.AddInt64(&c.detail.CommitBackoffTime, int64(prewriteBo.totalSleep)*int64(time.Millisecond)) + } if binlogChan != nil { binlogErr := <-binlogChan if binlogErr != nil { @@ -928,7 +937,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error { commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars) err = c.commitKeys(commitBo, c.keys) c.detail.CommitTime = time.Since(start) - c.detail.TotalBackoffTime += time.Duration(commitBo.totalSleep) * time.Millisecond + if commitBo.totalSleep > 0 { + atomic.AddInt64(&c.detail.CommitBackoffTime, int64(commitBo.totalSleep)*int64(time.Millisecond)) + } if err != nil { if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil { logutil.Logger(ctx).Error("2PC commit result undetermined", diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index e707023895d73..f11a688c04063 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -49,7 +49,7 @@ type CommitDetails struct { PrewriteTime time.Duration CommitTime time.Duration LocalLatchTime time.Duration - TotalBackoffTime time.Duration + CommitBackoffTime int64 ResolveLockTime int64 WriteKeys int WriteSize int @@ -104,8 +104,9 @@ func (d ExecDetails) String() string { if commitDetails.GetCommitTsTime > 0 { parts = append(parts, fmt.Sprintf("Get_commit_ts_time: %v", commitDetails.GetCommitTsTime.Seconds())) } - if commitDetails.TotalBackoffTime > 0 { - parts = append(parts, fmt.Sprintf("Total_backoff_time: %v", commitDetails.TotalBackoffTime.Seconds())) + commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime) + if commitBackoffTime > 0 { + parts = append(parts, fmt.Sprintf("Commit_backoff_time: %v", time.Duration(commitBackoffTime).Seconds())) } resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime) if resolveLockTime > 0 { @@ -163,8 +164,9 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) { if commitDetails.GetCommitTsTime > 0 { fields = append(fields, zap.String("get_commit_ts_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.GetCommitTsTime.Seconds(), 'f', -1, 64)+"s"))) } - if commitDetails.TotalBackoffTime > 0 { - fields = append(fields, zap.String("total_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(commitDetails.TotalBackoffTime.Seconds(), 'f', -1, 64)+"s"))) + commitBackoffTime := atomic.LoadInt64(&commitDetails.CommitBackoffTime) + if commitBackoffTime > 0 { + fields = append(fields, zap.String("commit_backoff_time", fmt.Sprintf("%v", strconv.FormatFloat(time.Duration(commitBackoffTime).Seconds(), 'f', -1, 64)+"s"))) } resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime) if resolveLockTime > 0 { diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index 577f933f5130e..33b1105de5aaa 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -33,7 +33,7 @@ func TestString(t *testing.T) { PrewriteTime: time.Second, CommitTime: time.Second, LocalLatchTime: time.Second, - TotalBackoffTime: time.Second, + CommitBackoffTime: int64(time.Second), ResolveLockTime: 1000000000, // 10^9 ns = 1s WriteKeys: 1, WriteSize: 1, @@ -42,7 +42,7 @@ func TestString(t *testing.T) { }, } expected := "Process_time: 2.005 Wait_time: 1 Backoff_time: 1 Request_count: 1 Total_keys: 100 Process_keys: 10 Prewrite_time: 1 Commit_time: 1 " + - "Get_commit_ts_time: 1 Total_backoff_time: 1 Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1" + "Get_commit_ts_time: 1 Commit_backoff_time: 1 Resolve_lock_time: 1 Local_latch_wait_time: 1 Write_keys: 1 Write_size: 1 Prewrite_region: 1 Txn_retry: 1" if str := detail.String(); str != expected { t.Errorf("got:\n%s\nexpected:\n%s", str, expected) }