diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index 5ae534d28e6d..ff7b2e4f9f15 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -591,6 +591,14 @@ func (f *Flow) Wait() { if !f.startedGoroutines { return } + + var panicVal interface{} + if panicVal = recover(); panicVal != nil { + // If Wait is called as part of stack unwinding during a panic, the flow + // context must be canceled to ensure that all asynchronous goroutines get + // the message that they must exit (otherwise we will wait indefinitely). + f.ctxCancel() + } waitChan := make(chan struct{}) go func() { @@ -605,6 +613,9 @@ func (f *Flow) Wait() { case <-waitChan: // Exit normally } + if panicVal != nil { + panic(panicVal) + } } // Releasable is an interface for objects than can be Released back into a diff --git a/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go b/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go index 2642c8255b0f..278abf9e576b 100644 --- a/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go +++ b/pkg/sql/distsqlrun/vectorized_panic_propagation_test.go @@ -12,6 +12,7 @@ package distsqlrun import ( "context" + "sync" "testing" "github.com/cockroachdb/cockroach/pkg/col/coldata" @@ -117,6 +118,44 @@ func TestNonVectorizedPanicPropagation(t *testing.T) { require.Panics(t, func() { mat.Next() }, "NonVectorizedPanic was caught by the operators") } +// TestNonVectorizedPanicDoesntHangServer verifies that propagating a non +// vectorized panic doesn't result in a hang as described in: +// https://github.com/cockroachdb/cockroach/issues/39779 +func TestNonVectorizedPanicDoesntHangServer(t *testing.T) { + defer leaktest.AfterTest(t)() + + mat := &materializer{ + input: &exec.CallbackOperator{ + NextCb: func(ctx context.Context) coldata.Batch { + a := []int{0} + // Trigger an index out of bounds panic. + a[0] = a[100] + return nil + }, + }, + } + // Avoid uninitialized output panic. + mat.out.output = &RowBuffer{} + flow := &Flow{ + processors: []Processor{mat}, + // This test specifically verifies that a flow doesn't get stuck in Wait for + // asynchronous components that haven't been signaled to exit. To simulate + // this we just create a mock startable. + startables: []startable{ + startableFn(func(ctx context.Context, wg *sync.WaitGroup, _ context.CancelFunc) { + wg.Add(1) + go func() { + // Ensure context is canceled. + <-ctx.Done() + wg.Done() + }() + }), + }, + } + + require.Panics(t, func() { require.NoError(t, flow.Run(context.Background(), nil)) }) +} + // testVectorizedPanicEmitter is an exec.Operator that panics on every // odd-numbered invocation of Next() and returns the next batch from the input // on every even-numbered (i.e. it becomes a noop for those iterations). Used