diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index ee1ea28e198c..e028ade2f952 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -93,6 +93,43 @@ type incomingSnapshotStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } +// loggingIncomingSnapshotStream wraps an incomingSnapshotStream. Its Send and +// Recv take a ctx (so it does not implement that interface) and, when a call +// fails while ctx is done, log the recording of the tracing span in ctx. +type loggingIncomingSnapshotStream struct { + stream incomingSnapshotStream +} + +func (l loggingIncomingSnapshotStream) Send( + ctx context.Context, resp *kvserverpb.SnapshotResponse, +) error { + err := l.stream.Send(resp) + if err != nil && ctx.Err() != nil { + // The send failed and ctx is done (e.g. it timed out or the caller went + // away): log the span's recording unless ctx has no span or a no-op one. + if sp := tracing.SpanFromContext(ctx); sp != nil && !sp.IsNoop() { + log.Infof(ctx, "incoming snapshot stream response send failed with error: %s\ntrace:\n%s", + err, sp.GetConfiguredRecording()) + } + } + return err +} + +func (l loggingIncomingSnapshotStream) Recv( + ctx context.Context, +) (*kvserverpb.SnapshotRequest, error) { + req, err := l.stream.Recv() + if err != nil && ctx.Err() != nil { + // The receive failed and ctx is done (e.g. it timed out or the caller went + // away): log the span's recording unless ctx has no span or a no-op one. + if sp := tracing.SpanFromContext(ctx); sp != nil && !sp.IsNoop() { + log.Infof(ctx, "incoming snapshot stream request recv failed with error: %s\ntrace:\n%s", + err, sp.GetConfiguredRecording()) + } + } + return req, err +} + // outgoingSnapshotStream is the minimal interface on a GRPC stream required // to send a snapshot over the network. 
type outgoingSnapshotStream interface { @@ -115,7 +152,7 @@ type snapshotStrategy interface { Receive( context.Context, *Store, - incomingSnapshotStream, + loggingIncomingSnapshotStream, kvserverpb.SnapshotRequest_Header, snapshotRecordMetrics, ) (IncomingSnapshot, error) @@ -378,7 +415,7 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue { func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, s *Store, - stream incomingSnapshotStream, + loggingStream loggingIncomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, recordBytesReceived snapshotRecordMetrics, ) (IncomingSnapshot, error) { @@ -414,14 +451,14 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( for { timingTag.start("recv") - req, err := stream.Recv() + req, err := loggingStream.Recv(ctx) timingTag.stop("recv") if err != nil { return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return noSnap, sendSnapshotError(ctx, s, stream, err) + return noSnap, sendSnapshotError(ctx, s, loggingStream, err) } if req.KVBatch != nil { @@ -487,7 +524,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "client error: invalid snapshot") - return noSnap, sendSnapshotError(ctx, s, stream, err) + return noSnap, sendSnapshotError(ctx, s, loggingStream, err) } inSnap := IncomingSnapshot{ @@ -995,6 +1032,7 @@ func (s *Store) getLocalityComparison( func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + loggingStream := loggingIncomingSnapshotStream{stream} // Draining nodes will generally not be rebalanced to (see the filtering that // happens in getStoreListFromIDsLocked()), but in case they are, they should // reject the incoming rebalancing snapshots. @@ -1009,7 +1047,7 @@ func (s *Store) receiveSnapshot( // getStoreListFromIDsLocked(). 
Is that sound? Don't we want to // upreplicate to draining nodes if there are no other candidates? case kvserverpb.SnapshotRequest_REBALANCE: - return sendSnapshotError(ctx, s, stream, errors.New(storeDrainingMsg)) + return sendSnapshotError(ctx, s, loggingStream, errors.New(storeDrainingMsg)) default: // If this a new snapshot type that this cockroach version does not know // about, we let it through. @@ -1020,7 +1058,7 @@ func (s *Store) receiveSnapshot( if err := fn(header); err != nil { // NB: we intentionally don't mark this error as errMarkSnapshotError so // that we don't end up retrying injected errors in tests. - return sendSnapshotError(ctx, s, stream, err) + return sendSnapshotError(ctx, s, loggingStream, err) } } @@ -1060,7 +1098,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return sendSnapshotError(ctx, s, stream, pErr.GoError()) + return sendSnapshotError(ctx, s, loggingStream, pErr.GoError()) } defer func() { @@ -1082,7 +1120,7 @@ func (s *Store) receiveSnapshot( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") - return sendSnapshotError(ctx, s, stream, err) + return sendSnapshotError(ctx, s, loggingStream, err) } ss = &kvBatchSnapshotStrategy{ @@ -1092,13 +1130,13 @@ func (s *Store) receiveSnapshot( } defer ss.Close(ctx) default: - return sendSnapshotError(ctx, s, stream, + return sendSnapshotError(ctx, s, loggingStream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", s, header.State.Desc.RangeID, header.Strategy), ) } - if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil { + if err := loggingStream.Send(ctx, &kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil { return err } if log.V(2) { @@ -1125,7 +1163,7 @@ func (s *Store) receiveSnapshot( } ctx, rSp := tracing.EnsureChildSpan(ctx, 
s.cfg.Tracer(), "receive snapshot data") defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors - inSnap, err := ss.Receive(ctx, s, stream, *header, recordBytesReceived) + inSnap, err := ss.Receive(ctx, s, loggingStream, *header, recordBytesReceived) if err != nil { return err } @@ -1142,9 +1180,9 @@ func (s *Store) receiveSnapshot( // sender as this being a retriable error, see isSnapshotError(). err = errors.Mark(err, errMarkSnapshotError) err = errors.Wrap(err, "failed to apply snapshot") - return sendSnapshotError(ctx, s, stream, err) + return sendSnapshotError(ctx, s, loggingStream, err) } - return stream.Send(&kvserverpb.SnapshotResponse{ + return loggingStream.Send(ctx, &kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(), }) @@ -1154,13 +1192,13 @@ func (s *Store) receiveSnapshot( // to signify that it can not accept this snapshot. Internally it increments the // statistic tracking how many invalid snapshots it received. func sendSnapshotError( - ctx context.Context, s *Store, stream incomingSnapshotStream, err error, + ctx context.Context, s *Store, stream loggingIncomingSnapshotStream, err error, ) error { s.metrics.RangeSnapshotRecvFailed.Inc(1) resp := snapRespErr(err) resp.CollectedSpans = tracing.SpanFromContext(ctx).GetConfiguredRecording() - return stream.Send(resp) + return stream.Send(ctx, resp) } func snapRespErr(err error) *kvserverpb.SnapshotResponse {