diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index c01685e0b4fd..50d94e3c5470 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -556,7 +556,8 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, - kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: + kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER, + kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED: // Try again with same descriptor. These are transient // errors that should not show up again. return rangefeedErrorInfo{}, nil diff --git a/pkg/kv/kvpb/errors.proto b/pkg/kv/kvpb/errors.proto index 24a1df3dece5..a2ceb73f47e4 100644 --- a/pkg/kv/kvpb/errors.proto +++ b/pkg/kv/kvpb/errors.proto @@ -559,6 +559,10 @@ message RangeFeedRetryError { REASON_SLOW_CONSUMER = 5; // No leaseholder exists or could be created, so closed timestamps won't be emitted. REASON_NO_LEASEHOLDER = 6; + // Replica decided that rangefeed is no longer used and closed it. This + // should not happen unless it was explicitly requested by client and in + // that case client is free not to retry. + REASON_RANGEFEED_CLOSED = 7; } optional Reason reason = 1 [(gogoproto.nullable) = false]; } diff --git a/pkg/server/node.go b/pkg/server/node.go index 269533ab8f7d..5d42604ebf60 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1636,12 +1636,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { f := n.stores.RangeFeed(req, &sink) f.WhenReady(func(err error) { if err == nil { - // RangeFeed usually finishes with an error. However, if future - // completes with nil error (which could happen e.g. during processor - // shutdown), treat it as a normal stream termination so that the caller - // restarts. - // TODO(101330): Add an explicit retry reason instead of REPLICA_REMOVED. - err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED) + cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED + if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) { + cause = kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED + } + err = kvpb.NewRangeFeedRetryError(cause) } e := &kvpb.MuxRangeFeedEvent{