Skip to content

Commit

Permalink
re-order the initialization of resource group runaway checker
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Mar 14, 2024
1 parent ab4b5cd commit 79669c2
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 16 deletions.
5 changes: 2 additions & 3 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ type DistSQLContext struct {
StoreBatchSize int
ResourceGroupName string
LoadBasedReplicaReadThreshold time.Duration
// TODO: remove this double pointer. This is a double-pointer because the `RunawayChecker` in stmtctx will be modified after building executors
RunawayChecker **resourcegroup.RunawayChecker
TiKVClientReadTimeout uint64
RunawayChecker *resourcegroup.RunawayChecker
TiKVClientReadTimeout uint64

ExecDetails *execdetails.SyncExecDetails
}
4 changes: 1 addition & 3 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,7 @@ func (builder *RequestBuilder) SetFromSessionVars(dctx *distsqlctx.DistSQLContex
builder.StoreBatchSize = dctx.StoreBatchSize
builder.Request.ResourceGroupName = dctx.ResourceGroupName
builder.Request.StoreBusyThreshold = dctx.LoadBasedReplicaReadThreshold
if dctx.RunawayChecker != nil {
builder.Request.RunawayChecker = *dctx.RunawayChecker
}
builder.Request.RunawayChecker = dctx.RunawayChecker
builder.Request.TiKVClientReadTimeout = dctx.TiKVClientReadTimeout
return builder
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,6 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
sctx.GetSessionVars().MemTracker.SetBytesLimit(sctx.GetSessionVars().StmtCtx.MemQuotaQuery)
}

e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.observeStmtBeginForTopSQL(ctx)
if variable.EnableResourceControl.Load() && domain.GetDomain(sctx).RunawayManager() != nil {
stmtCtx := sctx.GetSessionVars().StmtCtx
_, planDigest := GetPlanDigest(stmtCtx)
Expand All @@ -520,6 +514,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}
}

e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.observeStmtBeginForTopSQL(ctx)

cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var pi processinfoSetter
Expand Down
4 changes: 2 additions & 2 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2626,7 +2626,7 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
vars := s.GetSessionVars()
sc := vars.StmtCtx

dctx := sc.GetOrInitDistSQLFromCache(func() any {
dctx := sc.GetOrInitDistSQLFromCache(func() *distsqlctx.DistSQLContext {
return &distsqlctx.DistSQLContext{
AppendWarning: sc.AppendWarning,
InRestrictedSQL: sc.InRestrictedSQL,
Expand Down Expand Up @@ -2668,7 +2668,7 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
StoreBatchSize: vars.StoreBatchSize,
ResourceGroupName: sc.ResourceGroupName,
LoadBasedReplicaReadThreshold: vars.LoadBasedReplicaReadThreshold,
RunawayChecker: &sc.RunawayChecker,
RunawayChecker: sc.RunawayChecker,
TiKVClientReadTimeout: vars.GetTiKVClientReadTimeout(),

ExecDetails: &sc.SyncExecDetails,
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/sessionctx/stmtctx",
visibility = ["//visibility:public"],
deps = [
"//pkg/distsql/context",
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/parser",
Expand Down
7 changes: 5 additions & 2 deletions pkg/sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/pingcap/errors"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/parser"
Expand Down Expand Up @@ -167,11 +168,13 @@ type StatementContext struct {
// errCtx is used to indicate how to handle the errors
errCtx errctx.Context

DistSQLContext distsqlctx.DistSQLContext

// distSQLCtxCache is used to persist all variables and tools needed by the `distsql`
// this cache is set on `StatementContext` because it has to be updated after each statement.
distSQLCtxCache struct {
init sync.Once
dctx any
dctx *distsqlctx.DistSQLContext
}

// Set the following variables before execution
Expand Down Expand Up @@ -1246,7 +1249,7 @@ func (sc *StatementContext) TypeCtxOrDefault() types.Context {

// GetOrInitDistSQLFromCache returns the `DistSQLContext` inside cache. If it didn't exist, return a new one created by
// the `create` function. It uses the `any` to avoid cycle dependency.
func (sc *StatementContext) GetOrInitDistSQLFromCache(create func() any) any {
func (sc *StatementContext) GetOrInitDistSQLFromCache(create func() *distsqlctx.DistSQLContext) any {
sc.distSQLCtxCache.init.Do(func() {
sc.distSQLCtxCache.dctx = create()
})
Expand Down

0 comments on commit 79669c2

Please sign in to comment.