Skip to content

Commit

Permalink
Revert "colrpc: fix tracing when an inbox receives an error"
Browse files Browse the repository at this point in the history
This reverts commit bd21647.
  • Loading branch information
yuzefovich authored and celiala committed Apr 11, 2022
1 parent fb9b2ef commit 11787ed
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 157 deletions.
1 change: 0 additions & 1 deletion pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ go_test(
srcs = [
"colbatch_scan_test.go",
"dep_test.go",
"draining_test.go",
"main_test.go",
"routers_test.go",
"stats_test.go",
Expand Down
9 changes: 0 additions & 9 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,15 +643,6 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
return len(meta) == 1 && errors.Is(meta[0].Err, expectedError)
},
test: func(ctx context.Context, inbox *Inbox) []execinfrapb.ProducerMetadata {
defer func() {
// Make sure that the error is not propagated for the second
// time.
//
// We still need to drain to simulate what happens in
// production - there, the consumer of the inbox would
// transition into draining upon receiving the error.
require.True(t, len(inbox.DrainMeta()) == 0)
}()
for {
var b coldata.Batch
if err := colexecerror.CatchVectorizedRuntimeError(func() {
Expand Down
36 changes: 8 additions & 28 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,13 @@ func (i *Inbox) Next() coldata.Batch {
return coldata.ZeroBatch
}

var ungracefulStreamTermination bool
defer func() {
// Catch any panics that occur and close the Inbox in order to not leak
// the goroutine listening for context cancellation. The Inbox must
// still be closed during normal termination.
if panicObj := recover(); panicObj != nil {
if ungracefulStreamTermination {
// Only close the Inbox here in case of an ungraceful
// termination.
i.close()
}
// Only close the Inbox here in case of an ungraceful termination.
i.close()
err := logcrash.PanicAsError(0, panicObj)
log.VEventf(i.Ctx, 1, "Inbox encountered an error in Next: %v", err)
// Note that here we use InternalError to propagate the error
Expand Down Expand Up @@ -367,37 +363,21 @@ func (i *Inbox) Next() coldata.Batch {
// to handle it.
err = pgerror.Newf(pgcode.InternalConnectionFailure, "inbox communication error: %s", err)
i.errCh <- err
ungracefulStreamTermination = true
colexecerror.ExpectedError(err)
}
if len(m.Data.Metadata) != 0 {
// If an error was encountered, it needs to be propagated
// immediately. All other metadata will simply be buffered and
// returned in DrainMeta.
var receivedErr error
for _, rpm := range m.Data.Metadata {
meta, ok := execinfrapb.RemoteProducerMetaToLocalMeta(i.Ctx, rpm)
if !ok {
continue
}
if meta.Err != nil && receivedErr == nil {
receivedErr = meta.Err
} else {
// Note that if multiple errors are sent in a single
// message, then we'll propagate the first one right away
// (via a panic below) and will buffer the rest to be
// returned in DrainMeta. The caller will catch the panic
// and will transition to draining, so this all works out.
//
// We choose this way of handling multiple errors rather
// than something like errors.CombineErrors() since we want
// to keep errors unchanged (e.g. roachpb.ErrPriority() will
// be called on each error in the DistSQLReceiver).
i.bufferedMeta = append(i.bufferedMeta, meta)
if meta.Err != nil {
// If an error was encountered, it needs to be propagated
// immediately. All other metadata will simply be buffered
// and returned in DrainMeta.
colexecerror.ExpectedError(meta.Err)
}
}
if receivedErr != nil {
colexecerror.ExpectedError(receivedErr)
i.bufferedMeta = append(i.bufferedMeta, meta)
}
// Continue until we get the next batch or EOF.
continue
Expand Down
6 changes: 0 additions & 6 deletions pkg/sql/colflow/colrpc/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,8 @@ func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
m := &execinfrapb.ProducerMessage{}
m.Data.RawBytes = []byte("garbage")

// Simulate the client (outbox) that sends only a single piece of metadata.
go func() {
_ = rpcLayer.client.Send(m)
_ = rpcLayer.client.CloseSend()
}()

// inbox.Next should panic given that the deserializer will encounter garbage
Expand All @@ -152,10 +150,6 @@ func TestInboxNextPanicDoesntLeakGoroutines(t *testing.T) {
inbox.Next()
})

// Upon catching the panic and converting it into an error, the caller
// transitions to draining.
inbox.DrainMeta()

// We require no error from the stream handler as nothing was canceled. The
// panic is bubbled up through the Next chain on the Inbox's host.
require.NoError(t, <-streamHandlerErrCh)
Expand Down
113 changes: 0 additions & 113 deletions pkg/sql/colflow/draining_test.go

This file was deleted.

0 comments on commit 11787ed

Please sign in to comment.