Skip to content

Commit

Permalink
Merge #64295
Browse files Browse the repository at this point in the history
64295: colrpc: some more logging around errors in inbox and outbox r=yuzefovich a=yuzefovich

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Apr 28, 2021
2 parents 171b1bd + 94afc7b commit 3c3e3dc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
26 changes: 15 additions & 11 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (i *Inbox) Init(ctx context.Context) {
// by the connection issues. It is expected that such an error can
// occur. The Inbox must still be closed.
i.close()
log.VEventf(ctx, 1, "Inbox encountered an error in Init: %v", err)
colexecerror.ExpectedError(err)
}
}
Expand All @@ -257,10 +258,15 @@ func (i *Inbox) Next() coldata.Batch {
// Catch any panics that occur and close the Inbox in order to not leak
// the goroutine listening for context cancellation. The Inbox must
// still be closed during normal termination.
if err := recover(); err != nil {
if panicObj := recover(); panicObj != nil {
// Only close the Inbox here in case of an ungraceful termination.
i.close()
colexecerror.InternalError(logcrash.PanicAsError(0, err))
err := logcrash.PanicAsError(0, panicObj)
log.VEventf(i.Ctx, 1, "Inbox encountered an error in Next: %v", err)
// Note that here we use InternalError to propagate the error
// consciously - the code below is careful to mark all expected
// errors as "expected", and we want to keep that distinction.
colexecerror.InternalError(err)
}
}()

Expand All @@ -277,12 +283,10 @@ func (i *Inbox) Next() coldata.Batch {
i.close()
return coldata.ZeroBatch
}
// Note that here err can be stream's context cancellation. If it
// was caused by the internal cancellation of the parallel
// unordered synchronizer, it'll get swallowed by the synchronizer
// goroutine. Regardless of the cause we want to propagate such
// error in all cases so that the caller could decide on how to
// handle it.
// Note that here err can be stream's context cancellation.
// Regardless of the cause we want to propagate such an error as
// expected on in all cases so that the caller could decide on how
// to handle it.
i.errCh <- err
colexecerror.ExpectedError(err)
}
Expand All @@ -293,9 +297,9 @@ func (i *Inbox) Next() coldata.Batch {
continue
}
if meta.Err != nil {
// If an error was encountered, it needs to be propagated immediately.
// All other metadata will simply be buffered and returned in
// DrainMeta.
// If an error was encountered, it needs to be propagated
// immediately. All other metadata will simply be buffered
// and returned in DrainMeta.
colexecerror.ExpectedError(meta.Err)
}
i.bufferedMeta = append(i.bufferedMeta, meta)
Expand Down
14 changes: 10 additions & 4 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package colrpc
import (
"bytes"
"context"
"fmt"
"io"
"sync/atomic"
"time"
Expand Down Expand Up @@ -223,9 +224,9 @@ func (o *Outbox) handleStreamErr(
}
}

func (o *Outbox) moveToDraining(ctx context.Context) {
func (o *Outbox) moveToDraining(ctx context.Context, reason string) {
if atomic.CompareAndSwapUint32(&o.draining, 0, 1) {
log.VEvent(ctx, 2, "Outbox moved to draining")
log.VEventf(ctx, 2, "Outbox moved to draining (%s)", reason)
}
}

Expand Down Expand Up @@ -301,6 +302,7 @@ func (o *Outbox) sendBatches(
func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errToSend error) error {
msg := &execinfrapb.ProducerMessage{}
if errToSend != nil {
log.VEventf(ctx, 1, "Outbox sending an error as metadata: %v", errToSend)
msg.Data.Metadata = append(
msg.Data.Metadata, execinfrapb.LocalMetaToRemoteProducerMeta(ctx, execinfrapb.ProducerMetadata{Err: errToSend}),
)
Expand Down Expand Up @@ -360,15 +362,19 @@ func (o *Outbox) runWithStream(
case msg.Handshake != nil:
log.VEventf(ctx, 2, "Outbox received handshake: %v", msg.Handshake)
case msg.DrainRequest != nil:
o.moveToDraining(ctx)
o.moveToDraining(ctx, "consumer requested draining" /* reason */)
}
}
close(waitCh)
}()

terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel)
if terminatedGracefully || errToSend != nil {
o.moveToDraining(ctx)
reason := "terminated gracefully"
if errToSend != nil {
reason = fmt.Sprintf("encountered error when sending batches: %v", errToSend)
}
o.moveToDraining(ctx, reason)
if err := o.sendMetadata(ctx, stream, errToSend); err != nil {
o.handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel, outboxCtxCancel)
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/flowinfra/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (

// InboundStreamHandler is a handler of an inbound stream.
type InboundStreamHandler interface {
// run is called once a FlowStream RPC is handled and a stream is obtained to
// Run is called once a FlowStream RPC is handled and a stream is obtained to
// make this stream accessible to the rest of the flow.
Run(
ctx context.Context, stream execinfrapb.DistSQL_FlowStreamServer, firstMsg *execinfrapb.ProducerMessage, f *FlowBase,
) error
// timeout is called with an error, which results in the teardown of the
// Timeout is called with an error, which results in the teardown of the
// stream strategy with the given error.
// WARNING: timeout may block.
Timeout(err error)
Expand Down

0 comments on commit 3c3e3dc

Please sign in to comment.