Skip to content

Commit

Permalink
execdetails: fix data race in the BasicRuntimeStats (#42338)
Browse files Browse the repository at this point in the history
close #42337
  • Loading branch information
hawkingrei authored Mar 17, 2023
1 parent 7bcb766 commit ed86b9c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
1 change: 1 addition & 0 deletions executor/asyncloaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_test(
],
embed = [":asyncloaddata"],
flaky = True,
race = "on",
deps = [
"//br/pkg/lightning/config",
"//executor/importer",
Expand Down
77 changes: 42 additions & 35 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,24 @@ type basicCopRuntimeStats struct {
// String implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) String() string {
if e.storeType == "tiflash" {
return fmt.Sprintf("time:%v, loops:%d, threads:%d, ", FormatDuration(time.Duration(e.consume)), e.loop, e.threads) + e.BasicRuntimeStats.tiflashScanContext.String()
return fmt.Sprintf("time:%v, loops:%d, threads:%d, ", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads) + e.BasicRuntimeStats.tiflashScanContext.String()
}
return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume)), e.loop)
return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load())
}

// Clone implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) Clone() RuntimeStats {
return &basicCopRuntimeStats{
BasicRuntimeStats: BasicRuntimeStats{loop: e.loop, consume: e.consume, rows: e.rows, tiflashScanContext: e.tiflashScanContext.Clone()},
stats := &basicCopRuntimeStats{
BasicRuntimeStats: BasicRuntimeStats{tiflashScanContext: e.tiflashScanContext.Clone()},
threads: e.threads,
storeType: e.storeType,
totalTasks: e.totalTasks,
procTimes: e.procTimes,
}
stats.loop.Store(e.loop.Load())
stats.consume.Store(e.consume.Load())
stats.rows.Store(e.rows.Load())
return stats
}

// Merge implements the RuntimeStats interface.
Expand All @@ -355,15 +359,15 @@ func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) {
if !ok {
return
}
e.loop += tmp.loop
e.consume += tmp.consume
e.rows += tmp.rows
e.loop.Add(tmp.loop.Load())
e.consume.Add(tmp.consume.Load())
e.rows.Add(tmp.rows.Load())
e.threads += tmp.threads
e.totalTasks += tmp.totalTasks
if len(tmp.procTimes) > 0 {
e.procTimes = append(e.procTimes, tmp.procTimes...)
} else {
e.procTimes = append(e.procTimes, time.Duration(tmp.consume))
e.procTimes = append(e.procTimes, time.Duration(tmp.consume.Load()))
}
e.tiflashScanContext.Merge(tmp.tiflashScanContext)
}
Expand Down Expand Up @@ -397,11 +401,9 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu
storeType: crs.storeType,
}
}
crs.stats[address].Merge(&basicCopRuntimeStats{
data := &basicCopRuntimeStats{
storeType: crs.storeType,
BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations),
consume: int64(*summary.TimeProcessedNs),
rows: int64(*summary.NumProducedRows),
BasicRuntimeStats: BasicRuntimeStats{
tiflashScanContext: TiFlashScanContext{
totalDmfileScannedPacks: summary.GetTiflashScanContext().GetTotalDmfileScannedPacks(),
totalDmfileSkippedPacks: summary.GetTiflashScanContext().GetTotalDmfileSkippedPacks(),
Expand All @@ -413,13 +415,17 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu
totalLocalRegionNum: summary.GetTiflashScanContext().GetTotalLocalRegionNum(),
totalRemoteRegionNum: summary.GetTiflashScanContext().GetTotalRemoteRegionNum()}}, threads: int32(summary.GetConcurrency()),
totalTasks: 1,
})
}
data.BasicRuntimeStats.loop.Store(int32(*summary.NumIterations))
data.BasicRuntimeStats.consume.Store(int64(*summary.TimeProcessedNs))
data.BasicRuntimeStats.rows.Store(int64(*summary.NumProducedRows))
crs.stats[address].Merge(data)
}

// GetActRows returns the total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() (totalRows int64) {
for _, instanceStats := range crs.stats {
totalRows += instanceStats.rows
totalRows += instanceStats.rows.Load()
}
return totalRows
}
Expand All @@ -430,8 +436,8 @@ func (crs *CopRuntimeStats) MergeBasicStats() (procTimes []time.Duration, totalT
totalTiFlashScanContext = TiFlashScanContext{}
for _, instanceStats := range crs.stats {
procTimes = append(procTimes, instanceStats.procTimes...)
totalTime += time.Duration(instanceStats.consume)
totalLoops += instanceStats.loop
totalTime += time.Duration(instanceStats.consume.Load())
totalLoops += instanceStats.loop.Load()
totalThreads += instanceStats.threads
totalTiFlashScanContext.Merge(instanceStats.tiflashScanContext)
totalTasks += instanceStats.totalTasks
Expand Down Expand Up @@ -586,28 +592,29 @@ func (context *TiFlashScanContext) Empty() bool {
// BasicRuntimeStats is the basic runtime stats.
type BasicRuntimeStats struct {
// number of times the executor's Next() was called.
loop int32
loop atomic.Int32
// executor consume time.
consume int64
consume atomic.Int64
// executor return row count.
rows int64
rows atomic.Int64
// executor extra infos
tiflashScanContext TiFlashScanContext
}

// GetActRows returns the total rows of BasicRuntimeStats.
func (e *BasicRuntimeStats) GetActRows() int64 {
return e.rows
return e.rows.Load()
}

// Clone implements the RuntimeStats interface.
func (e *BasicRuntimeStats) Clone() RuntimeStats {
return &BasicRuntimeStats{
loop: e.loop,
consume: e.consume,
rows: e.rows,
result := &BasicRuntimeStats{
tiflashScanContext: e.tiflashScanContext.Clone(),
}
result.loop.Store(e.loop.Load())
result.consume.Store(e.consume.Load())
result.rows.Store(e.rows.Load())
return result
}

// Merge implements the RuntimeStats interface.
Expand All @@ -616,9 +623,9 @@ func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
if !ok {
return
}
e.loop += tmp.loop
e.consume += tmp.consume
e.rows += tmp.rows
e.loop.Add(tmp.loop.Load())
e.consume.Add(tmp.consume.Load())
e.rows.Add(tmp.rows.Load())
e.tiflashScanContext.Merge(tmp.tiflashScanContext)
}

Expand All @@ -643,7 +650,7 @@ func (e *RootRuntimeStats) GetActRows() int64 {
if e.basic == nil {
return 0
}
return e.basic.rows
return e.basic.rows.Load()
}

// MergeStats merges stats in the RootRuntimeStats and returns the stats suitable for display directly.
Expand All @@ -669,14 +676,14 @@ func (e *RootRuntimeStats) String() string {

// Record records executor's execution.
func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
atomic.AddInt32(&e.loop, 1)
atomic.AddInt64(&e.consume, int64(d))
atomic.AddInt64(&e.rows, int64(rowNum))
e.loop.Add(1)
e.consume.Add(int64(d))
e.rows.Add(int64(rowNum))
}

// SetRowNum sets the row num.
func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
atomic.StoreInt64(&e.rows, rowNum)
e.rows.Store(rowNum)
}

// String implements the RuntimeStats interface.
Expand All @@ -686,15 +693,15 @@ func (e *BasicRuntimeStats) String() string {
}
var str strings.Builder
str.WriteString("time:")
str.WriteString(FormatDuration(time.Duration(e.consume)))
str.WriteString(FormatDuration(time.Duration(e.consume.Load())))
str.WriteString(", loops:")
str.WriteString(strconv.FormatInt(int64(e.loop), 10))
str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
return str.String()
}

// GetTime gets the total time as an int64.
func (e *BasicRuntimeStats) GetTime() int64 {
return e.consume
return e.consume.Load()
}

// RuntimeStatsColl collects executors' execution info.
Expand Down

0 comments on commit ed86b9c

Please sign in to comment.