Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: HashJoinExec checks the buildError even if the probeSide is empty #30471

Merged
merged 8 commits into from
Dec 9, 2021
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9487,3 +9487,17 @@ func (s *testSerialSuite) TestIssue28650(c *C) {
}()
}
}

func (s *testSerialSuite) TestIssue30289(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
fpName := "github.com/pingcap/tidb/executor/issue30289"
c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a")
c.Assert(err.Error(), Matches, "issue30289 build return error")
}
13 changes: 12 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
if probeSideResult.NumRows() == 0 && !e.useOuterToBuild {
e.finished.Store(true)
return
}
emptyBuild, buildErr := e.wait4BuildSide()
if buildErr != nil {
Expand Down Expand Up @@ -258,6 +262,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) {
func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) {
defer close(chkCh)
var err error
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
err = errors.Errorf("issue30289 build return error")
e.buildFinished <- errors.Trace(err)
return
}
})
for {
if e.finished.Load().(bool) {
return
Expand Down
28 changes: 21 additions & 7 deletions executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,39 @@ func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
if r.inputHolderCh != nil {
close(r.inputHolderCh)
}
if r.inputCh != nil {
close(r.inputCh)
}
}
close(w.outputHolderCh)
if w.outputHolderCh != nil {
close(w.outputHolderCh)
}
}
if e.outputCh != nil {
close(e.outputCh)
}
close(e.outputCh)
}
close(e.finishCh)
if e.finishCh != nil {
close(e.finishCh)
}
for _, w := range e.workers {
for _, r := range w.receivers {
for range r.inputCh {
if r.inputCh != nil {
for range r.inputCh {
}
}
}
// close child executor of each worker
if err := w.childExec.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
if e.outputCh != nil {
for range e.outputCh { // workers exit before `e.outputCh` is closed.
}
}
e.executed = false

Expand Down