Skip to content

Commit

Permalink
runaway: only check statements with a non-empty plan digest (#57582)
Browse files Browse the repository at this point in the history
close #56897
  • Loading branch information
nolouch authored Nov 21, 2024
1 parent 8d8cfbb commit 9ef5b59
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 32 deletions.
19 changes: 19 additions & 0 deletions pkg/executor/internal/querywatch/query_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,22 @@ func TestQueryWatch(t *testing.T) {
time.Sleep(1 * time.Second)
tk.MustGetErrCode("select * from test.t1", mysql.ErrResourceGroupQueryRunawayQuarantine)
}

func TestQueryWatchIssue56897(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
require.Eventually(t, func() bool {
return dom.RunawayManager().IsSyncerInitialized()
}, 20*time.Second, 300*time.Millisecond)
tk.MustQuery("QUERY WATCH ADD ACTION KILL SQL TEXT SIMILAR TO 'use test';").Check((testkit.Rows("1")))
time.Sleep(1 * time.Second)
_, err := tk.Exec("use test")
require.Nil(t, err)
_, err = tk.Exec("use mysql")
require.Nil(t, err)
}
66 changes: 35 additions & 31 deletions pkg/resourcegroup/runaway/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type Checker struct {
// From the group runaway settings, which will be applied when a query lacks a specified watch rule.
settings *rmpb.RunawaySettings

// markedByRule is set to true when the query matches the group runaway settings.
markedByRule atomic.Bool
// markedByWatch is set to true when the query matches the specified watch rules.
markedByWatch bool
// markedByIdentifyInRunawaySettings is set to true when the query matches the group runaway settings.
markedByIdentifyInRunawaySettings atomic.Bool
// markedByQueryWatchRule is set to true when the query matches the specified watch rules.
markedByQueryWatchRule bool
// watchAction is the specified watch action for the runaway query.
// If it's not given, the action defined in `settings` will be used.
watchAction rmpb.RunawayAction
Expand All @@ -65,14 +65,14 @@ func NewChecker(
originalSQL, sqlDigest, planDigest string, startTime time.Time,
) *Checker {
c := &Checker{
manager: manager,
resourceGroupName: resourceGroupName,
originalSQL: originalSQL,
sqlDigest: sqlDigest,
planDigest: planDigest,
settings: settings,
markedByRule: atomic.Bool{},
markedByWatch: false,
manager: manager,
resourceGroupName: resourceGroupName,
originalSQL: originalSQL,
sqlDigest: sqlDigest,
planDigest: planDigest,
settings: settings,
markedByIdentifyInRunawaySettings: atomic.Bool{},
markedByQueryWatchRule: false,
}
if settings != nil {
// avoid setting deadline if the threshold is 0
Expand All @@ -92,6 +92,10 @@ func (rm *Manager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, plan
logutil.BgLogger().Warn("cannot setup up runaway checker", zap.Error(err))
return nil
}
// Only check the normal statement.
if len(planDigest) == 0 {
return nil
}
rm.ActiveLock.RLock()
defer rm.ActiveLock.RUnlock()
if group.RunawaySettings == nil && rm.ActiveGroup[resourceGroupName] == 0 {
Expand Down Expand Up @@ -129,7 +133,7 @@ func (r *Checker) BeforeExecutor() (string, error) {
switchGroupName = r.settings.SwitchGroupName
}
// Mark it if this is the first time being watched.
r.markRunawayByWatch(action, switchGroupName, exceedCause)
r.markRunawayByQueryWatchRule(action, switchGroupName, exceedCause)
// Take action if needed.
switch action {
case rmpb.RunawayAction_Kill:
Expand Down Expand Up @@ -157,16 +161,16 @@ func (r *Checker) BeforeCopRequest(req *tikvrpc.Request) error {
return nil
}
// If the group settings are not available, and it's not marked by watch, skip this part.
if r.settings == nil && !r.markedByWatch {
if r.settings == nil && !r.markedByQueryWatchRule {
return nil
}
// If it's marked by watch and the action is cooldown, override the priority,
if r.markedByWatch && r.watchAction == rmpb.RunawayAction_CoolDown {
if r.markedByQueryWatchRule && r.watchAction == rmpb.RunawayAction_CoolDown {
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
}
// If group settings are available and the query is not marked by a rule,
// verify if it matches any rules in the settings.
if r.settings != nil && !r.markedByRule.Load() {
if r.settings != nil && !r.markedByIdentifyInRunawaySettings.Load() {
now := time.Now()
// only check time and need to ensure deadline existed.
exceedCause := r.exceedsThresholds(now, nil, 0)
Expand All @@ -181,7 +185,7 @@ func (r *Checker) BeforeCopRequest(req *tikvrpc.Request) error {
return nil
}
// execution time exceeds the threshold, mark the query as runaway
r.markRunawayByIdentify(&now, exceedCause)
r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause)
// Take action if needed.
switch r.settings.Action {
case rmpb.RunawayAction_Kill:
Expand All @@ -205,10 +209,10 @@ func (r *Checker) CheckAction() rmpb.RunawayAction {
if r == nil {
return rmpb.RunawayAction_NoneAction
}
if r.markedByWatch {
if r.markedByQueryWatchRule {
return r.watchAction
}
if r.markedByRule.Load() {
if r.markedByIdentifyInRunawaySettings.Load() {
return r.settings.Action
}
return rmpb.RunawayAction_NoneAction
Expand All @@ -217,17 +221,17 @@ func (r *Checker) CheckAction() rmpb.RunawayAction {
// CheckRuleKillAction checks whether the query should be killed according to the group settings.
func (r *Checker) CheckRuleKillAction() (string, bool) {
// If the group settings are not available, and it's not marked by watch, skip this part.
if r == nil || r.settings == nil && !r.markedByWatch {
if r == nil || r.settings == nil && !r.markedByQueryWatchRule {
return "", false
}
// If the group settings are available, and it's not marked by rule, check the execution time.
if r.settings != nil && !r.markedByRule.Load() {
if r.settings != nil && !r.markedByIdentifyInRunawaySettings.Load() {
now := time.Now()
exceedCause := r.exceedsThresholds(now, nil, 0)
if exceedCause == "" {
return "", false
}
r.markRunawayByIdentify(&now, exceedCause)
r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause)
return exceedCause, r.settings.Action == rmpb.RunawayAction_Kill
}
return "", false
Expand All @@ -243,19 +247,19 @@ func (r *Checker) markQuarantine(now *time.Time, exceedCause string) {
r.settings.Action, r.settings.SwitchGroupName, ttl, now, exceedCause)
}

func (r *Checker) markRunawayByIdentify(now *time.Time, exceedCause string) bool {
swapped := r.markedByRule.CompareAndSwap(false, true)
func (r *Checker) markRunawayByIdentifyInRunawaySettings(now *time.Time, exceedCause string) bool {
swapped := r.markedByIdentifyInRunawaySettings.CompareAndSwap(false, true)
if swapped {
r.markRunaway("identify", r.settings.Action, r.settings.SwitchGroupName, now, exceedCause)
if !r.markedByWatch {
if !r.markedByQueryWatchRule {
r.markQuarantine(now, exceedCause)
}
}
return swapped
}

func (r *Checker) markRunawayByWatch(action rmpb.RunawayAction, switchGroupName, exceedCause string) {
r.markedByWatch = true
func (r *Checker) markRunawayByQueryWatchRule(action rmpb.RunawayAction, switchGroupName, exceedCause string) {
r.markedByQueryWatchRule = true
r.watchAction = action
now := time.Now()
r.markRunaway("watch", action, switchGroupName, &now, exceedCause)
Expand Down Expand Up @@ -326,15 +330,15 @@ func (r *Checker) CheckThresholds(ruDetail *util.RUDetails, processKeys int64, e
// add the processed keys to the total processed keys.
r.totalProcessedKeys += processKeys
exceedCause := r.exceedsThresholds(checkTime, ruDetail, r.totalProcessedKeys)
if !r.markedByRule.Load() {
if exceedCause != "" && r.markRunawayByIdentify(&now, exceedCause) {
if r.markRunawayByIdentify(&now, exceedCause) {
if !r.markedByIdentifyInRunawaySettings.Load() {
if exceedCause != "" && r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause) {
if r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause)
}
}
}
// Due to concurrency, check again.
if r.markedByRule.Load() {
if r.markedByIdentifyInRunawaySettings.Load() {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func TestBuildCopIteratorWithRunawayChecker(t *testing.T) {

sql := "select * from t"
group1 := "rg1"
checker := manager.DeriveChecker(group1, sql, "", "", time.Now())
checker := manager.DeriveChecker(group1, sql, "test", "test", time.Now())
manager.AddWatch(&runaway.QuarantineRecord{
ID: 1,
ResourceGroupName: group1,
Expand Down

0 comments on commit 9ef5b59

Please sign in to comment.