diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 8d2b2efa983f..a7cd1033bb4c 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -241,6 +241,7 @@ func (i *Inbox) Init(ctx context.Context) { // by the connection issues. It is expected that such an error can // occur. The Inbox must still be closed. i.close() + log.VEventf(ctx, 1, "Inbox encountered an error in Init: %v", err) colexecerror.ExpectedError(err) } } @@ -257,10 +258,15 @@ func (i *Inbox) Next() coldata.Batch { // Catch any panics that occur and close the Inbox in order to not leak // the goroutine listening for context cancellation. The Inbox must // still be closed during normal termination. - if err := recover(); err != nil { + if panicObj := recover(); panicObj != nil { // Only close the Inbox here in case of an ungraceful termination. i.close() - colexecerror.InternalError(logcrash.PanicAsError(0, err)) + err := logcrash.PanicAsError(0, panicObj) + log.VEventf(i.Ctx, 1, "Inbox encountered an error in Next: %v", err) + // Note that here we use InternalError to propagate the error + // consciously - the code below is careful to mark all expected + // errors as "expected", and we want to keep that distinction. + colexecerror.InternalError(err) } }() @@ -277,12 +283,10 @@ func (i *Inbox) Next() coldata.Batch { i.close() return coldata.ZeroBatch } - // Note that here err can be stream's context cancellation. If it - // was caused by the internal cancellation of the parallel - // unordered synchronizer, it'll get swallowed by the synchronizer - // goroutine. Regardless of the cause we want to propagate such - // error in all cases so that the caller could decide on how to - // handle it. + // Note that here err can be stream's context cancellation. + // Regardless of the cause we want to propagate such an error as + // expected on in all cases so that the caller could decide on how + // to handle it. i.errCh <- err colexecerror.ExpectedError(err) } @@ -293,9 +297,9 @@ func (i *Inbox) Next() coldata.Batch { continue } if meta.Err != nil { - // If an error was encountered, it needs to be propagated immediately. - // All other metadata will simply be buffered and returned in - // DrainMeta. + // If an error was encountered, it needs to be propagated + // immediately. All other metadata will simply be buffered + // and returned in DrainMeta. colexecerror.ExpectedError(meta.Err) } i.bufferedMeta = append(i.bufferedMeta, meta) diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index d1f5c034accb..5565a8b82cfd 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -13,6 +13,7 @@ package colrpc import ( "bytes" "context" + "fmt" "io" "sync/atomic" "time" @@ -223,9 +224,9 @@ func (o *Outbox) handleStreamErr( } } -func (o *Outbox) moveToDraining(ctx context.Context) { +func (o *Outbox) moveToDraining(ctx context.Context, reason string) { if atomic.CompareAndSwapUint32(&o.draining, 0, 1) { - log.VEvent(ctx, 2, "Outbox moved to draining") + log.VEventf(ctx, 2, "Outbox moved to draining (%s)", reason) } } @@ -301,6 +302,7 @@ func (o *Outbox) sendBatches( func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errToSend error) error { msg := &execinfrapb.ProducerMessage{} if errToSend != nil { + log.VEventf(ctx, 1, "Outbox sending an error as metadata: %v", errToSend) msg.Data.Metadata = append( msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, execinfrapb.ProducerMetadata{Err: errToSend}), ) @@ -360,7 +362,7 @@ func (o *Outbox) runWithStream( case msg.Handshake != nil: log.VEventf(ctx, 2, "Outbox received handshake: %v", msg.Handshake) case msg.DrainRequest != nil: - o.moveToDraining(ctx) + o.moveToDraining(ctx, "consumer requested draining" /* reason */) } } close(waitCh) @@ -368,7 +370,11 @@ func (o *Outbox) runWithStream( terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel) if terminatedGracefully || errToSend != nil { - o.moveToDraining(ctx) + reason := "terminated gracefully" + if errToSend != nil { + reason = fmt.Sprintf("encountered error when sending batches: %v", errToSend) + } + o.moveToDraining(ctx, reason) if err := o.sendMetadata(ctx, stream, errToSend); err != nil { o.handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel, outboxCtxCancel) } else { diff --git a/pkg/sql/flowinfra/inbound.go b/pkg/sql/flowinfra/inbound.go index 2eeaaa084791..3add011a1cb4 100644 --- a/pkg/sql/flowinfra/inbound.go +++ b/pkg/sql/flowinfra/inbound.go @@ -26,12 +26,12 @@ import ( // InboundStreamHandler is a handler of an inbound stream. type InboundStreamHandler interface { - // run is called once a FlowStream RPC is handled and a stream is obtained to + // Run is called once a FlowStream RPC is handled and a stream is obtained to // make this stream accessible to the rest of the flow. Run( ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer, firstMsg *execinfrapb.ProducerMessage, f *FlowBase, ) error - // timeout is called with an error, which results in the teardown of the + // Timeout is called with an error, which results in the teardown of the // stream strategy with the given error. // WARNING: timeout may block. Timeout(err error)