From 7f1c4f4e1c188a23dd63b4bcfe9241b748037738 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Thu, 6 Jul 2023 22:06:22 -0400 Subject: [PATCH] kvserver: log incoming snapshot trace on context cancellation While traces captured during an incoming snapshot are collected and returned as part of the snapshot response, both on server-side error and on successful application, when the context is cancelled (e.g. on snapshot timeout) the traces are unable to be returned and previously were discarded. This changes the functionality such that in case there is an error on sending to / reading from the incoming snapshot stream due to context cancellation, the collected traces will be logged on the server side - i.e. the snapshot recipient. Fixes: #105820 Release note: None --- pkg/kv/kvserver/store_snapshot.go | 70 ++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index ee1ea28e198c..e028ade2f952 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -93,6 +93,43 @@ type incomingSnapshotStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } +// loggingIncomingSnapshotStream wraps the interface on a GRPC stream used +// to receive a snapshot over the network, with special handling for logging +// the current tracing span on context cancellation. +type loggingIncomingSnapshotStream struct { + stream incomingSnapshotStream +} + +func (l loggingIncomingSnapshotStream) Send( + ctx context.Context, resp *kvserverpb.SnapshotResponse, +) error { + err := l.stream.Send(resp) + if err != nil && ctx.Err() != nil { + // Log trace of incoming snapshot on context cancellation (e.g. + // times out or caller goes away). + if sp := tracing.SpanFromContext(ctx); sp != nil && !sp.IsNoop() { + log.Infof(ctx, "incoming snapshot stream response send failed with error: %s\ntrace:\n%s", + err, sp.GetConfiguredRecording()) + } + } + return err +} + +func (l loggingIncomingSnapshotStream) Recv( + ctx context.Context, +) (*kvserverpb.SnapshotRequest, error) { + req, err := l.stream.Recv() + if err != nil && ctx.Err() != nil { + // Log trace of incoming snapshot on context cancellation (e.g. + // times out or caller goes away). + if sp := tracing.SpanFromContext(ctx); sp != nil && !sp.IsNoop() { + log.Infof(ctx, "incoming snapshot stream request recv failed with error: %s\ntrace:\n%s", + err, sp.GetConfiguredRecording()) + } + } + return req, err +} + // outgoingSnapshotStream is the minimal interface on a GRPC stream required // to send a snapshot over the network. type outgoingSnapshotStream interface { @@ -115,7 +152,7 @@ type snapshotStrategy interface { Receive( context.Context, *Store, - incomingSnapshotStream, + loggingIncomingSnapshotStream, kvserverpb.SnapshotRequest_Header, snapshotRecordMetrics, ) (IncomingSnapshot, error) @@ -378,7 +415,7 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue { func (kvSS *kvBatchSnapshotStrategy) Receive( ctx context.Context, s *Store, - stream incomingSnapshotStream, + loggingStream loggingIncomingSnapshotStream, header kvserverpb.SnapshotRequest_Header, recordBytesReceived snapshotRecordMetrics, ) (IncomingSnapshot, error) { @@ -414,14 +451,14 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( for { timingTag.start("recv") - req, err := stream.Recv() + req, err := loggingStream.Recv(ctx) timingTag.stop("recv") if err != nil { return noSnap, err } if req.Header != nil { err := errors.New("client error: provided a header mid-stream") - return noSnap, sendSnapshotError(ctx, s, stream, err) + return noSnap, sendSnapshotError(ctx, s, loggingStream, err) } if req.KVBatch != nil { @@ -487,7 +524,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "client error: invalid snapshot") - return noSnap, sendSnapshotError(ctx, s, stream, err) + return noSnap, sendSnapshotError(ctx, s, loggingStream, err) } inSnap := IncomingSnapshot{ @@ -995,6 +1032,7 @@ func (s *Store) getLocalityComparison( func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + loggingStream := loggingIncomingSnapshotStream{stream} // Draining nodes will generally not be rebalanced to (see the filtering that // happens in getStoreListFromIDsLocked()), but in case they are, they should // reject the incoming rebalancing snapshots. @@ -1009,7 +1047,7 @@ func (s *Store) receiveSnapshot( // getStoreListFromIDsLocked(). Is that sound? Don't we want to // upreplicate to draining nodes if there are no other candidates? case kvserverpb.SnapshotRequest_REBALANCE: - return sendSnapshotError(ctx, s, stream, errors.New(storeDrainingMsg)) + return sendSnapshotError(ctx, s, loggingStream, errors.New(storeDrainingMsg)) default: // If this a new snapshot type that this cockroach version does not know // about, we let it through. @@ -1020,7 +1058,7 @@ func (s *Store) receiveSnapshot( if err := fn(header); err != nil { // NB: we intentionally don't mark this error as errMarkSnapshotError so // that we don't end up retrying injected errors in tests. - return sendSnapshotError(ctx, s, stream, err) + return sendSnapshotError(ctx, s, loggingStream, err) } } @@ -1060,7 +1098,7 @@ func (s *Store) receiveSnapshot( return nil }); pErr != nil { log.Infof(ctx, "cannot accept snapshot: %s", pErr) - return sendSnapshotError(ctx, s, stream, pErr.GoError()) + return sendSnapshotError(ctx, s, loggingStream, pErr.GoError()) } defer func() { @@ -1082,7 +1120,7 @@ func (s *Store) receiveSnapshot( snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { err = errors.Wrap(err, "invalid snapshot") - return sendSnapshotError(ctx, s, stream, err) + return sendSnapshotError(ctx, s, loggingStream, err) } ss = &kvBatchSnapshotStrategy{ @@ -1092,13 +1130,13 @@ func (s *Store) receiveSnapshot( } defer ss.Close(ctx) default: - return sendSnapshotError(ctx, s, stream, + return sendSnapshotError(ctx, s, loggingStream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", s, header.State.Desc.RangeID, header.Strategy), ) } - if err := stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil { + if err := loggingStream.Send(ctx, &kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_ACCEPTED}); err != nil { return err } if log.V(2) { @@ -1125,7 +1163,7 @@ func (s *Store) receiveSnapshot( } ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors - inSnap, err := ss.Receive(ctx, s, stream, *header, recordBytesReceived) + inSnap, err := ss.Receive(ctx, s, loggingStream, *header, recordBytesReceived) if err != nil { return err } @@ -1142,9 +1180,9 @@ func (s *Store) receiveSnapshot( // sender as this being a retriable error, see isSnapshotError(). err = errors.Mark(err, errMarkSnapshotError) err = errors.Wrap(err, "failed to apply snapshot") - return sendSnapshotError(ctx, s, stream, err) + return sendSnapshotError(ctx, s, loggingStream, err) } - return stream.Send(&kvserverpb.SnapshotResponse{ + return loggingStream.Send(ctx, &kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(), }) @@ -1154,13 +1192,13 @@ func (s *Store) receiveSnapshot( // to signify that it can not accept this snapshot. Internally it increments the // statistic tracking how many invalid snapshots it received. func sendSnapshotError( - ctx context.Context, s *Store, stream incomingSnapshotStream, err error, + ctx context.Context, s *Store, stream loggingIncomingSnapshotStream, err error, ) error { s.metrics.RangeSnapshotRecvFailed.Inc(1) resp := snapRespErr(err) resp.CollectedSpans = tracing.SpanFromContext(ctx).GetConfiguredRecording() - return stream.Send(resp) + return stream.Send(ctx, resp) } func snapRespErr(err error) *kvserverpb.SnapshotResponse {