Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
40630: distsqlrun: avoid flow.Waiting if called after a panic r=yuzefovich a=asubiotto

If a panic occurred in a flow, flow.Wait() would still be called since
it was deferred in Run. This would result in unnecessarily Waiting for
components that would never be signalled to exit through context
cancellation (since the panicking goroutine could be the one responsible
to do so).

Release note: None

Closes cockroachdb#39779

Co-authored-by: Alfonso Subiotto Marqués <[email protected]>
  • Loading branch information
craig[bot] and asubiotto committed Sep 10, 2019
2 parents a41355d + 10f81dd commit 78599a1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ func (f *Flow) Wait() {
if !f.startedGoroutines {
return
}

var panicVal interface{}
if panicVal = recover(); panicVal != nil {
// If Wait is called as part of stack unwinding during a panic, the flow
// context must be canceled to ensure that all asynchronous goroutines get
// the message that they must exit (otherwise we will wait indefinitely).
f.ctxCancel()
}
waitChan := make(chan struct{})

go func() {
Expand All @@ -605,6 +613,9 @@ func (f *Flow) Wait() {
case <-waitChan:
// Exit normally
}
if panicVal != nil {
panic(panicVal)
}
}

// Releasable is an interface for objects than can be Released back into a
Expand Down
39 changes: 39 additions & 0 deletions pkg/sql/distsqlrun/vectorized_panic_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package distsqlrun

import (
"context"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
Expand Down Expand Up @@ -117,6 +118,44 @@ func TestNonVectorizedPanicPropagation(t *testing.T) {
require.Panics(t, func() { mat.Next() }, "NonVectorizedPanic was caught by the operators")
}

// TestNonVectorizedPanicDoesntHangServer verifies that propagating a non
// vectorized panic doesn't result in a hang as described in:
// https://github.com/cockroachdb/cockroach/issues/39779
func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
defer leaktest.AfterTest(t)()

mat := &materializer{
input: &exec.CallbackOperator{
NextCb: func(ctx context.Context) coldata.Batch {
a := []int{0}
// Trigger an index out of bounds panic.
a[0] = a[100]
return nil
},
},
}
// Avoid uninitialized output panic.
mat.out.output = &RowBuffer{}
flow := &Flow{
processors: []Processor{mat},
// This test specifically verifies that a flow doesn't get stuck in Wait for
// asynchronous components that haven't been signaled to exit. To simulate
// this we just create a mock startable.
startables: []startable{
startableFn(func(ctx context.Context, wg *sync.WaitGroup, _ context.CancelFunc) {
wg.Add(1)
go func() {
// Ensure context is canceled.
<-ctx.Done()
wg.Done()
}()
}),
},
}

require.Panics(t, func() { require.NoError(t, flow.Run(context.Background(), nil)) })
}

// testVectorizedPanicEmitter is an exec.Operator that panics on every
// odd-numbered invocation of Next() and returns the next batch from the input
// on every even-numbered (i.e. it becomes a noop for those iterations). Used
Expand Down

0 comments on commit 78599a1

Please sign in to comment.