diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index dc3c5c272f87..b7ebd427702a 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -509,6 +509,11 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
 		return rangefeedErrorInfo{}, nil
 	}
 
+	if err == io.EOF {
+		// If we got an EOF, treat it as a signal to restart the single range feed.
+		return rangefeedErrorInfo{}, nil
+	}
+
 	switch {
 	case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) ||
 		errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
@@ -751,9 +756,6 @@ func (ds *DistSender) singleRangeFeed(
 			return err
 		}); err != nil {
 			log.VErrEventf(ctx, 2, "RPC error: %s", err)
-			if err == io.EOF {
-				return args.Timestamp, nil
-			}
 			if stuckWatcher.stuck() {
 				afterCatchUpScan := catchupRes == nil
 				return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
index 865b200154b4..d3540c62f219 100644
--- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
+++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -428,7 +428,11 @@ func waitReplicaRangeFeed(
 	if ctxErr != nil {
 		return ctxErr
 	}
-	return rfErr
+	var event kvpb.RangeFeedEvent
+	event.SetValue(&kvpb.RangeFeedError{
+		Error: *kvpb.NewError(rfErr),
+	})
+	return stream.Send(&event)
 }
 
 // This test verifies that RangeFeed bypasses the circuit breaker. When the
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index 229b1dfcb25e..17e268ff1282 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -503,10 +503,8 @@ func (reg *registry) Disconnect(span roachpb.Span) {
 // DisconnectWithErr disconnects all registrations that overlap the specified
 // span with the provided error.
 func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *kvpb.Error) {
-	err := pErr.GoError()
 	reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
-		r.done.Set(err)
-		return true, pErr
+		return true /* disconnect */, pErr
 	})
 }
 
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index ad86a3a6badc..6bffb1a08bbb 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -159,12 +159,7 @@ func (r *Replica) RangeFeed(
 	}
 
 	if err := r.ensureClosedTimestampStarted(ctx); err != nil {
-		if err := stream.Send(&kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{
-			Error: *err,
-		}}); err != nil {
-			return future.MakeCompletedErrorFuture(err)
-		}
-		return future.MakeCompletedErrorFuture(nil)
+		return future.MakeCompletedErrorFuture(err.GoError())
 	}
 
 	// If the RangeFeed is performing a catch-up scan then it will observe all
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 39166ff8538a..861d7750bb12 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -17,6 +17,7 @@ import (
 	"net"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -1514,9 +1515,108 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
 	return s.wrapped.Send(e)
 }
 
+// newMuxRangeFeedCompletionWatcher returns two functions: one to forward mux
+// rangefeed completion events to the sender, and a cleanup function.  Mux
+// rangefeed completion events can be triggered at any point, and we would
+// like to avoid blocking on IO (sender.Send) in potentially critical
+// sections.  Thus, the forwarding should happen on a dedicated goroutine.
+// However, some simple solutions do not work well:
+// 1. Spinning up a dedicated goroutine whenever a rangefeed completes is
+//    undesirable because we could wind up spawning thousands of those
+//    goroutines in a short burst.
+// 2. Having a single goroutine drain a channel of completion requests
+//    raises the question of channel size: a sufficiently large burst of
+//    terminations would fill the channel and force completions to wait --
+//    potentially during a critical section, which is something we want
+//    to avoid.
+//
+// The solution presented by this function adds an unbounded queue of
+// completion requests, serviced by a dedicated goroutine.
+func newMuxRangeFeedCompletionWatcher(
+	ctx context.Context, stopper *stop.Stopper, sender *lockedMuxStream,
+) (doneFn func(event *kvpb.MuxRangeFeedEvent), cleanup func(), _ error) {
+	// fin coordinates event forwarding and shutdown.
+	var fin = struct {
+		syncutil.Mutex
+		cv        *sync.Cond
+		stop      bool
+		completed []*kvpb.MuxRangeFeedEvent
+	}{}
+	fin.cv = sync.NewCond(&fin)
+
+	// stopForwarder signals the completion forwarder to terminate whenever
+	// the context is canceled or the stopper quiesces.
+	stopForwarder := func(ctx context.Context) {
+		select {
+		case <-ctx.Done():
+		case <-stopper.ShouldQuiesce():
+		}
+		fin.Lock()
+		fin.stop = true
+		fin.cv.Broadcast()
+		fin.Unlock()
+	}
+
+	// forwardCompletion listens for completion notifications and forwards
+	// them to the sender.
+	forwardCompletion := func(ctx context.Context) {
+		for {
+			var toSend []*kvpb.MuxRangeFeedEvent
+			fin.Lock()
+			for toSend == nil {
+				if len(fin.completed) == 0 {
+					if fin.stop {
+						fin.Unlock()
+						return
+					}
+					fin.cv.Wait()
+				}
+				toSend = fin.completed
+				fin.completed = nil
+			}
+			fin.Unlock()
+			for _, e := range toSend {
+				// Sending could fail, but if it did, the stream is broken anyway, so
+				// there is nothing we can do with this error.
+				if err := sender.Send(e); err != nil {
+					return
+				}
+			}
+		}
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	if err := stopper.RunAsyncTask(ctx, "mux-term-stopper", stopForwarder); err != nil {
+		return nil, nil, err
+	}
+
+	if err := stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
+		defer wg.Done()
+		forwardCompletion(ctx)
+	}); err != nil {
+		return nil, nil, err
+	}
+
+	addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
+		fin.Lock()
+		fin.completed = append(fin.completed, event)
+		fin.cv.Signal()
+		fin.Unlock()
+	}
+	return addCompleted, wg.Wait, nil
+}
+
 // MuxRangeFeed implements the roachpb.InternalServer interface.
 func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
 	muxStream := &lockedMuxStream{wrapped: stream}
+
+	rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(stream.Context(), n.stopper, muxStream)
+	if err != nil {
+		return err
+	}
+	defer cleanup()
+
 	for {
 		req, err := stream.Recv()
 		if err != nil {
@@ -1537,15 +1637,32 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
 			// TODO(yevgeniy): Add observability into actively running rangefeeds.
 			f := n.stores.RangeFeed(req, &sink)
 			f.WhenReady(func(err error) {
-				if err != nil {
-					var event kvpb.RangeFeedEvent
-					event.SetValue(&kvpb.RangeFeedError{
-						Error: *kvpb.NewError(err),
-					})
-					// Sending could fail, but if it did, the stream is broken anyway, so
-					// nothing we can do with this error.
-					_ = sink.Send(&event)
+				if err == nil {
+					// RangeFeed usually finishes with an error.  However, if the future
+					// completes with a nil error (which could happen e.g. during processor
+					// shutdown), treat it as a normal stream termination so that the
+					// caller restarts this rangefeed.
+					// TODO(yevgeniy): Add an explicit retry reason instead of REPLICA_REMOVED.
+					err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
 				}
+
+				// When the rangefeed completes, we must notify the client.
+				//
+				// NB: even though calling sink.Send() to deliver this notification might
+				// seem correct, it is unsafe.  This future may be completed at any
+				// point, including during a critical section when some important lock
+				// (such as raftMu in the processor) may be held.  Issuing potentially
+				// blocking IO during that time is not a good idea.  Thus, we shunt the
+				// notification to a dedicated goroutine.
+				e := &kvpb.MuxRangeFeedEvent{
+					RangeID:  req.RangeID,
+					StreamID: req.StreamID,
+				}
+
+				e.SetValue(&kvpb.RangeFeedError{
+					Error: *kvpb.NewError(err),
+				})
+				rangefeedCompleted(e)
 			})
 		}
 	}
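
The doc comment on newMuxRangeFeedCompletionWatcher above describes a general pattern: producers append completion events to an unbounded, mutex-protected buffer and signal a condition variable, while a single forwarder goroutine drains the buffer and performs the potentially blocking send outside any producer's critical section. Below is a minimal standalone sketch of that pattern, not the CockroachDB implementation; the names (completionQueue, enqueue, stop) and the string event type are hypothetical, and the stopper/context handling from the patch is omitted.

package main

import (
	"fmt"
	"sync"
)

// completionQueue is a simplified sketch of the pattern used by
// newMuxRangeFeedCompletionWatcher: enqueue never blocks on IO; a dedicated
// forwarder goroutine performs the slow send.
type completionQueue struct {
	mu      sync.Mutex
	cv      *sync.Cond
	stopped bool
	pending []string
}

// newCompletionQueue starts the forwarder goroutine, which drains the buffer
// and calls send for each event. Wait on the returned WaitGroup after stop()
// to be sure the forwarder has drained and exited.
func newCompletionQueue(send func(string) error) (*completionQueue, *sync.WaitGroup) {
	q := &completionQueue{}
	q.cv = sync.NewCond(&q.mu)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			q.mu.Lock()
			for len(q.pending) == 0 && !q.stopped {
				q.cv.Wait()
			}
			if len(q.pending) == 0 {
				q.mu.Unlock()
				return // stopped and fully drained
			}
			toSend := q.pending
			q.pending = nil
			q.mu.Unlock()

			// Potentially slow IO happens here, outside any producer's
			// critical section.
			for _, e := range toSend {
				if err := send(e); err != nil {
					return // the stream is broken; nothing more to do
				}
			}
		}
	}()
	return q, &wg
}

// enqueue is cheap: it appends under the mutex and signals the forwarder.
func (q *completionQueue) enqueue(e string) {
	q.mu.Lock()
	q.pending = append(q.pending, e)
	q.cv.Signal()
	q.mu.Unlock()
}

// stop asks the forwarder to exit once the buffer is drained.
func (q *completionQueue) stop() {
	q.mu.Lock()
	q.stopped = true
	q.cv.Broadcast()
	q.mu.Unlock()
}

func main() {
	q, wg := newCompletionQueue(func(s string) error {
		fmt.Println("sent:", s)
		return nil
	})
	for i := 0; i < 3; i++ {
		q.enqueue(fmt.Sprintf("completion %d", i))
	}
	q.stop()
	wg.Wait()
}

Compared to a fixed-size channel, the slice buffer grows as needed, so a burst of completions never blocks the producer; the trade-off is unbounded memory if the forwarder falls behind, which is acceptable here because completion events are small and each stream produces at most one.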
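
The WhenReady callback in node.go does two things: it normalizes a nil completion into a retryable error and packages the result as a terminal MuxRangeFeedEvent for the client. Purely as an illustration, the same normalization could be factored into a small helper; completionEvent is a hypothetical name that is not part of the patch, and the sketch assumes only the kvpb and roachpb identifiers already referenced in the diff.

package server

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// completionEvent is a hypothetical helper mirroring the WhenReady logic in
// the patch: it turns a rangefeed completion (including a nil error) into the
// terminal event that is handed to the non-blocking completion watcher.
func completionEvent(rangeID roachpb.RangeID, streamID int64, err error) *kvpb.MuxRangeFeedEvent {
	if err == nil {
		// A nil completion still has to tell the client to restart; reuse the
		// retry reason the patch uses until an explicit reason exists.
		err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
	}
	e := &kvpb.MuxRangeFeedEvent{RangeID: rangeID, StreamID: streamID}
	e.SetValue(&kvpb.RangeFeedError{Error: *kvpb.NewError(err)})
	return e
}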