From 542f9c37e1bdb90a56fd1b331f1f6c9092fc1fd1 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Thu, 4 May 2023 11:57:50 +0100 Subject: [PATCH] rangefeed: when processor is stopped send errors to client Previously when processor was stopped after feeds are closed it would try to close all existing client feeds with nil error. This should never happen as processor would always stop with a reason if feeds are active. This PR adds a new error code that would be returned by MuxRangeFeed whenever underlying RangeFeed finishes cleanly and returns REASON_RANGEFEED_CLOSED instead of nil. This will cause client side feed to retry if necessary. Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go | 3 ++- pkg/kv/kvpb/errors.proto | 4 ++++ pkg/server/node.go | 11 +++++------ 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index c01685e0b4fd..50d94e3c5470 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -556,7 +556,8 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, - kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: + kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER, + kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED: // Try again with same descriptor. These are transient // errors that should not show up again. return rangefeedErrorInfo{}, nil diff --git a/pkg/kv/kvpb/errors.proto b/pkg/kv/kvpb/errors.proto index 24a1df3dece5..a2ceb73f47e4 100644 --- a/pkg/kv/kvpb/errors.proto +++ b/pkg/kv/kvpb/errors.proto @@ -559,6 +559,10 @@ message RangeFeedRetryError { REASON_SLOW_CONSUMER = 5; // No leaseholder exists or could be created, so closed timestamps won't be emitted. REASON_NO_LEASEHOLDER = 6; + // Replica decided that rangefeed is no longer used and closed it. This + // should not happen unless it was explicitly requested by client and in + // that case client is free not to retry. + REASON_RANGEFEED_CLOSED = 7; } optional Reason reason = 1 [(gogoproto.nullable) = false]; } diff --git a/pkg/server/node.go b/pkg/server/node.go index 269533ab8f7d..5d42604ebf60 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1636,12 +1636,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error { f := n.stores.RangeFeed(req, &sink) f.WhenReady(func(err error) { if err == nil { - // RangeFeed usually finishes with an error. However, if future - // completes with nil error (which could happen e.g. during processor - // shutdown), treat it as a normal stream termination so that the caller - // restarts. - // TODO(101330): Add an explicit retry reason instead of REPLICA_REMOVED. - err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED) + cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED + if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) { + cause = kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED + } + err = kvpb.NewRangeFeedRetryError(cause) } e := &kvpb.MuxRangeFeedEvent{