diff --git a/docs/design/2023-08-24-background-tasks-control.md b/docs/design/2023-08-24-background-tasks-control.md
new file mode 100644
index 0000000000000..2bee2437c8703
--- /dev/null
+++ b/docs/design/2023-08-24-background-tasks-control.md
@@ -0,0 +1,233 @@
# Background Tasks Control

- Author: [glorv](https://github.com/glorv)
- Tracking Issue:
  -

## Motivation

Resource control is used to solve some problems of resource usage under data consolidation. We can currently control normal query tasks by means of RU limiting and scheduling, but this mechanism does not adapt well to background or bulk import/export tasks.

Due to implementation restrictions, resource control cannot be applied to tasks such as BR and TiDB Lightning. And for long-running tasks such as DDL or background auto-analyze, it is also hard to control the resource usage, because it is not easy to select proper RU settings for these kinds of jobs.

## Design Goals

- Background tasks should try to use the system's free resources (on the TiKV side), reducing their impact on foreground tasks.
- Meter the resource consumption of background tasks, unifying the consumption unit as RU.
- The resource control of background tasks should be easy to integrate into different components and should not depend on a component's implementation details.
- Give feedback to the upper layer (TiDB) so it can coordinate the concurrency of dispatched tasks.

## Definition of background task

From our previous analysis, different background tasks can run in different execution routines (e.g. Lightning and BR run in a separate tokio thread pool), and different phases of a single task can run in different routines (e.g. for fast DDL, the ingest phase works like Lightning while the merge phase runs as normal transactions). Furthermore, for some tasks, whether they are background depends on the user: for big queries on the normal read path, if QoS is important, we had better treat them as foreground tasks, but if minimal impact on other tasks matters more, running them as background jobs is preferred.

So, we define a "background task" as a task that may consume a lot of resources and should run with the lowest priority. Background tasks should run at a speed at which their resource consumption has little performance impact (ideally zero) on all foreground tasks, and they are allowed to consume extra free resources for better efficiency. Background tasks can run in a routine separate from the foreground ones (e.g. Lightning import, BR backup and restore), but they can also share the same routines with the foreground ones (e.g. the read phase of DDL and TTL table GC).

## Background Resource Group Management

Background task control obeys the same rule as resource control, that is, each resource group configures its background settings separately.

We extend the resource group SQL interface to also support background task control:

```sql
CREATE/ALTER RESOURCE GROUP rg1
    RU_PER_SEC = 100000 [ PRIORITY = (HIGH|MEDIUM|LOW) ] [BURSTABLE]
    [ BACKGROUND = ( TASK_TYPES = "br,analyze" ) ];
```

Currently, we only support setting the task types that should be controlled in the background manner. We may extend this interface with more settings, such as task priority, in the future.

If a resource group's background setting is not set, we automatically apply the `default` resource group's setting to this group.

We also introduce a new session variable `tidb_request_source_type` to help tag SQL-based tasks. For example, TiDB Lightning and BR use this variable to tag their checksum requests as "lightning" or "br", and in TiDB Lightning's SQL mode, all SQL statements are executed under the task name "lightning".

```sql
SET @@tidb_request_source_type = "lightning";
```

The `tidb_request_source_type` variable is mostly used in internal logic, but it can also be used to let other tasks be controlled as background jobs. For example, by setting this variable a user can let `dumpling` or `TiSpark` tasks be controlled in the background mode.

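Putting these pieces together: the effective background task types for a group come either from its own `BACKGROUND` clause or, failing that, from the `default` group. Below is a minimal sketch of that resolution order; the `GroupSettings` shape and helper names are illustrative assumptions, not the actual implementation:

```rust
use std::collections::HashMap;

/// Per-group background settings; `None` means the group has no BACKGROUND clause.
struct GroupSettings {
    background_task_types: Option<Vec<String>>,
}

struct ResourceGroups {
    groups: HashMap<String, GroupSettings>,
}

impl ResourceGroups {
    /// Decides whether `task_type` (e.g. the value carried by
    /// `tidb_request_source_type`) runs under background control for
    /// `group_name`, falling back to the `default` group's setting.
    fn is_background(&self, group_name: &str, task_type: &str) -> bool {
        self.groups
            .get(group_name)
            .and_then(|g| g.background_task_types.as_ref())
            .or_else(|| {
                self.groups
                    .get("default")
                    .and_then(|g| g.background_task_types.as_ref())
            })
            .map_or(false, |types| types.iter().any(|t| t == task_type))
    }
}
```

With `BACKGROUND = ( TASK_TYPES = "br,analyze" )` on `rg1`, `is_background("rg1", "br")` returns true, while a group without its own `BACKGROUND` clause inherits whatever `default` declares.
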
## Implementation: Resource Limiter on TiKV for Background Task

Considering that the load on each TiKV can be very different when there are hotspots, the resources available for background tasks can also differ a lot between stores, so resource control at the global level does not fit here. Instead, each store has its own local resource manager that only considers the resource usage of each resource group locally.

In order to control the resource usage of background tasks, we plan to add an extra quota limiter layer for background jobs. For foreground jobs, the scheduler on the TiKV side only controls the enqueue/dequeue order, so tasks that are already running are not under control, and tasks from different routines (thread pools) are not controlled either. For background jobs, the extra quota limiter layer ensures that running tasks (in different pools) can be preempted early enough, so their resource usage stays close to the configured limit:

![background-control.png](imgs/background-control.png)

- Control the resource usage of all background tasks via the Resource Limiter: the rate limit is dynamically adjusted to the value `TiKVTotalRUCapacity - sum(RUCostRateOfForegroundTasks)`. With a fine-grained adjusting interval, we can ensure the foreground tasks' RU is always sufficient (or near the system's maximum if the foreground requirement reaches the maximum quota), so the background tasks' impact on foreground tasks should be very low; on the other hand, when the foreground resource consumption is low, the controller increases the limit threshold, so background jobs can take advantage of the remaining resources.
- The local resource manager gathers statistics on the RU consumption of background jobs via the Resource Limiter and reports the consumption to the global resource manager. In the first stage, we only do the statistics globally but control the tasks locally.
- Feedback mechanism: it is better to give feedback on how fast the limiter layer executes tasks on TiKV to the upper layer (e.g. TiDB), so that the upper-layer task framework can adjust the number of dispatched tasks.

### Resource Limiter Details

In the first implementation, we manage CPU and IO with separate limiters, so workloads with different resource consumption patterns can make full use of the resources. The implementation is much like the foreground rate limiter, except that we don't distinguish between read and write IO:

```rust
// The public interface of resource control.
struct ResourceGroupManager {
    groups: Map<String, ResourceGroup>,
}

struct ResourceGroup {
    // This is the original resource group setting in pb format.
    settings: PbResourceGroupSetting,
    // New field: a ResourceLimiter for each resource group that enables
    // background task control.
    limiter: Option<Arc<ResourceLimiter>>,
}

impl ResourceGroupManager {
    // New interface that returns a resource limiter.
    // Returns None if the task_type should not be controlled in background mode.
    pub fn get_resource_limiter(&self, group_name: &str, task_type: &str) -> Option<Arc<ResourceLimiter>> {
        ...
    }
}

// The real quota limiter.
struct ResourceLimiter {
    cpu_limiter: AsyncRateLimiter,
    io_limiter: AsyncRateLimiter,
}

impl ResourceLimiter {
    // The return value is the wait duration.
    pub fn consume(&self, cpu_time: Duration, io_bytes: u64) -> Duration {
        let cpu_dur = self.cpu_limiter.consume(cpu_time.as_micros());
        let io_dur = self.io_limiter.consume(io_bytes);
        cpu_dur.max(io_dur)
    }
}
```

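The design leaves `AsyncRateLimiter` abstract. One plausible realization is a token bucket whose refill rate is the quota being adjusted; the following is a minimal sketch under that assumption (the bucket logic and names are illustrative, not TiKV code):

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// A token-bucket limiter: callers consume tokens up front and are told
/// how long to sleep when the bucket is overdrawn.
struct AsyncRateLimiter {
    state: Mutex<BucketState>,
    refill_per_sec: f64, // tokens added per second (CPU µs or IO bytes)
}

struct BucketState {
    tokens: f64,
    last_refill: Instant,
}

impl AsyncRateLimiter {
    fn new(refill_per_sec: f64) -> Self {
        AsyncRateLimiter {
            state: Mutex::new(BucketState { tokens: 0.0, last_refill: Instant::now() }),
            refill_per_sec,
        }
    }

    /// Deducts `amount` tokens; a negative balance is converted into the
    /// duration the caller should wait before polling the task again.
    /// A production limiter would also cap the balance to bound bursts.
    fn consume(&self, amount: u64) -> Duration {
        let mut s = self.state.lock().unwrap();
        let now = Instant::now();
        s.tokens += now.duration_since(s.last_refill).as_secs_f64() * self.refill_per_sec;
        s.last_refill = now;
        s.tokens -= amount as f64;
        if s.tokens >= 0.0 {
            Duration::ZERO
        } else {
            Duration::from_secs_f64(-s.tokens / self.refill_per_sec)
        }
    }
}
```

Returning a wait duration instead of blocking is what keeps the caller async: the wrapper can simply sleep for the returned duration before polling the task again.
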
### Integrate Resource Limiter in TiKV

Most tasks in TiKV run in an async manner, so our resource control is defined as a future wrapper. The only thing other components need to do is to wrap their task with this future wrapper under the right resource group. The future wrapper takes responsibility for tracking resource usage and calculating the wait duration after each poll. This approach is exactly the same as the one used by the foreground rate limiter.

Some basic data structure definitions:

```rust
struct LimitedFuture<F: Future> {
    // The actual task.
    f: F,
    limiter: Arc<ResourceLimiter>,
}

impl<F: Future> LimitedFuture<F> {
    pub fn new(f: F, limiter: Arc<ResourceLimiter>) -> Self;
}

impl<F: Future> Future for LimitedFuture<F> {
    fn poll() -> F::Output {
        let start = Instant::now();
        // `get_thread_io_bytes_total` is implemented in the `file_system` component.
        let io_bytes_before = get_thread_io_bytes_total();
        let res = self.f.poll();
        let dur = start.elapsed();
        let io_bytes_delta = get_thread_io_bytes_total() - io_bytes_before;
        let wait_dur = self.limiter.consume(dur, io_bytes_delta);
        AsyncSleep(wait_dur);
        return res;
    }
}
```

In our implementation, we integrate this rate limiter into the following components so it can cover most use cases:

- Coprocessor. All SQL read requests are handled via the coprocessor component, so all read requests are covered.
- Txn Scheduler. Write requests in TiKV are handled in a pipelined manner across multiple thread pools. To keep things simple, we only apply the rate limiter in the first phase, that is, the txn scheduler worker pool. Though this is not ideal, the result is acceptable in our benchmarks. We may enhance this mechanism in the future.
- Backup. We apply the rate limiter to the backup KV scan and SST upload procedures.
- SST Service. Most SST-related operations are handled via the SST service. This ensures BR, TiDB Lightning, and DDL (fast mode) can be controlled.

#### Example: Integrate background control into BR

In most cases, a component such as backup or lightning only needs to call `ResourceGroupManager::get_resource_limiter` to fetch the related resource limiter and wrap its running tasks with `LimitedFuture`.

Let's take BR backup as an example. The most resource-consuming part of BR backup is the function `BackupRange::backup`. A simplified version of the process is as follows:

```rust
struct BackupRange {
    start_key: Key,
    end_key: Key,
    region: RegionInfo,
    ...
}

impl BackupRange {
    async fn backup(req) -> Result<..>;
}

async fn backup(req) {
    // Back up all the region ranges.
    for r in req.ranges {
        r.backup().await;
    }
}
```

To integrate it with resource control, we only need the following change:

```rust
async fn backup(req) {
    // `resource_manager` should be initialized when building the BR endpoint.
    let resource_limiter = resource_manager.get_resource_limiter(req.get_resource_group_name(), "br");
    // Back up all the region ranges.
    for r in req.ranges {
        let task = r.backup();
        if let Some(ref limiter) = resource_limiter {
            LimitedFuture::new(task, limiter.clone()).await;
        } else {
            task.await;
        }
    }
}
```

For simple cases, the above is enough.

In some cases, though, a single task can run for a long time. To make the quota limiting smoother, we need to insert manual reschedule points that let the runtime suspend the current task and schedule other tasks. In general, big tasks are handled in a loop, so we can record a start time and, after each iteration, check whether the total running duration exceeds a specified threshold; if so, we just yield the current task, as sketched below.

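A minimal sketch of that reschedule pattern (the threshold value is an illustrative assumption, the `BackupRange` stub stands in for the sketch above, and `yield_now` is tokio's cooperative yield):

```rust
use std::time::{Duration, Instant};
use tokio::task::yield_now;

/// Illustrative slice length; the real threshold would be tuned.
const MAX_RUN_SLICE: Duration = Duration::from_millis(10);

struct BackupRange; // stub for the BackupRange sketched above

impl BackupRange {
    async fn backup(&self) { /* scan and upload one range */ }
}

async fn backup_ranges(ranges: Vec<BackupRange>) {
    let mut slice_start = Instant::now();
    for r in ranges {
        r.backup().await;
        // After each iteration, check how long this task has been running
        // and voluntarily yield so the runtime can schedule other
        // (e.g. foreground) tasks.
        if slice_start.elapsed() > MAX_RUN_SLICE {
            yield_now().await;
            slice_start = Instant::now();
        }
    }
}
```
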
#### RU consumption statistics

Through the consumption statistics inside the limiter, we know each group's CPU and IO consumption, and we can then use the RU formula to uniformly convert these resources into RU. The RU consumption is periodically reported to the Global Resource Manager.

### Resource Limit Auto-adjusting

In order to control the total resource usage of background tasks, we periodically adjust the CPU and IO quota limits based on the real-time resource consumption of foreground tasks.

When multiple resource groups have active background tasks, the quota limit is distributed among the groups based on their RU settings:

```rust
enum ResourceType {
    Cpu,
    Io,
}

fn adjust_background_resource_quota(resource_type: ResourceType) {
    let mut available_resource_quota = get_instance_total_resource(resource_type);
    for group in get_all_foreground_resource_groups() {
        available_resource_quota -= group.get_resource_cost_rate();
    }

    // Reserve 10% free resource for potential foreground traffic increases.
    available_resource_quota *= 0.9;
    let mut bg_groups = get_all_background_resource_groups();
    bg_groups.sort_by_key(|g| estimated_ru_consuming_rate(g) / g.ru_per_sec());
    let mut total_background_quota = bg_groups.iter().map(|g| g.ru_per_sec()).sum();

    for g in bg_groups {
        let expect_quota = g.ru_per_sec() / total_background_quota * available_resource_quota;
        let estimated_cost_rate = estimated_ru_consuming_rate(g);
        if estimated_cost_rate <= expect_quota {
            g.set_ru_limit(estimated_cost_rate);
            available_resource_quota -= estimated_cost_rate;
        } else {
            g.set_ru_limit(expect_quota);
            available_resource_quota -= expect_quota;
        }
        total_background_quota -= g.ru_per_sec();
    }
}
```

Because we can't obtain the total IO bandwidth programmatically, we require the user to provide it explicitly via TiKV's configuration. If the config is set to 0 (the default value), the IO bandwidth is considered unlimited and the IO rate limiter is disabled.

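For reference, a sketch of the RU conversion mentioned in the statistics section above. The coefficients here are made-up placeholders for illustration; the real model is defined by the resource control implementation:

```rust
use std::time::Duration;

// Illustrative coefficients (assumptions for this sketch only).
const RU_PER_CPU_MS: f64 = 1.0 / 3.0; // ~3 ms of CPU per RU
const RU_PER_IO_BYTE: f64 = 1.0 / (64.0 * 1024.0); // ~64 KiB of IO per RU

/// Converts the CPU time and IO bytes tracked by the limiter into RU,
/// so that consumption can be reported in a single unit.
fn to_ru(cpu_time: Duration, io_bytes: u64) -> f64 {
    cpu_time.as_secs_f64() * 1000.0 * RU_PER_CPU_MS + io_bytes as f64 * RU_PER_IO_BYTE
}
```
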
diff --git a/executor/adapter.go b/executor/adapter.go
index 42ce555c6b3c3..bfcafea4106bb 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -1517,6 +1517,11 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
 	if tikvExecDetailRaw != nil {
 		tikvExecDetail = *(tikvExecDetailRaw.(*util.ExecDetails))
 	}
+	ruDetails := util.NewRUDetails()
+	if ruDetailsVal := a.GoCtx.Value(util.RUDetailsCtxKey); ruDetailsVal != nil {
+		ruDetails = ruDetailsVal.(*util.RUDetails)
+	}
+
 	execDetail := stmtCtx.GetExecDetails()
 	copTaskInfo := stmtCtx.CopTasksDetails()
 	memMax := sessVars.MemTracker.MaxConsumed()
@@ -1578,6 +1583,10 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
 		UsedStats:         stmtCtx.GetUsedStatsInfo(false),
 		IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed,
 		Warnings:          collectWarningsForSlowLog(stmtCtx),
+		ResourceGroupName: sessVars.ResourceGroupName,
+		RRU:               ruDetails.RRU(),
+		WRU:               ruDetails.WRU(),
+		WaitRUDuration:    ruDetails.RUWaitDuration(),
 	}
 	failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) {
 		if val.(bool) {
diff --git a/executor/builder.go b/executor/builder.go
index 33c213bce8c9c..976a8add86e60 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -74,8 +74,7 @@ import (
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/txnkv"
 	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
-	clientutil "github.com/tikv/client-go/v2/util"
 	"golang.org/x/exp/slices"
 )
 
 // executorBuilder builds an Executor from a Plan.
@@ -1241,22 +1240,6 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
 		if b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil {
 			b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil)
 		}
-		// If the resource group name is not empty, we could collect and display the RU
-		// runtime stats for analyze executor.
-		resourceGroupName := b.ctx.GetSessionVars().ResourceGroupName
-		// Try to register the RU runtime stats for analyze executor.
-		if store, ok := b.ctx.GetStore().(interface {
-			CreateRURuntimeStats(uint64) *clientutil.RURuntimeStats
-		}); len(resourceGroupName) > 0 && ok {
-			// StartTS will be used to identify this SQL, so that the runtime stats could
-			// aggregate the RU stats beneath the KV storage client.
-			startTS, err := b.getSnapshotTS()
-			if err != nil {
-				b.err = err
-				return nil
-			}
-			explainExec.ruRuntimeStats = store.CreateRURuntimeStats(startTS)
-		}
 		explainExec.analyzeExec = b.build(v.TargetPlan)
 	}
 	return explainExec
diff --git a/executor/explain.go b/executor/explain.go
index ed55ebcc5c75b..914569e1dd2c3 100644
--- a/executor/explain.go
+++ b/executor/explain.go
@@ -42,12 +42,11 @@ import (
 type ExplainExec struct {
 	baseExecutor
 
-	explain        *core.Explain
-	analyzeExec    Executor
-	executed       bool
-	ruRuntimeStats *clientutil.RURuntimeStats
-	rows           [][]string
-	cursor         int
+	explain     *core.Explain
+	analyzeExec Executor
+	executed    bool
+	rows        [][]string
+	cursor      int
 }
 
 // Open implements the Executor Open interface.
@@ -134,8 +133,10 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
 	}
 	// Register the RU runtime stats to the runtime stats collection after the analyze executor has been executed.
 	if e.analyzeExec != nil && e.executed {
-		if coll := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl; coll != nil {
-			coll.RegisterStats(e.explain.TargetPlan.ID(), &ruRuntimeStats{e.ruRuntimeStats})
+		ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey)
+		if coll := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl; coll != nil && ruDetailsRaw != nil {
+			ruDetails := ruDetailsRaw.(*clientutil.RUDetails)
+			coll.RegisterStats(e.explain.TargetPlan.ID(), &ruRuntimeStats{ruDetails})
 		}
 	}
 	return err
@@ -316,41 +317,33 @@ func getHeapProfile() (fileName string, err error) {
 	return fileName, nil
 }
 
-// ruRuntimeStats is a wrapper of clientutil.RURuntimeStats,
+// ruRuntimeStats is a wrapper of clientutil.RUDetails,
 // which implements the RuntimeStats interface.
 type ruRuntimeStats struct {
-	*clientutil.RURuntimeStats
+	*clientutil.RUDetails
 }
 
 // String implements the RuntimeStats interface.
 func (e *ruRuntimeStats) String() string {
-	if e.RURuntimeStats != nil {
-		return fmt.Sprintf("RU:%f", e.RURuntimeStats.RRU()+e.RURuntimeStats.WRU())
+	if e.RUDetails != nil {
+		return fmt.Sprintf("RU:%f", e.RRU()+e.WRU())
 	}
 	return ""
 }
 
 // Clone implements the RuntimeStats interface.
 func (e *ruRuntimeStats) Clone() execdetails.RuntimeStats {
-	newRs := &ruRuntimeStats{}
-	if e.RURuntimeStats != nil {
-		newRs.RURuntimeStats = e.RURuntimeStats.Clone()
-	}
-	return newRs
+	return &ruRuntimeStats{RUDetails: e.RUDetails.Clone()}
 }
 
 // Merge implements the RuntimeStats interface.
 func (e *ruRuntimeStats) Merge(other execdetails.RuntimeStats) {
-	tmp, ok := other.(*ruRuntimeStats)
-	if !ok {
-		return
-	}
-	if tmp.RURuntimeStats != nil {
-		if e.RURuntimeStats == nil {
-			e.RURuntimeStats = tmp.RURuntimeStats.Clone()
-			return
+	if tmp, ok := other.(*ruRuntimeStats); ok {
+		if e.RUDetails != nil {
+			e.RUDetails.Merge(tmp.RUDetails)
+		} else {
+			e.RUDetails = tmp.RUDetails.Clone()
 		}
-		e.RURuntimeStats.Merge(tmp.RURuntimeStats)
 	}
 }
diff --git a/executor/slow_query.go b/executor/slow_query.go
index f7878839ae24b..485bbb6734509 100644
--- a/executor/slow_query.go
+++ b/executor/slow_query.go
@@ -749,7 +749,8 @@ func getColumnValueFactoryByName(sctx sessionctx.Context, colName string, column
 		execdetails.CopTimeStr, execdetails.ProcessTimeStr, execdetails.WaitTimeStr, execdetails.BackoffTimeStr,
 		execdetails.LockKeysTimeStr, variable.SlowLogCopProcAvg, variable.SlowLogCopProcP90, variable.SlowLogCopProcMax,
 		variable.SlowLogCopWaitAvg, variable.SlowLogCopWaitP90, variable.SlowLogCopWaitMax, variable.SlowLogKVTotal,
-		variable.SlowLogPDTotal, variable.SlowLogBackoffTotal, variable.SlowLogWriteSQLRespTotal:
+		variable.SlowLogPDTotal, variable.SlowLogBackoffTotal, variable.SlowLogWriteSQLRespTotal, variable.SlowLogRRU,
+		variable.SlowLogWRU, variable.SlowLogWaitRUDuration:
 		return func(row []types.Datum, value string, tz *time.Location, checker *slowLogChecker) (valid bool, err error) {
 			v, err := strconv.ParseFloat(value, 64)
 			if err != nil {
@@ -760,7 +761,8 @@ func getColumnValueFactoryByName(sctx sessionctx.Context, colName string, column
 	case variable.SlowLogUserStr, variable.SlowLogHostStr, execdetails.BackoffTypesStr, variable.SlowLogDBStr,
 		variable.SlowLogIndexNamesStr, variable.SlowLogDigestStr, variable.SlowLogStatsInfoStr, variable.SlowLogCopProcAddr,
 		variable.SlowLogCopWaitAddr, variable.SlowLogPlanDigest,
-		variable.SlowLogPrevStmt, variable.SlowLogQuerySQLStr, variable.SlowLogWarnings:
+		variable.SlowLogPrevStmt, variable.SlowLogQuerySQLStr, variable.SlowLogWarnings,
+		variable.SlowLogResourceGroup:
 		return func(row []types.Datum, value string, tz *time.Location, checker *slowLogChecker) (valid bool, err error) {
 			row[columnIdx] = types.NewStringDatum(value)
 			return true, nil
diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go
index 9289c04caffb0..789f0718ed91e 100644
--- a/executor/slow_query_test.go
+++ b/executor/slow_query_test.go
@@ -137,6 +137,10 @@ func TestParseSlowLogFile(t *testing.T) {
 # Plan_from_binding: true
 # Succ: false
 # IsExplicitTxn: true
+# Resource_group: default
+# Request_unit_read: 2.158
+# Request_unit_write: 2.123
+# Time_queued_by_rc: 0.05
 # Plan_digest: 60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4
 # Prev_stmt: update t set i = 1;
 use test;
@@ -163,7 +167,7 @@ select * from t;`
 		`0,0,0,0,0,0,0,0,0,0,0,0,,0,0,0,0,0,0,0.38,0.021,0,0,0,1,637,0,10,10,10,10,100,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,` +
 		`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,65536,0,0,0,0,0,,` +
 		`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
-		`0,0,1,0,1,1,0,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
+		`0,0,1,0,1,1,0,default,2.158,2.123,0.05,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
 		`,update t set i = 1;,select * from t;`
 	require.Equal(t, expectRecordString, recordString)
 
@@ -186,7 +190,7 @@ select * from t;`
 		`0,0,0,0,0,0,0,0,0,0,0,0,,0,0,0,0,0,0,0.38,0.021,0,0,0,1,637,0,10,10,10,10,100,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,` +
 		`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,65536,0,0,0,0,0,,` +
 		`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
-		`0,0,1,0,1,1,0,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
+		`0,0,1,0,1,1,0,default,2.158,2.123,0.05,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
 		`,update t set i = 1;,select * from t;`
 	require.Equal(t, expectRecordString, recordString)
diff --git a/infoschema/tables.go b/infoschema/tables.go
index 9d520a5737ef2..9153b001d2514 100644
--- a/infoschema/tables.go
+++ b/infoschema/tables.go
@@ -913,6 +913,10 @@ var slowQueryCols = []columnInfo{
 	{name: variable.SlowLogPlanFromCache, tp: mysql.TypeTiny, size: 1},
 	{name: variable.SlowLogPlanFromBinding, tp: mysql.TypeTiny, size: 1},
 	{name: variable.SlowLogHasMoreResults, tp: mysql.TypeTiny, size: 1},
+	{name: variable.SlowLogResourceGroup, tp: mysql.TypeVarchar, size: 64},
+	{name: variable.SlowLogRRU, tp: mysql.TypeDouble, size: 22},
+	{name: variable.SlowLogWRU, tp: mysql.TypeDouble, size: 22},
+	{name: variable.SlowLogWaitRUDuration, tp: mysql.TypeDouble, size: 22},
 	{name: variable.SlowLogPlan, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
 	{name: variable.SlowLogPlanDigest, tp: mysql.TypeVarchar, size: 128},
 	{name: variable.SlowLogBinaryPlan, tp: mysql.TypeLongBlob, size: types.UnspecifiedLength},
diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go
index 50d38925d612e..2646625e1f87f 100644
--- a/infoschema/tables_test.go
+++ b/infoschema/tables_test.go
@@ -611,11 +611,16 @@ func TestSlowQuery(t *testing.T) {
 			"1",
 			"0",
 			"0",
+			"default",
+			"0",
+			"0",
+			"0",
 			"abcd",
 			"60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4",
 			"",
 			"update t set i = 2;",
-			"select * from t_slim;"},
+			"select * from t_slim;",
+		},
 		{"2021-09-08 14:39:54.506967",
 			"427578666238083075",
 			"root",
@@ -685,6 +690,10 @@ func TestSlowQuery(t *testing.T) {
 			"0",
 			"0",
 			"0",
+			"rg1",
+			"96.66703066666668",
+			"3182.424414062492",
+			"0",
 			"",
 			"",
 			"",
diff --git a/infoschema/internal/testkit.go b/infoschema/internal/testkit.go
new file mode 100644
index 0000000000000..50eb91f7ac45a
--- /dev/null
+++ b/infoschema/internal/testkit.go
@@ -0,0 +1,93 @@
+// Copyright 2023 PingCAP, Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +// PrepareSlowLogfile prepares a slow log file for test. +func PrepareSlowLogfile(t *testing.T, slowLogFileName string) { + f, err := os.OpenFile(slowLogFileName, os.O_CREATE|os.O_WRONLY, 0600) + require.NoError(t, err) + _, err = f.WriteString(`# Time: 2019-02-12T19:33:56.571953+08:00 +# Txn_start_ts: 406315658548871171 +# User@Host: root[root] @ localhost [127.0.0.1] +# Conn_ID: 6 +# Exec_retry_time: 0.12 Exec_retry_count: 57 +# Query_time: 4.895492 +# Parse_time: 0.4 +# Compile_time: 0.2 +# Rewrite_time: 0.000000003 Preproc_subqueries: 2 Preproc_subqueries_time: 0.000000002 +# Optimize_time: 0.00000001 +# Wait_TS: 0.000000003 +# LockKeys_time: 1.71 Request_count: 1 Prewrite_time: 0.19 Wait_prewrite_binlog_time: 0.21 Commit_time: 0.01 Commit_backoff_time: 0.18 Backoff_types: [txnLock] Resolve_lock_time: 0.03 Write_keys: 15 Write_size: 480 Prewrite_region: 1 Txn_retry: 8 +# Cop_time: 0.3824278 Process_time: 0.161 Request_count: 1 Total_keys: 100001 Process_keys: 100000 +# Rocksdb_delete_skipped_count: 100 Rocksdb_key_skipped_count: 10 Rocksdb_block_cache_hit_count: 10 Rocksdb_block_read_count: 10 Rocksdb_block_read_byte: 100 +# Wait_time: 0.101 +# Backoff_time: 0.092 +# DB: test +# Is_internal: false +# Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 +# Stats: t1:1,t2:2 +# Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160 +# Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160 +# Mem_max: 70724 +# Disk_max: 65536 +# Plan_from_cache: true +# Result_rows: 10 +# Succ: true +# Plan: abcd +# Plan_digest: 60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4 +# Prev_stmt: update t set i = 2; +# Resource_group: default +select * from t_slim; +# Time: 2021-09-08T14:39:54.506967433+08:00 +# Txn_start_ts: 427578666238083075 +# User@Host: root[root] @ 172.16.0.0 [172.16.0.0] +# Conn_ID: 40507 +# Session_alias: alias123 +# Query_time: 25.571605962 +# Parse_time: 0.002923536 +# Compile_time: 0.006800973 +# Rewrite_time: 0.002100764 +# Optimize_time: 0 +# Wait_TS: 0.000015801 +# Prewrite_time: 25.542014572 Commit_time: 0.002294647 Get_commit_ts_time: 0.000605473 Commit_backoff_time: 12.483 Backoff_types: [tikvRPC regionMiss tikvRPC regionMiss regionMiss] Write_keys: 624 Write_size: 172064 Prewrite_region: 60 +# DB: rtdb +# Is_internal: false +# Digest: 124acb3a0bec903176baca5f9da00b4e7512a41c93b417923f26502edeb324cc +# Num_cop_tasks: 0 +# Mem_max: 856544 +# Prepared: false +# Plan_from_cache: false +# Plan_from_binding: false +# Has_more_results: false +# KV_total: 86.635049185 +# PD_total: 0.015486658 +# Backoff_total: 100.054 +# Write_sql_response_total: 0 +# Succ: true +# Resource_group: rg1 +# Request_unit_read: 96.66703066666668 +# Request_unit_write: 3182.424414062492 +INSERT INTO ...; +`) + require.NoError(t, f.Close()) + require.NoError(t, 
err)
+}
diff --git a/server/conn.go b/server/conn.go
index b3972097b8239..0e7146c57bf70 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -2102,6 +2102,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
 func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
 	ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
 	ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
+	ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails())
 	reg := trace.StartRegion(ctx, "ExecuteStmt")
 	cc.audit(plugin.Starting)
 	rs, err := cc.ctx.ExecuteStmt(ctx, stmt)
diff --git a/server/conn_stmt.go b/server/conn_stmt.go
index 945e46b347698..e88437dbf07dc 100644
--- a/server/conn_stmt.go
+++ b/server/conn_stmt.go
@@ -232,6 +232,7 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
 func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt interface{}, args []expression.Expression, useCursor bool) (err error) {
 	ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
 	ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
+	ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails())
 	retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt.(PreparedStatement), args, useCursor)
 	if err != nil {
 		action, txnErr := sessiontxn.GetTxnManager(&cc.ctx).OnStmtErrorForNextAction(ctx, sessiontxn.StmtErrAfterQuery, err)
diff --git a/session/session.go b/session/session.go
index 003467fec3c3a..cd225d5d01126 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1903,6 +1903,7 @@ func (s *session) ExecRestrictedStmt(ctx context.Context, stmtNode ast.StmtNode,
 	metrics.SessionRestrictedSQLCounter.Inc()
 	ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
 	ctx = context.WithValue(ctx, tikvutil.ExecDetailsKey, &tikvutil.ExecDetails{})
+	ctx = context.WithValue(ctx, tikvutil.RUDetailsCtxKey, tikvutil.NewRUDetails())
 	rs, err := se.ExecuteStmt(ctx, stmtNode)
 	if err != nil {
 		se.sessionVars.StmtCtx.AppendError(err)
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 2be92ac60ac07..f00b7a5abccaa 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -3010,6 +3010,14 @@ const (
 	SlowLogIsWriteCacheTable = "IsWriteCacheTable"
 	// SlowLogIsSyncStatsFailed is used to indicate whether any failure happen during sync stats
 	SlowLogIsSyncStatsFailed = "IsSyncStatsFailed"
+	// SlowLogResourceGroup is the name of the resource group that the current session is bound to.
+	SlowLogResourceGroup = "Resource_group"
+	// SlowLogRRU is the read request_unit(RU) cost.
+	SlowLogRRU = "Request_unit_read"
+	// SlowLogWRU is the write request_unit(RU) cost.
+	SlowLogWRU = "Request_unit_write"
+	// SlowLogWaitRUDuration is the total duration that KV requests spent waiting for available request-units.
+	SlowLogWaitRUDuration = "Time_queued_by_rc"
 )
 
 // GenerateBinaryPlan decides whether we should record binary plan in slow log and stmt summary.
@@ -3065,6 +3073,10 @@ type SlowQueryLogItems struct {
 	UsedStats         map[int64]*stmtctx.UsedStatsInfoForTable
 	IsSyncStatsFailed bool
 	Warnings          []JSONSQLWarnForSlowLog
+	ResourceGroupName string
+	RRU               float64
+	WRU               float64
+	WaitRUDuration    time.Duration
 }
 
 // SlowLogFormat uses for formatting slow log.
@@ -3259,6 +3271,20 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string { if len(logItems.BinaryPlan) != 0 { writeSlowLogItem(&buf, SlowLogBinaryPlan, logItems.BinaryPlan) } + + if logItems.ResourceGroupName != "" { + writeSlowLogItem(&buf, SlowLogResourceGroup, logItems.ResourceGroupName) + } + if logItems.RRU > 0.0 { + writeSlowLogItem(&buf, SlowLogRRU, strconv.FormatFloat(logItems.RRU, 'f', -1, 64)) + } + if logItems.WRU > 0.0 { + writeSlowLogItem(&buf, SlowLogWRU, strconv.FormatFloat(logItems.WRU, 'f', -1, 64)) + } + if logItems.WaitRUDuration > time.Duration(0) { + writeSlowLogItem(&buf, SlowLogWaitRUDuration, strconv.FormatFloat(logItems.WaitRUDuration.Seconds(), 'f', -1, 64)) + } + if logItems.PrevStmt != "" { writeSlowLogItem(&buf, SlowLogPrevStmt, logItems.PrevStmt) } diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 39d647b942bf7..f83de90f10a6e 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -256,7 +256,11 @@ func TestSlowLogFormat(t *testing.T) { # Succ: true # IsExplicitTxn: true # IsSyncStatsFailed: false -# IsWriteCacheTable: true` +# IsWriteCacheTable: true +# Resource_group: rg1 +# Request_unit_read: 50 +# Request_unit_write: 100.56 +# Time_queued_by_rc: 0.134` sql := "select * from t;" _, digest := parser.NormalizeDigest(sql) logItems := &variable.SlowQueryLogItems{ @@ -295,6 +299,10 @@ func TestSlowLogFormat(t *testing.T) { IsExplicitTxn: true, IsWriteCacheTable: true, UsedStats: map[int64]*stmtctx.UsedStatsInfoForTable{1: usedStats1, 2: usedStats2}, + ResourceGroupName: "rg1", + RRU: 50.0, + WRU: 100.56, + WaitRUDuration: 134 * time.Millisecond, } logString := seVar.SlowLogFormat(logItems) require.Equal(t, resultFields+"\n"+sql, logString)