Skip to content

Commit

Permalink
rangefeed: when processor is stopped send errors to client
Browse files Browse the repository at this point in the history
Previously when processor was stopped after feeds are closed it would
try to close all existing client feeds with nil error. This should
never happen as processor would always stop with a reason if feeds are
active.
This PR adds a new error code that would be returned by MuxRangeFeed
whenever underlying RangeFeed finishes cleanly and returns
REASON_RANGEFEED_CLOSED instead of nil. This will cause client side feed
to retry if necessary.

Release note: None
  • Loading branch information
aliher1911 committed May 30, 2023
1 parent e891046 commit 542f9c3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED,
kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER:
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER,
kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED:
// Try again with same descriptor. These are transient
// errors that should not show up again.
return rangefeedErrorInfo{}, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,10 @@ message RangeFeedRetryError {
REASON_SLOW_CONSUMER = 5;
// No leaseholder exists or could be created, so closed timestamps won't be emitted.
REASON_NO_LEASEHOLDER = 6;
// Replica decided that rangefeed is no longer used and closed it. This
// should not happen unless it was explicitly requested by client and in
// that case client is free not to retry.
REASON_RANGEFEED_CLOSED = 7;
}
optional Reason reason = 1 [(gogoproto.nullable) = false];
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,12 +1636,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
f := n.stores.RangeFeed(req, &sink)
f.WhenReady(func(err error) {
if err == nil {
// RangeFeed usually finishes with an error. However, if future
// completes with nil error (which could happen e.g. during processor
// shutdown), treat it as a normal stream termination so that the caller
// restarts.
// TODO(101330): Add an explicit retry reason instead of REPLICA_REMOVED.
err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) {
cause = kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED
}
err = kvpb.NewRangeFeedRetryError(cause)
}

e := &kvpb.MuxRangeFeedEvent{
Expand Down

0 comments on commit 542f9c3

Please sign in to comment.