Skip to content

Commit

Permalink
copr: fix the issue about runaway index lookup query blocking (#56920)
Browse files Browse the repository at this point in the history
close #56916
  • Loading branch information
nolouch authored Oct 29, 2024
1 parent cc37099 commit 11f86a8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
19 changes: 19 additions & 0 deletions pkg/executor/executor_failpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,25 @@ func TestCoprocessorOOMTiCase(t *testing.T) {
*/
}

func TestCoprocessorBlockIssues56916(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/issue56916", `return`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/issue56916")) }()

tk.MustExec("use test")
tk.MustExec("drop table if exists t_cooldown")
tk.MustExec("create table t_cooldown (id int auto_increment, k int, unique index(id));")
tk.MustExec("insert into t_cooldown (k) values (1);")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;")
tk.MustExec("split table t_cooldown by (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);")
tk.MustQuery("select * from t_cooldown use index(id) where id > 0 and id < 10").CheckContain("1")
tk.MustQuery("select * from t_cooldown use index(id) where id between 1 and 10 or id between 124660 and 132790;").CheckContain("1")
}

func TestIssue21441(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/union/issue21441", `return`))
defer func() {
Expand Down
23 changes: 16 additions & 7 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
it.concurrency = 1
}

// issue56916 is about the cooldown of the runaway checker may block the SQL execution.
failpoint.Inject("issue56916", func(_ failpoint.Value) {
it.concurrency = 1
it.smallTaskConcurrency = 0
})

// if the request is triggered cool down by the runaway checker, we need to adjust the concurrency, let the sql run slowly.
if req.RunawayChecker != nil && req.RunawayChecker.CheckAction() == rmpb.RunawayAction_CoolDown {
it.concurrency = 1
Expand Down Expand Up @@ -835,15 +841,16 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
taskCh := make(chan *copTask, 1)
smallTaskCh := make(chan *copTask, 1)
it.unconsumedStats = &unconsumedCopRuntimeStats{}
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
var smallTaskCh chan *copTask
if it.smallTaskConcurrency > 0 {
smallTaskCh = make(chan *copTask, 1)
}
// Start it.concurrency number of workers to handle cop requests.
for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
var ch chan *copTask
if i < it.concurrency {
ch = taskCh
} else {
ch := taskCh
if i >= it.concurrency && smallTaskCh != nil {
ch = smallTaskCh
}
worker := &copIteratorWorker{
Expand Down Expand Up @@ -897,7 +904,7 @@ func (sender *copIteratorTaskSender) run(connID uint64, checker resourcegroup.Ru
break
}
var sendTo chan<- *copTask
if isSmallTask(t) {
if isSmallTask(t) && sender.smallTaskCh != nil {
sendTo = sender.smallTaskCh
} else {
sendTo = sender.taskCh
Expand All @@ -911,7 +918,9 @@ func (sender *copIteratorTaskSender) run(connID uint64, checker resourcegroup.Ru
}
}
close(sender.taskCh)
close(sender.smallTaskCh)
if sender.smallTaskCh != nil {
close(sender.smallTaskCh)
}

// Wait for worker goroutines to exit.
sender.wg.Wait()
Expand Down

0 comments on commit 11f86a8

Please sign in to comment.