From 9d771040507415a1ddd21507a4a79c1078ef13c2 Mon Sep 17 00:00:00 2001
From: Wenyi Hu
Date: Fri, 28 Jun 2024 20:50:31 -0400
Subject: [PATCH] kvserver/rangefeed: handle StreamMuxer.run errors

Previously, the mux rangefeed completion watcher ignored stream.Send
errors and simply returned, so the gRPC stream kept accepting rangefeed
requests even after the completion watcher had shut down. While not
currently problematic, this issue gets worse as the stream muxer begins
handling additional tasks, such as invoking rangefeed cleanup callbacks.
Additionally, the main loop did not watch for context cancellation or
stopper quiescence. This patch addresses these issues by ensuring proper
error handling in the main loop.

Part of: #126561
Release note: none
---
 pkg/kv/kvserver/rangefeed/stream_muxer.go |  38 +++++-
 pkg/server/node.go                        | 139 ++++++++++++----
 2 files changed, 107 insertions(+), 70 deletions(-)

diff --git a/pkg/kv/kvserver/rangefeed/stream_muxer.go b/pkg/kv/kvserver/rangefeed/stream_muxer.go
index 17540caa5c61..8fa54c03fa40 100644
--- a/pkg/kv/kvserver/rangefeed/stream_muxer.go
+++ b/pkg/kv/kvserver/rangefeed/stream_muxer.go
@@ -94,6 +94,13 @@ type StreamMuxer struct {
 	// there is only one task spawned by StreamMuxer.Start (StreamMuxer.run).
 	wg sync.WaitGroup
 
+	// errCh is used to signal errors from StreamMuxer.run back to the caller.
+	// If non-empty, StreamMuxer.run has finished and the error should be
+	// handled. Note that it is possible for StreamMuxer.run to finish without
+	// sending an error to errCh. In that case, other goroutines are expected to
+	// receive the same shutdown signal and handle the error appropriately.
+	errCh chan error
+
 	// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
 	// thread safety.
 	sender ServerStreamSender
@@ -149,7 +156,7 @@ func (sm *StreamMuxer) detachMuxErrors() []*kvpb.MuxRangeFeedEvent {
 // to be called in a goroutine and will block until the context is done or the
 // stopper is quiesced. StreamMuxer will stop forward rangefeed completion
 // errors after run completes, and caller is responsible for handling shutdown.
-func (sm *StreamMuxer) run(ctx context.Context, stopper *stop.Stopper) {
+func (sm *StreamMuxer) run(ctx context.Context, stopper *stop.Stopper) error {
 	for {
 		select {
 		case <-sm.notifyMuxError:
@@ -157,17 +164,32 @@ func (sm *StreamMuxer) run(ctx context.Context, stopper *stop.Stopper) {
 				if err := sm.sender.Send(clientErr); err != nil {
 					log.Errorf(ctx,
 						"failed to send rangefeed completion error back to client due to broken stream: %v", err)
-					return
+					return err
 				}
 			}
 		case <-ctx.Done():
-			return
+			// The top-level goroutine will receive the context cancellation and
+			// handle ctx.Err().
+			return nil
 		case <-stopper.ShouldQuiesce():
-			return
+			// The top-level goroutine will receive the stopper quiesce signal and
+			// handle the error.
+			return nil
 		}
 	}
 }
 
+// Error returns a channel that can be used to receive errors from
+// StreamMuxer.run. Only non-nil errors are sent on this channel. If non-empty,
+// StreamMuxer.run has finished, and the caller is responsible for handling the
+// error.
+func (sm *StreamMuxer) Error() chan error {
+	if sm.errCh == nil {
+		log.Fatalf(context.Background(), "StreamMuxer.Error called before StreamMuxer.Start")
+	}
+	return sm.errCh
+}
+
 // Stop cancels the StreamMuxer.run task and waits for it to complete. It does
 // nothing if StreamMuxer.run is already finished. It is expected to be called
 // after StreamMuxer.Start. Note that the caller is responsible for handling any
@@ -189,11 +211,17 @@ func (sm *StreamMuxer) Stop() {
 //
 //	defer streamMuxer.Stop()
 func (sm *StreamMuxer) Start(ctx context.Context, stopper *stop.Stopper) error {
+	if sm.errCh != nil {
+		log.Fatalf(ctx, "StreamMuxer.Start called multiple times")
+	}
+	sm.errCh = make(chan error, 1)
 	ctx, sm.taskCancel = context.WithCancel(ctx)
 	sm.wg.Add(1)
 	if err := stopper.RunAsyncTask(ctx, "test-stream-muxer", func(ctx context.Context) {
 		defer sm.wg.Done()
-		sm.run(ctx, stopper)
+		if err := sm.run(ctx, stopper); err != nil {
+			sm.errCh <- err
+		}
 	}); err != nil {
 		sm.taskCancel()
 		sm.wg.Done()
diff --git a/pkg/server/node.go b/pkg/server/node.go
index b25d48ba6889..a71cae3384fe 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -1885,81 +1885,90 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
 	var activeStreams sync.Map
 
 	for {
-		req, err := stream.Recv()
-		if err != nil {
+		select {
+		case err := <-streamMuxer.Error():
 			return err
-		}
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-n.stopper.ShouldQuiesce():
+			return stop.ErrUnavailable
+		default:
+			req, err := stream.Recv()
+			if err != nil {
+				return err
+			}
 
-		if req.CloseStream {
-			// Client issued a request to close previously established stream.
-			if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded {
-				s := v.(*setRangeIDEventSink)
-				s.cancel()
-			} else {
-				// This is a bit strange, but it could happen if this stream completes
-				// just before we receive close request. So, just print out a warning.
-				if log.V(1) {
-					log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID)
+			if req.CloseStream {
+				// The client issued a request to close a previously established stream.
+				if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded {
+					s := v.(*setRangeIDEventSink)
+					s.cancel()
+				} else {
+					// This is a bit strange, but it could happen if this stream completes
+					// just before we receive the close request. So, just log a warning.
+					if log.V(1) {
+						log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID)
+					}
 				}
+				continue
 			}
-			continue
-		}
 
-		streamCtx, cancel := context.WithCancel(ctx)
-		streamCtx = logtags.AddTag(streamCtx, "r", req.RangeID)
-		streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
-		streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)
-
-		streamSink := &setRangeIDEventSink{
-			ctx:      streamCtx,
-			cancel:   cancel,
-			rangeID:  req.RangeID,
-			streamID: req.StreamID,
-			wrapped:  muxStream,
-		}
-		activeStreams.Store(req.StreamID, streamSink)
-
-		n.metrics.NumMuxRangeFeed.Inc(1)
-		n.metrics.ActiveMuxRangeFeed.Inc(1)
-		f := n.stores.RangeFeed(req, streamSink)
-		f.WhenReady(func(err error) {
-			n.metrics.ActiveMuxRangeFeed.Inc(-1)
-
-			_, loaded := activeStreams.LoadAndDelete(req.StreamID)
-			streamClosedByClient := !loaded
-			streamSink.cancel()
-
-			if streamClosedByClient && streamSink.ctx.Err() != nil {
-				// If the stream was explicitly closed by the client, we expect to see
-				// context.Canceled error. In this case, return
-				// kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client.
-				err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
+			streamCtx, cancel := context.WithCancel(ctx)
+			streamCtx = logtags.AddTag(streamCtx, "r", req.RangeID)
+			streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
+			streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)
+
+			streamSink := &setRangeIDEventSink{
+				ctx:      streamCtx,
+				cancel:   cancel,
+				rangeID:  req.RangeID,
+				streamID: req.StreamID,
+				wrapped:  muxStream,
 			}
+			activeStreams.Store(req.StreamID, streamSink)
+
+			n.metrics.NumMuxRangeFeed.Inc(1)
+			n.metrics.ActiveMuxRangeFeed.Inc(1)
+			f := n.stores.RangeFeed(req, streamSink)
+			f.WhenReady(func(err error) {
+				n.metrics.ActiveMuxRangeFeed.Inc(-1)
+
+				_, loaded := activeStreams.LoadAndDelete(req.StreamID)
+				streamClosedByClient := !loaded
+				streamSink.cancel()
+
+				if streamClosedByClient && streamSink.ctx.Err() != nil {
+					// If the stream was explicitly closed by the client, we expect to
+					// see a context.Canceled error. In this case, return
+					// kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client.
+					err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
+				}
 
-			if err == nil {
-				cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
-				err = kvpb.NewRangeFeedRetryError(cause)
-			}
+				if err == nil {
+					cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
+					err = kvpb.NewRangeFeedRetryError(cause)
+				}
 
-			e := &kvpb.MuxRangeFeedEvent{
-				RangeID:  req.RangeID,
-				StreamID: req.StreamID,
-			}
+				e := &kvpb.MuxRangeFeedEvent{
+					RangeID:  req.RangeID,
+					StreamID: req.StreamID,
+				}
 
-			e.SetValue(&kvpb.RangeFeedError{
-				Error: *kvpb.NewError(err),
+				e.SetValue(&kvpb.RangeFeedError{
+					Error: *kvpb.NewError(err),
+				})
+
+				// When the rangefeed completes, we must notify the client.
+				//
+				// NB: even though calling sink.Send() to send the notification might
+				// seem correct, it is also unsafe. This future may be completed at any
+				// point, including during a critical section when some important lock
+				// (such as raftMu in the processor) may be held. Issuing potentially
+				// blocking IO during that time is not a good idea. Thus, we shunt the
+				// notification to a dedicated goroutine.
+				streamMuxer.AppendMuxError(e)
 			})
-
-			// When rangefeed completes, we must notify the client about that.
-			//
-			// NB: even though calling sink.Send() to send notification might seem
-			// correct, it is also unsafe. This future may be completed at any point,
-			// including during critical section when some important lock (such as
-			// raftMu in processor) may be held. Issuing potentially blocking IO
-			// during that time is not a good idea. Thus, we shunt the notification to
-			// a dedicated goroutine.
-			streamMuxer.AppendMuxError(e)
-		})
+		}
 	}
 }
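--
Note on the pattern (illustrative, not part of the patch): the plumbing above
is a common Go idiom -- a background task deposits its terminal error into a
1-buffered channel, and the caller's main loop selects on that channel
alongside its other shutdown signals, returning nil on clean shutdown so the
caller observes ctx.Err() or the stopper itself. The self-contained sketch
below uses hypothetical names (worker, runLoop) to show the same shape; it is
a sketch of the idea, not this PR's code.

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	// worker stands in for StreamMuxer.run: it returns a non-nil error only
	// on failure (e.g. a broken stream) and nil on clean shutdown, in which
	// case the caller is expected to observe the shutdown signal itself.
	func worker(ctx context.Context) error {
		select {
		case <-time.After(50 * time.Millisecond):
			return errors.New("broken stream")
		case <-ctx.Done():
			return nil
		}
	}

	// runLoop stands in for the MuxRangeFeed main loop.
	func runLoop(ctx context.Context) error {
		// A buffer of 1 lets the worker deposit its error and exit without
		// waiting for a reader, mirroring make(chan error, 1) in
		// StreamMuxer.Start.
		errCh := make(chan error, 1)
		go func() {
			if err := worker(ctx); err != nil {
				errCh <- err
			}
		}()
		for {
			select {
			case err := <-errCh:
				return err // the worker failed; stop accepting new requests
			case <-ctx.Done():
				return ctx.Err()
			default:
				// Accept the next request here (stream.Recv() in
				// MuxRangeFeed); a short sleep stands in for that work.
				time.Sleep(10 * time.Millisecond)
			}
		}
	}

	func main() {
		fmt.Println(runLoop(context.Background())) // prints "broken stream"
	}

One consequence worth noting: because the error channel is polled via a select
with a default branch, a failure in the background task is only observed
between requests; a loop blocked in stream.Recv() will not see it until Recv
returns.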