kvserver/rangefeed: handle StreamMuxer.run errors
Previously, the mux rangefeed completion watcher ignored stream.Send errors and
simply returned, so the main loop kept receiving rangefeed requests even after
the completion watcher had shut down. While not currently problematic, this
issue gets worse as the stream muxer begins handling additional tasks, such as
invoking rangefeed cleanup callbacks. In addition, the main loop did not watch
for context cancellation or stopper quiescence. This patch addresses these
issues by propagating errors from StreamMuxer.run and handling them properly
in the main loop.

Part of: cockroachdb#126561
Release note: none
wenyihu6 committed Jul 10, 2024
1 parent d0979cb commit d1e6d59
Showing 2 changed files with 103 additions and 72 deletions.
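
At its core, the change applies a standard Go idiom: a background goroutine reports its terminal error on a buffered channel, and the request loop selects on that channel alongside context cancellation and shutdown. A minimal, self-contained sketch of the idiom (all names here are illustrative; none of this is the CockroachDB code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// worker simulates a background task such as StreamMuxer.run: it returns a
// non-nil error only when it fails on its own; on context cancellation it
// returns nil, because the caller observes the same signal.
func worker(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond):
		return errors.New("send failed: broken stream")
	case <-ctx.Done():
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffer of 1 lets the worker exit without waiting for a reader.
	errCh := make(chan error, 1)
	go func() {
		if err := worker(ctx); err != nil {
			errCh <- err
		}
	}()

	// Main loop: stop on worker failure or on cancellation, whichever comes first.
	select {
	case err := <-errCh:
		fmt.Println("worker error:", err)
	case <-ctx.Done():
		fmt.Println("context done:", ctx.Err())
	}
}
```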
36 changes: 29 additions & 7 deletions pkg/kv/kvserver/rangefeed/stream_muxer.go
@@ -94,6 +94,13 @@ type StreamMuxer struct {
// there is only one task spawned by StreamMuxer.Start (StreamMuxer.run).
wg sync.WaitGroup

// errCh is used to signal errors from StreamMuxer.run back to the caller. If
// non-empty, StreamMuxer.run has finished and the error should be handled.
// Note that it is possible for StreamMuxer.run to finish without sending
// an error to errCh. In that case, other goroutines are expected to receive
// the same shutdown signal and handle the error appropriately.
errCh chan error

// Note that lockedMuxStream wraps the underlying grpc server stream, ensuring
// thread safety.
sender ServerStreamSender
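
The capacity-1 buffer matters: StreamMuxer.run sends at most one error before returning, so the send never blocks even if the caller has already stopped draining the channel. A tiny illustrative snippet of that property (not CockroachDB code):

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	// A capacity-1 channel absorbs exactly one send with no receiver waiting,
	// so a goroutine can report its terminal error and exit immediately.
	errCh := make(chan error, 1)
	errCh <- errors.New("broken stream") // does not block

	// The error remains available for whoever checks later.
	fmt.Println(<-errCh)
}
```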
@@ -149,25 +156,37 @@ func (sm *StreamMuxer) detachMuxErrors() []*kvpb.MuxRangeFeedEvent {
// to be called in a goroutine and will block until the context is done or the
// stopper is quiesced. StreamMuxer will stop forwarding rangefeed completion
// errors after run completes, and the caller is responsible for handling shutdown.
func (sm *StreamMuxer) run(ctx context.Context, stopper *stop.Stopper) {
func (sm *StreamMuxer) run(ctx context.Context, stopper *stop.Stopper) error {
for {
select {
case <-sm.notifyMuxError:
for _, clientErr := range sm.detachMuxErrors() {
if err := sm.sender.Send(clientErr); err != nil {
log.Errorf(ctx,
"failed to send rangefeed completion error back to client due to broken stream: %v", err)
return
return err
}
}
case <-ctx.Done():
return
// The top-level goroutine will receive the context cancellation and handle
// ctx.Err().
return nil
case <-stopper.ShouldQuiesce():
return
// The top-level goroutine will receive the stopper quiesce signal and
// handle the error.
return nil
}
}
}

// Error returns a channel that can be used to receive errors from
// StreamMuxer.run. Only non-nil errors are sent on this channel. If the
// channel is non-empty, StreamMuxer.run has finished, and the caller is
// responsible for handling the error.
func (sm *StreamMuxer) Error() chan error {
return sm.errCh
}
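
A caller would typically fold Error() into its own select loop, which is exactly what the node.go changes below do. A hedged sketch of the consumption side (the mux type and serve function here are stand-ins, not the real CockroachDB types):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// mux is a stand-in for a started StreamMuxer; Error mirrors the accessor above.
type mux struct{ errCh chan error }

func (m *mux) Error() chan error { return m.errCh }

// serve shows the consumption side: fold Error() into the caller's select
// loop next to its own shutdown signals.
func serve(ctx context.Context, m *mux) error {
	for {
		select {
		case err := <-m.Error():
			// StreamMuxer.run finished with an error; stop serving.
			return err
		case <-ctx.Done():
			return ctx.Err()
		default:
			// ... receive and dispatch the next request here ...
			return nil // placeholder so the sketch terminates
		}
	}
}

func main() {
	m := &mux{errCh: make(chan error, 1)}
	m.errCh <- errors.New("broken stream")
	fmt.Println(serve(context.Background(), m))
}
```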

// Stop cancels the StreamMuxer.run task and waits for it to complete. It does
// nothing if StreamMuxer.run is already finished. It is expected to be called
// after StreamMuxer.Start. Note that the caller is responsible for handling any
@@ -177,8 +196,8 @@ func (sm *StreamMuxer) Stop() {
sm.wg.Wait()
}

// Start launches StreamMuxer.run in the background if no error is returned.
// StreamMuxer.run continues running until it errors or StreamMuxer.Stop is
// Start launches StreamMuxer.Run in the background if no error is returned.
// StreamMuxer.Run continues running until it errors or StreamMuxer.Stop is
called. The caller is responsible for calling StreamMuxer.Stop and handling
any cleanup for active streams. Note that it is not valid to call Start
// multiple times or restart after Stop. Example usage:
@@ -189,11 +208,14 @@ func (sm *StreamMuxer) Stop() {
//
// defer streamMuxer.Stop()
func (sm *StreamMuxer) Start(ctx context.Context, stopper *stop.Stopper) error {
sm.errCh = make(chan error, 1)
ctx, sm.taskCancel = context.WithCancel(ctx)
sm.wg.Add(1)
if err := stopper.RunAsyncTask(ctx, "test-stream-muxer", func(ctx context.Context) {
defer sm.wg.Done()
sm.run(ctx, stopper)
if err := sm.run(ctx, stopper); err != nil {
sm.errCh <- err
}
}); err != nil {
sm.taskCancel()
sm.wg.Done()
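
Start and Stop together form the lifecycle: Start spawns exactly one goroutine and fills errCh only on failure; Stop cancels it and waits. The sketch below approximates stopper.RunAsyncTask with a plain goroutine and a context; it is a simplification under those assumptions, not the real API:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type muxer struct {
	taskCancel context.CancelFunc
	wg         sync.WaitGroup
	errCh      chan error
}

func (m *muxer) run(ctx context.Context) error {
	<-ctx.Done() // stand-in for the real select loop
	return nil
}

// Start mirrors the pattern above: cancelable context, WaitGroup, and a
// capacity-1 error channel filled only on failure.
func (m *muxer) Start(ctx context.Context) {
	m.errCh = make(chan error, 1)
	ctx, m.taskCancel = context.WithCancel(ctx)
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		if err := m.run(ctx); err != nil {
			m.errCh <- err
		}
	}()
}

// Stop cancels run and waits for it; valid exactly once, after Start.
func (m *muxer) Stop() {
	m.taskCancel()
	m.wg.Wait()
}

func main() {
	var m muxer
	m.Start(context.Background())
	m.Stop()
	fmt.Println("stopped cleanly")
}
```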
139 changes: 74 additions & 65 deletions pkg/server/node.go
@@ -1885,81 +1885,90 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
var activeStreams sync.Map

for {
req, err := stream.Recv()
if err != nil {
select {
case err := <-streamMuxer.Error():
return err
}
case <-ctx.Done():
return ctx.Err()
case <-n.stopper.ShouldQuiesce():
return stop.ErrUnavailable
default:
req, err := stream.Recv()
if err != nil {
return err
}

if req.CloseStream {
// Client issued a request to close previously established stream.
if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded {
s := v.(*setRangeIDEventSink)
s.cancel()
} else {
// This is a bit strange, but it could happen if this stream completes
// just before we receive the close request. So, just print out a warning.
if log.V(1) {
log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID)
if req.CloseStream {
// Client issued a request to close previously established stream.
if v, loaded := activeStreams.LoadAndDelete(req.StreamID); loaded {
s := v.(*setRangeIDEventSink)
s.cancel()
} else {
// This is a bit strange, but it could happen if this stream completes
// just before we receive the close request. So, just print out a warning.
if log.V(1) {
log.Infof(ctx, "closing unknown rangefeed stream ID %d", req.StreamID)
}
}
continue
}
continue
}

streamCtx, cancel := context.WithCancel(ctx)
streamCtx = logtags.AddTag(streamCtx, "r", req.RangeID)
streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)

streamSink := &setRangeIDEventSink{
ctx: streamCtx,
cancel: cancel,
rangeID: req.RangeID,
streamID: req.StreamID,
wrapped: muxStream,
}
activeStreams.Store(req.StreamID, streamSink)

n.metrics.NumMuxRangeFeed.Inc(1)
n.metrics.ActiveMuxRangeFeed.Inc(1)
f := n.stores.RangeFeed(req, streamSink)
f.WhenReady(func(err error) {
n.metrics.ActiveMuxRangeFeed.Inc(-1)

_, loaded := activeStreams.LoadAndDelete(req.StreamID)
streamClosedByClient := !loaded
streamSink.cancel()

if streamClosedByClient && streamSink.ctx.Err() != nil {
// If the stream was explicitly closed by the client, we expect to see a
// context.Canceled error. In this case, return
// kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client.
err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
streamCtx, cancel := context.WithCancel(ctx)
streamCtx = logtags.AddTag(streamCtx, "r", req.RangeID)
streamCtx = logtags.AddTag(streamCtx, "s", req.Replica.StoreID)
streamCtx = logtags.AddTag(streamCtx, "sid", req.StreamID)

streamSink := &setRangeIDEventSink{
ctx: streamCtx,
cancel: cancel,
rangeID: req.RangeID,
streamID: req.StreamID,
wrapped: muxStream,
}
activeStreams.Store(req.StreamID, streamSink)

n.metrics.NumMuxRangeFeed.Inc(1)
n.metrics.ActiveMuxRangeFeed.Inc(1)
f := n.stores.RangeFeed(req, streamSink)
f.WhenReady(func(err error) {
n.metrics.ActiveMuxRangeFeed.Inc(-1)

_, loaded := activeStreams.LoadAndDelete(req.StreamID)
streamClosedByClient := !loaded
streamSink.cancel()

if streamClosedByClient && streamSink.ctx.Err() != nil {
// If the stream was explicitly closed by the client, we expect to see a
// context.Canceled error. In this case, return
// kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED to the client.
err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED)
}

if err == nil {
cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
err = kvpb.NewRangeFeedRetryError(cause)
}
if err == nil {
cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
err = kvpb.NewRangeFeedRetryError(cause)
}

e := &kvpb.MuxRangeFeedEvent{
RangeID: req.RangeID,
StreamID: req.StreamID,
}
e := &kvpb.MuxRangeFeedEvent{
RangeID: req.RangeID,
StreamID: req.StreamID,
}

e.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(err),
e.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(err),
})

// When a rangefeed completes, we must notify the client about that.
//
// NB: even though calling sink.Send() to send the notification might seem
// correct, it is also unsafe. This future may be completed at any point,
// including during a critical section when some important lock (such as
// raftMu in the processor) may be held. Issuing potentially blocking IO
// during that time is not a good idea. Thus, we shunt the notification to
// a dedicated goroutine.
streamMuxer.AppendMuxError(e)
})

// When a rangefeed completes, we must notify the client about that.
//
// NB: even though calling sink.Send() to send the notification might seem
// correct, it is also unsafe. This future may be completed at any point,
// including during a critical section when some important lock (such as
// raftMu in the processor) may be held. Issuing potentially blocking IO
// during that time is not a good idea. Thus, we shunt the notification to
// a dedicated goroutine.
streamMuxer.AppendMuxError(e)
})
}
}
}
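
The NB comment above is the heart of the design: WhenReady callbacks can fire while the rangefeed processor holds locks such as raftMu, so the completion event is queued for StreamMuxer.run rather than sent inline. A minimal sketch of that queue-and-notify handoff (the real muxer's AppendMuxError/detachMuxErrors pair follows this shape; the names below are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

type notifier struct {
	mu     sync.Mutex
	queue  []string
	notify chan struct{} // capacity 1: coalesces wake-ups
}

// append queues an event without doing any blocking IO; safe to call while
// holding unrelated locks.
func (n *notifier) append(ev string) {
	n.mu.Lock()
	n.queue = append(n.queue, ev)
	n.mu.Unlock()
	select {
	case n.notify <- struct{}{}:
	default: // a wake-up is already pending
	}
}

// detach takes ownership of everything queued so far.
func (n *notifier) detach() []string {
	n.mu.Lock()
	defer n.mu.Unlock()
	evs := n.queue
	n.queue = nil
	return evs
}

func main() {
	n := &notifier{notify: make(chan struct{}, 1)}
	n.append("rangefeed 1 completed")
	n.append("rangefeed 2 completed")
	<-n.notify
	for _, ev := range n.detach() {
		fmt.Println("send:", ev) // the slow IO happens here, off the hot path
	}
}
```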

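One subtlety of the new loop is that the default branch calls stream.Recv(), which blocks, so the muxer error, context, and quiescence cases are only re-checked between requests (gRPC should also unblock Recv once the stream's context is canceled). The polling shape, in a reduced, runnable form with stand-in names rather than the real API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// serveLoop mirrors the shape of the new MuxRangeFeed loop: shutdown signals
// are polled once per iteration, then the loop falls through to a blocking
// receive. recv stands in for stream.Recv().
func serveLoop(ctx context.Context, errCh <-chan error, recv func() (string, error)) error {
	for {
		select {
		case err := <-errCh:
			return err // background muxer failed
		case <-ctx.Done():
			return ctx.Err()
		default:
			req, err := recv()
			if err != nil {
				return err
			}
			fmt.Println("got request:", req)
		}
	}
}

func main() {
	reqs := []string{"r1", "r2"}
	recv := func() (string, error) {
		if len(reqs) == 0 {
			return "", errors.New("EOF") // client closed the stream
		}
		r := reqs[0]
		reqs = reqs[1:]
		return r, nil
	}
	fmt.Println(serveLoop(context.Background(), make(chan error, 1), recv))
}
```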
