diff --git a/pkg/executor/executor_failpoint_test.go b/pkg/executor/executor_failpoint_test.go index 027181293c73b..031acf115bd40 100644 --- a/pkg/executor/executor_failpoint_test.go +++ b/pkg/executor/executor_failpoint_test.go @@ -360,6 +360,25 @@ func TestCoprocessorOOMTiCase(t *testing.T) { */ } +func TestCoprocessorBlockIssues56916(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/issue56916", `return`)) + defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/issue56916")) }() + + tk.MustExec("use test") + tk.MustExec("drop table if exists t_cooldown") + tk.MustExec("create table t_cooldown (id int auto_increment, k int, unique index(id));") + tk.MustExec("insert into t_cooldown (k) values (1);") + tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;") + tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;") + tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;") + tk.MustExec("insert into t_cooldown (k) select id from t_cooldown;") + tk.MustExec("split table t_cooldown by (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);") + tk.MustQuery("select * from t_cooldown use index(id) where id > 0 and id < 10").CheckContain("1") + tk.MustQuery("select * from t_cooldown use index(id) where id between 1 and 10 or id between 124660 and 132790;").CheckContain("1") +} + func TestIssue21441(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/union/issue21441", `return`)) defer func() { diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index f3dd58c911ce5..085dfbe350779 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -223,6 +223,12 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars it.concurrency = 1 } + // issue56916 is about the cooldown of the runaway checker may block the SQL execution. + failpoint.Inject("issue56916", func(_ failpoint.Value) { + it.concurrency = 1 + it.smallTaskConcurrency = 0 + }) + // if the request is triggered cool down by the runaway checker, we need to adjust the concurrency, let the sql run slowly. if req.RunawayChecker != nil && req.RunawayChecker.CheckAction() == rmpb.RunawayAction_CoolDown { it.concurrency = 1 @@ -835,15 +841,16 @@ func (worker *copIteratorWorker) run(ctx context.Context) { // open starts workers and sender goroutines. func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) { taskCh := make(chan *copTask, 1) - smallTaskCh := make(chan *copTask, 1) it.unconsumedStats = &unconsumedCopRuntimeStats{} it.wg.Add(it.concurrency + it.smallTaskConcurrency) + var smallTaskCh chan *copTask + if it.smallTaskConcurrency > 0 { + smallTaskCh = make(chan *copTask, 1) + } // Start it.concurrency number of workers to handle cop requests. for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ { - var ch chan *copTask - if i < it.concurrency { - ch = taskCh - } else { + ch := taskCh + if i >= it.concurrency && smallTaskCh != nil { ch = smallTaskCh } worker := &copIteratorWorker{ @@ -897,7 +904,7 @@ func (sender *copIteratorTaskSender) run(connID uint64, checker resourcegroup.Ru break } var sendTo chan<- *copTask - if isSmallTask(t) { + if isSmallTask(t) && sender.smallTaskCh != nil { sendTo = sender.smallTaskCh } else { sendTo = sender.taskCh @@ -911,7 +918,9 @@ func (sender *copIteratorTaskSender) run(connID uint64, checker resourcegroup.Ru } } close(sender.taskCh) - close(sender.smallTaskCh) + if sender.smallTaskCh != nil { + close(sender.smallTaskCh) + } // Wait for worker goroutines to exit. sender.wg.Wait()