diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index 5fdccc5567da..df3771562f04 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -509,6 +509,9 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
 	}
 
 	switch {
+	case errors.Is(err, io.EOF):
+		// If we got an EOF, treat it as a signal to restart the single range feed.
+		return rangefeedErrorInfo{}, nil
 	case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) ||
 		errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
 		// These errors are likely to be unique to the replica that
@@ -750,9 +753,6 @@ func (ds *DistSender) singleRangeFeed(
 			return err
 		}); err != nil {
 			log.VErrEventf(ctx, 2, "RPC error: %s", err)
-			if err == io.EOF {
-				return args.Timestamp, nil
-			}
 			if stuckWatcher.stuck() {
 				afterCatchUpScan := catchupRes == nil
 				return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
index 865b200154b4..d3540c62f219 100644
--- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
+++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -428,7 +428,11 @@ func waitReplicaRangeFeed(
 	if ctxErr != nil {
 		return ctxErr
 	}
-	return rfErr
+	var event kvpb.RangeFeedEvent
+	event.SetValue(&kvpb.RangeFeedError{
+		Error: *kvpb.NewError(rfErr),
+	})
+	return stream.Send(&event)
 }
 
 // This test verifies that RangeFeed bypasses the circuit breaker. When the
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index 229b1dfcb25e..17e268ff1282 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -503,10 +503,8 @@ func (reg *registry) Disconnect(span roachpb.Span) {
 // DisconnectWithErr disconnects all registrations that overlap the specified
 // span with the provided error.
 func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *kvpb.Error) {
-	err := pErr.GoError()
 	reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
-		r.done.Set(err)
-		return true, pErr
+		return true /* disconnect */, pErr
 	})
 }
 
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index 494128048e77..2f1590f6ff63 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -146,12 +146,7 @@ func (r *Replica) RangeFeed(
 	}
 
 	if err := r.ensureClosedTimestampStarted(ctx); err != nil {
-		if err := stream.Send(&kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{
-			Error: *err,
-		}}); err != nil {
-			return future.MakeCompletedErrorFuture(err)
-		}
-		return future.MakeCompletedErrorFuture(nil)
+		return future.MakeCompletedErrorFuture(err.GoError())
 	}
 
 	// If the RangeFeed is performing a catch-up scan then it will observe all
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 39166ff8538a..b4728d14dc41 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -17,6 +17,7 @@ import (
 	"net"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
@@ -1514,9 +1515,85 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
 	return s.wrapped.Send(e)
 }
 
+// newMuxRangeFeedCompletionWatcher returns two functions: one to forward mux
+// rangefeed completion events to the sender, and a cleanup function. Mux
+// rangefeed completion events can be triggered at any point, and we would like
+// to avoid blocking on IO (sender.Send) during potentially critical areas.
+// Thus, the forwarding should happen on a dedicated goroutine.
+func newMuxRangeFeedCompletionWatcher(
+	ctx context.Context, stopper *stop.Stopper, sender *lockedMuxStream,
+) (doneFn func(event *kvpb.MuxRangeFeedEvent), cleanup func(), _ error) {
+	// Structure to help coordinate event forwarding and shutdown.
+	var fin = struct {
+		syncutil.Mutex
+		completed []*kvpb.MuxRangeFeedEvent
+		signalC   chan struct{}
+	}{
+		// NB: a buffer of 1 ensures we can always send a signal when a rangefeed completes.
+		signalC: make(chan struct{}, 1),
+	}
+
+	// forwardCompletion listens for completion notifications and forwards
+	// them to the sender.
+	forwardCompletion := func(ctx context.Context) {
+		for {
+			select {
+			case <-fin.signalC:
+				var toSend []*kvpb.MuxRangeFeedEvent
+				fin.Lock()
+				toSend, fin.completed = fin.completed, nil
+				fin.Unlock()
+				for _, e := range toSend {
+					if err := sender.Send(e); err != nil {
+						// If we failed to send, there is nothing else we can do.
+						// The stream is broken anyway.
+						return
+					}
+				}
+			case <-sender.wrapped.Context().Done():
+				return
+			case <-ctx.Done():
+				return
+			case <-stopper.ShouldQuiesce():
+				// There is nothing we can do here; stream cancellation is usually
+				// triggered by the client, and we don't have access to the stream
+				// cancellation function. Just let things proceed until the server shuts down.
+				return
+			}
+		}
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	if err := stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
+		defer wg.Done()
+		forwardCompletion(ctx)
+	}); err != nil {
+		return nil, nil, err
+	}
+
+	addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
+		fin.Lock()
+		fin.completed = append(fin.completed, event)
+		fin.Unlock()
+		select {
+		case fin.signalC <- struct{}{}:
+		default:
+		}
+	}
+	return addCompleted, wg.Wait, nil
+}
+
 // MuxRangeFeed implements the roachpb.InternalServer interface.
 func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
 	muxStream := &lockedMuxStream{wrapped: stream}
+
+	rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(stream.Context(), n.stopper, muxStream)
+	if err != nil {
+		return err
+	}
+	defer cleanup()
+
 	for {
 		req, err := stream.Recv()
 		if err != nil {
@@ -1537,15 +1614,33 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
 		// TODO(yevgeniy): Add observability into actively running rangefeeds.
 		f := n.stores.RangeFeed(req, &sink)
 		f.WhenReady(func(err error) {
-			if err != nil {
-				var event kvpb.RangeFeedEvent
-				event.SetValue(&kvpb.RangeFeedError{
-					Error: *kvpb.NewError(err),
-				})
-				// Sending could fail, but if it did, the stream is broken anyway, so
-				// nothing we can do with this error.
-				_ = sink.Send(&event)
+			if err == nil {
+				// RangeFeed usually finishes with an error. However, if the future
+				// completes with a nil error (which could happen e.g. during processor
+				// shutdown), treat it as a normal stream termination so that the caller
+				// restarts.
+				// TODO(101330): Add an explicit retry reason instead of REPLICA_REMOVED.
+				err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
+			}
+
+			e := &kvpb.MuxRangeFeedEvent{
+				RangeID:  req.RangeID,
+				StreamID: req.StreamID,
 			}
+
+			e.SetValue(&kvpb.RangeFeedError{
+				Error: *kvpb.NewError(err),
+			})
+
+			// When rangefeed completes, we must notify the client about that.
+			//
+			// NB: even though calling sink.Send() to send the notification might seem
+			// correct, it is unsafe. This future may be completed at any point,
+			// including during a critical section when some important lock (such as
+			// raftMu in the processor) may be held. Issuing potentially blocking IO
+			// during that time is not a good idea. Thus, we shunt the notification to
+			// a dedicated goroutine.
+			rangefeedCompleted(e)
 		})
 	}
 }
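
For context on the concurrency pattern the node.go hunk introduces: completion events are appended to a mutex-protected slice, and a size-1 signal channel wakes a dedicated goroutine that performs the blocking Send, so the caller completing the future never does IO while it may be holding locks. Below is a minimal, self-contained Go sketch of that pattern; Event, newCompletionWatcher, and the send callback are illustrative stand-ins for kvpb.MuxRangeFeedEvent, newMuxRangeFeedCompletionWatcher, and lockedMuxStream.Send, not the actual CockroachDB code.

// Minimal sketch of the completion-forwarding pattern above. Event and the
// send callback are simplified stand-ins; this is not the CockroachDB API.
package main

import (
	"fmt"
	"sync"
)

type Event struct{ StreamID int }

// newCompletionWatcher returns a notify function that never blocks and a
// cleanup function that stops the forwarding goroutine and waits for it.
func newCompletionWatcher(send func(Event) error) (notify func(Event), cleanup func()) {
	var mu sync.Mutex
	var pending []Event
	// A buffer of 1 guarantees notify never blocks: either the signal is
	// delivered, or one is already queued and will pick up the new event.
	signalC := make(chan struct{}, 1)
	done := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		drain := func() bool {
			mu.Lock()
			toSend := pending
			pending = nil
			mu.Unlock()
			for _, e := range toSend {
				if err := send(e); err != nil {
					return false // the stream is broken; nothing else we can do
				}
			}
			return true
		}
		for {
			select {
			case <-signalC:
				if !drain() {
					return
				}
			case <-done:
				// Best-effort final drain; the real watcher simply exits here
				// because its stream is already gone.
				drain()
				return
			}
		}
	}()

	notify = func(e Event) {
		mu.Lock()
		pending = append(pending, e)
		mu.Unlock()
		select {
		case signalC <- struct{}{}:
		default: // a wakeup is already queued; it will see this event too
		}
	}
	cleanup = func() {
		close(done)
		wg.Wait()
	}
	return notify, cleanup
}

func main() {
	notify, cleanup := newCompletionWatcher(func(e Event) error {
		fmt.Println("stream completed:", e.StreamID)
		return nil
	})
	// Safe to call from anywhere, including while holding locks: it only
	// appends to a slice and performs a non-blocking channel send.
	notify(Event{StreamID: 1})
	notify(Event{StreamID: 2})
	cleanup()
}

The non-blocking send into signalC coalesces wakeups: if a signal is already queued, the pending slice simply grows and the next drain forwards everything in order, which mirrors how addCompleted and forwardCompletion cooperate in the patch.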