Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
106363: kvserver: log incoming snapshot trace on context cancellation r=AlexTalks a=AlexTalks

Traces captured during an incoming snapshot are collected and returned as part of the snapshot response, both on server-side error and on successful application. However, when the context is cancelled (e.g. on snapshot timeout), the traces cannot be returned to the sender and were previously discarded. This change ensures that if sending to or reading from the incoming snapshot stream fails due to context cancellation, the collected traces are logged on the server side, i.e. by the snapshot recipient.

Fixes: cockroachdb#105820

Release note: None

Co-authored-by: Alex Sarkesian <[email protected]>
  • Loading branch information
craig[bot] and AlexTalks committed Jul 12, 2023
2 parents a947886 + 7f1c4f4 commit 7b63c23
Showing 1 changed file with 54 additions and 16 deletions.
70 changes: 54 additions & 16 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,43 @@ type incomingSnapshotStream interface {
Recv() (*kvserverpb.SnapshotRequest, error)
}

// loggingIncomingSnapshotStream wraps the interface on a GRPC stream used
// to receive a snapshot over the network, with special handling for logging
// the current tracing span on context cancellation.
//
// Its Send and Recv methods delegate to the wrapped stream and additionally
// take a context.Context. When the delegated call returns an error and that
// context has already ended (ctx.Err() != nil), the recording of the tracing
// span carried by the context is written to the log before the error is
// returned to the caller unchanged.
type loggingIncomingSnapshotStream struct {
// stream is the underlying snapshot stream that Send and Recv forward to.
stream incomingSnapshotStream
}

// Send forwards resp to the wrapped stream and returns whatever error the
// wrapped stream reports, unmodified.
//
// If the send fails and ctx has already ended (cancelled or timed out), the
// response cannot carry the collected trace back to the sender, so the
// recording of the span found in ctx is logged here instead. Nothing is
// logged when ctx carries no span or only a no-op span.
func (l loggingIncomingSnapshotStream) Send(
	ctx context.Context, resp *kvserverpb.SnapshotResponse,
) error {
	sendErr := l.stream.Send(resp)
	if sendErr == nil || ctx.Err() == nil {
		// Either the send succeeded, or it failed for a reason other than
		// the context ending; in both cases there is nothing extra to log.
		return sendErr
	}

	span := tracing.SpanFromContext(ctx)
	if span == nil || span.IsNoop() {
		// No usable recording is attached to this context.
		return sendErr
	}

	log.Infof(ctx, "incoming snapshot stream response send failed with error: %s\ntrace:\n%s",
		sendErr, span.GetConfiguredRecording())
	return sendErr
}

// Recv reads the next request from the wrapped stream and returns the
// request and error exactly as the wrapped stream produced them.
//
// If the receive fails and ctx has already ended (cancelled or timed out),
// the recording of the span found in ctx is logged so that the trace of the
// incoming snapshot is not lost. Nothing is logged when ctx carries no span
// or only a no-op span.
func (l loggingIncomingSnapshotStream) Recv(
	ctx context.Context,
) (*kvserverpb.SnapshotRequest, error) {
	req, recvErr := l.stream.Recv()
	if recvErr == nil || ctx.Err() == nil {
		// Either the receive succeeded, or it failed for a reason other
		// than the context ending; in both cases there is nothing to log.
		return req, recvErr
	}

	span := tracing.SpanFromContext(ctx)
	if span == nil || span.IsNoop() {
		// No usable recording is attached to this context.
		return req, recvErr
	}

	log.Infof(ctx, "incoming snapshot stream request recv failed with error: %s\ntrace:\n%s",
		recvErr, span.GetConfiguredRecording())
	return req, recvErr
}

// outgoingSnapshotStream is the minimal interface on a GRPC stream required
// to send a snapshot over the network.
type outgoingSnapshotStream interface {
Expand All @@ -115,7 +152,7 @@ type snapshotStrategy interface {
Receive(
context.Context,
*Store,
incomingSnapshotStream,
loggingIncomingSnapshotStream,
kvserverpb.SnapshotRequest_Header,
snapshotRecordMetrics,
) (IncomingSnapshot, error)
Expand Down Expand Up @@ -378,7 +415,7 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue {
func (kvSS *kvBatchSnapshotStrategy) Receive(
ctx context.Context,
s *Store,
stream incomingSnapshotStream,
loggingStream loggingIncomingSnapshotStream,
header kvserverpb.SnapshotRequest_Header,
recordBytesReceived snapshotRecordMetrics,
) (IncomingSnapshot, error) {
Expand Down Expand Up @@ -414,14 +451,14 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(

for {
timingTag.start("recv")
req, err := stream.Recv()
req, err := loggingStream.Recv(ctx)
timingTag.stop("recv")
if err != nil {
return noSnap, err
}
if req.Header != nil {
err := errors.New("client error: provided a header mid-stream")
return noSnap, sendSnapshotError(ctx, s, stream, err)
return noSnap, sendSnapshotError(ctx, s, loggingStream, err)
}

if req.KVBatch != nil {
Expand Down Expand Up @@ -487,7 +524,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
if err != nil {
err = errors.Wrap(err, "client error: invalid snapshot")
return noSnap, sendSnapshotError(ctx, s, stream, err)
return noSnap, sendSnapshotError(ctx, s, loggingStream, err)
}

inSnap := IncomingSnapshot{
Expand Down Expand Up @@ -995,6 +1032,7 @@ func (s *Store) getLocalityComparison(
func (s *Store) receiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
loggingStream := loggingIncomingSnapshotStream{stream}
// Draining nodes will generally not be rebalanced to (see the filtering that
// happens in getStoreListFromIDsLocked()), but in case they are, they should
// reject the incoming rebalancing snapshots.
Expand All @@ -1009,7 +1047,7 @@ func (s *Store) receiveSnapshot(
// getStoreListFromIDsLocked(). Is that sound? Don't we want to
// upreplicate to draining nodes if there are no other candidates?
case kvserverpb.SnapshotRequest_REBALANCE:
return sendSnapshotError(ctx, s, stream, errors.New(storeDrainingMsg))
return sendSnapshotError(ctx, s, loggingStream, errors.New(storeDrainingMsg))
default:
// If this a new snapshot type that this cockroach version does not know
// about, we let it through.
Expand All @@ -1020,7 +1058,7 @@ func (s *Store) receiveSnapshot(
if err := fn(header); err != nil {
// NB: we intentionally don't mark this error as errMarkSnapshotError so
// that we don't end up retrying injected errors in tests.
return sendSnapshotError(ctx, s, stream, err)
return sendSnapshotError(ctx, s, loggingStream, err)
}
}

Expand Down Expand Up @@ -1060,7 +1098,7 @@ func (s *Store) receiveSnapshot(
return nil
}); pErr != nil {
log.Infof(ctx, "cannot accept snapshot: %s", pErr)
return sendSnapshotError(ctx, s, stream, pErr.GoError())
return sendSnapshotError(ctx, s, loggingStream, pErr.GoError())
}

defer func() {
Expand All @@ -1082,7 +1120,7 @@ func (s *Store) receiveSnapshot(
snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data)
if err != nil {
err = errors.Wrap(err, "invalid snapshot")
return sendSnapshotError(ctx, s, stream, err)
return sendSnapshotError(ctx, s, loggingStream, err)
}

ss = &kvBatchSnapshotStrategy{
Expand All @@ -1092,13 +1130,13 @@ func (s *Store) receiveSnapshot(
}
defer ss.Close(ctx)
default:
return sendSnapshotError(ctx, s, stream,
return sendSnapshotError(ctx, s, loggingStream,
errors.Errorf("%s,r%d: unknown snapshot strategy: %s",
s, header.State.Desc.RangeID, header.Strategy),
)
}

if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil {
if err := loggingStream.Send(ctx, &kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil {
return err
}
if log.V(2) {
Expand All @@ -1125,7 +1163,7 @@ func (s *Store) receiveSnapshot(
}
ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
inSnap, err := ss.Receive(ctx, s, stream, *header, recordBytesReceived)
inSnap, err := ss.Receive(ctx, s, loggingStream, *header, recordBytesReceived)
if err != nil {
return err
}
Expand All @@ -1142,9 +1180,9 @@ func (s *Store) receiveSnapshot(
// sender as this being a retriable error, see isSnapshotError().
err = errors.Mark(err, errMarkSnapshotError)
err = errors.Wrap(err, "failed to apply snapshot")
return sendSnapshotError(ctx, s, stream, err)
return sendSnapshotError(ctx, s, loggingStream, err)
}
return stream.Send(&kvserverpb.SnapshotResponse{
return loggingStream.Send(ctx, &kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_APPLIED,
CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(),
})
Expand All @@ -1154,13 +1192,13 @@ func (s *Store) receiveSnapshot(
// to signify that it can not accept this snapshot. Internally it increments the
// statistic tracking how many invalid snapshots it received.
func sendSnapshotError(
ctx context.Context, s *Store, stream incomingSnapshotStream, err error,
ctx context.Context, s *Store, stream loggingIncomingSnapshotStream, err error,
) error {
s.metrics.RangeSnapshotRecvFailed.Inc(1)
resp := snapRespErr(err)
resp.CollectedSpans = tracing.SpanFromContext(ctx).GetConfiguredRecording()

return stream.Send(resp)
return stream.Send(ctx, resp)
}

func snapRespErr(err error) *kvserverpb.SnapshotResponse {
Expand Down

0 comments on commit 7b63c23

Please sign in to comment.