diff --git a/executor/asyncloaddata/BUILD.bazel b/executor/asyncloaddata/BUILD.bazel new file mode 100644 index 0000000000000..f70abe0bdd49a --- /dev/null +++ b/executor/asyncloaddata/BUILD.bazel @@ -0,0 +1,49 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "asyncloaddata", + srcs = [ + "progress.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/executor/asyncloaddata", + visibility = ["//visibility:public"], + deps = [ + "//kv", + "//parser/terror", + "//types", + "//util/chunk", + "//util/dbterror/exeerrors", + "//util/sqlexec", + "@com_github_pingcap_errors//:errors", + "@com_github_tikv_client_go_v2//util", + ], +) + +go_test( + name = "asyncloaddata_test", + timeout = "short", + srcs = [ + "detach_test.go", + "main_test.go", + "operate_test.go", + "show_test.go", + "util_test.go", + ], + embed = [":asyncloaddata"], + flaky = True, + race = "on", + deps = [ + "//br/pkg/lightning/config", + "//executor/importer", + "//kv", + "//parser/auth", + "//testkit", + "//util/sqlexec", + "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 4eec1f0223a30..b2f2803becfac 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -511,20 +511,24 @@ func (p *Percentile[valueType]) Sum() float64 { // String implements the RuntimeStats interface. func (e *basicCopRuntimeStats) String() string { if e.storeType == "tiflash" { - return fmt.Sprintf("time:%v, loops:%d, threads:%d, ", FormatDuration(time.Duration(e.consume)), e.loop, e.threads) + e.BasicRuntimeStats.tiflashScanContext.String() + return fmt.Sprintf("time:%v, loops:%d, threads:%d, ", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load(), e.threads) + e.BasicRuntimeStats.tiflashScanContext.String() } - return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume)), e.loop) + return fmt.Sprintf("time:%v, loops:%d", FormatDuration(time.Duration(e.consume.Load())), e.loop.Load()) } // Clone implements the RuntimeStats interface. func (e *basicCopRuntimeStats) Clone() RuntimeStats { - return &basicCopRuntimeStats{ - BasicRuntimeStats: BasicRuntimeStats{loop: e.loop, consume: e.consume, rows: e.rows, tiflashScanContext: e.tiflashScanContext.Clone()}, + stats := &basicCopRuntimeStats{ + BasicRuntimeStats: BasicRuntimeStats{tiflashScanContext: e.tiflashScanContext.Clone()}, threads: e.threads, storeType: e.storeType, totalTasks: e.totalTasks, procTimes: e.procTimes, } + stats.loop.Store(e.loop.Load()) + stats.consume.Store(e.consume.Load()) + stats.rows.Store(e.rows.Load()) + return stats } // Merge implements the RuntimeStats interface. @@ -533,15 +537,19 @@ func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) { if !ok { return } - e.loop += tmp.loop - e.consume += tmp.consume - e.rows += tmp.rows + e.loop.Add(tmp.loop.Load()) + e.consume.Add(tmp.consume.Load()) + e.rows.Add(tmp.rows.Load()) e.threads += tmp.threads e.totalTasks += tmp.totalTasks if tmp.procTimes.Size() == 0 { e.procTimes.Add(Duration(tmp.consume)) } else { +<<<<<<< HEAD e.procTimes.MergePercentile(&tmp.procTimes) +======= + e.procTimes = append(e.procTimes, time.Duration(tmp.consume.Load())) +>>>>>>> ed86b9c7419 (execdetails: fix data race in the BasicRuntimeStats (#42338)) } e.tiflashScanContext.Merge(tmp.tiflashScanContext) } @@ -575,11 +583,9 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu storeType: crs.storeType, } } - crs.stats[address].Merge(&basicCopRuntimeStats{ + data := &basicCopRuntimeStats{ storeType: crs.storeType, - BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations), - consume: int64(*summary.TimeProcessedNs), - rows: int64(*summary.NumProducedRows), + BasicRuntimeStats: BasicRuntimeStats{ tiflashScanContext: TiFlashScanContext{ totalDmfileScannedPacks: summary.GetTiflashScanContext().GetTotalDmfileScannedPacks(), totalDmfileSkippedPacks: summary.GetTiflashScanContext().GetTotalDmfileSkippedPacks(), @@ -589,13 +595,17 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu totalDmfileReadTimeMs: summary.GetTiflashScanContext().GetTotalDmfileReadTimeMs(), totalCreateSnapshotTimeMs: summary.GetTiflashScanContext().GetTotalCreateSnapshotTimeMs()}}, threads: int32(summary.GetConcurrency()), totalTasks: 1, - }) + } + data.BasicRuntimeStats.loop.Store(int32(*summary.NumIterations)) + data.BasicRuntimeStats.consume.Store(int64(*summary.TimeProcessedNs)) + data.BasicRuntimeStats.rows.Store(int64(*summary.NumProducedRows)) + crs.stats[address].Merge(data) } // GetActRows return total rows of CopRuntimeStats. func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { for _, instanceStats := range crs.stats { - totalRows += instanceStats.rows + totalRows += instanceStats.rows.Load() } return totalRows } @@ -604,9 +614,15 @@ func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { func (crs *CopRuntimeStats) MergeBasicStats() (procTimes Percentile[Duration], totalTime time.Duration, totalTasks, totalLoops, totalThreads int32, totalTiFlashScanContext TiFlashScanContext) { totalTiFlashScanContext = TiFlashScanContext{} for _, instanceStats := range crs.stats { +<<<<<<< HEAD procTimes.MergePercentile(&instanceStats.procTimes) totalTime += time.Duration(instanceStats.consume) totalLoops += instanceStats.loop +======= + procTimes = append(procTimes, instanceStats.procTimes...) + totalTime += time.Duration(instanceStats.consume.Load()) + totalLoops += instanceStats.loop.Load() +>>>>>>> ed86b9c7419 (execdetails: fix data race in the BasicRuntimeStats (#42338)) totalThreads += instanceStats.threads totalTiFlashScanContext.Merge(instanceStats.tiflashScanContext) totalTasks += instanceStats.totalTasks @@ -751,28 +767,29 @@ func (context *TiFlashScanContext) Empty() bool { // BasicRuntimeStats is the basic runtime stats. type BasicRuntimeStats struct { // executor's Next() called times. - loop int32 + loop atomic.Int32 // executor consume time. - consume int64 + consume atomic.Int64 // executor return row count. - rows int64 + rows atomic.Int64 // executor extra infos tiflashScanContext TiFlashScanContext } // GetActRows return total rows of BasicRuntimeStats. func (e *BasicRuntimeStats) GetActRows() int64 { - return e.rows + return e.rows.Load() } // Clone implements the RuntimeStats interface. func (e *BasicRuntimeStats) Clone() RuntimeStats { - return &BasicRuntimeStats{ - loop: e.loop, - consume: e.consume, - rows: e.rows, + result := &BasicRuntimeStats{ tiflashScanContext: e.tiflashScanContext.Clone(), } + result.loop.Store(e.loop.Load()) + result.consume.Store(e.consume.Load()) + result.rows.Store(e.rows.Load()) + return result } // Merge implements the RuntimeStats interface. @@ -781,9 +798,9 @@ func (e *BasicRuntimeStats) Merge(rs RuntimeStats) { if !ok { return } - e.loop += tmp.loop - e.consume += tmp.consume - e.rows += tmp.rows + e.loop.Add(tmp.loop.Load()) + e.consume.Add(tmp.consume.Load()) + e.rows.Add(tmp.rows.Load()) e.tiflashScanContext.Merge(tmp.tiflashScanContext) } @@ -808,7 +825,7 @@ func (e *RootRuntimeStats) GetActRows() int64 { if e.basic == nil { return 0 } - return e.basic.rows + return e.basic.rows.Load() } // MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly. @@ -834,14 +851,14 @@ func (e *RootRuntimeStats) String() string { // Record records executor's execution. func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) { - atomic.AddInt32(&e.loop, 1) - atomic.AddInt64(&e.consume, int64(d)) - atomic.AddInt64(&e.rows, int64(rowNum)) + e.loop.Add(1) + e.consume.Add(int64(d)) + e.rows.Add(int64(rowNum)) } // SetRowNum sets the row num. func (e *BasicRuntimeStats) SetRowNum(rowNum int64) { - atomic.StoreInt64(&e.rows, rowNum) + e.rows.Store(rowNum) } // String implements the RuntimeStats interface. @@ -851,15 +868,15 @@ func (e *BasicRuntimeStats) String() string { } var str strings.Builder str.WriteString("time:") - str.WriteString(FormatDuration(time.Duration(e.consume))) + str.WriteString(FormatDuration(time.Duration(e.consume.Load()))) str.WriteString(", loops:") - str.WriteString(strconv.FormatInt(int64(e.loop), 10)) + str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10)) return str.String() } // GetTime get the int64 total time func (e *BasicRuntimeStats) GetTime() int64 { - return e.consume + return e.consume.Load() } // RuntimeStatsColl collects executors's execution info.