Skip to content

Commit

Permalink
Merge #103049
Browse files Browse the repository at this point in the history
103049: rangefeed: when processor is stopped send errors to client r=miretsky a=aliher1911

Previously when processor was stopped after feeds are closed it would
try to close all existing client feeds with nil error. This should
never happen as processor would always stop with a reason if feeds are
active.

This PR adds a new error code that would be returned by `MuxRangeFeed`
whenever underlying RangeFeed finishes cleanly and returns
`REASON_RANGEFEED_CLOSED` instead of nil. This will cause client side feed
to retry if necessary.

Release note: None

Fixes #101330

Co-authored-by: Oleg Afanasyev <[email protected]>
  • Loading branch information
craig[bot] and aliher1911 committed May 31, 2023
2 parents a879627 + 542f9c3 commit 3ff847a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
case kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED,
kvpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT,
kvpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING,
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER:
kvpb.RangeFeedRetryError_REASON_SLOW_CONSUMER,
kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED:
// Try again with same descriptor. These are transient
// errors that should not show up again.
return rangefeedErrorInfo{}, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,10 @@ message RangeFeedRetryError {
REASON_SLOW_CONSUMER = 5;
// No leaseholder exists or could be created, so closed timestamps won't be emitted.
REASON_NO_LEASEHOLDER = 6;
// Replica decided that rangefeed is no longer used and closed it. This
// should not happen unless it was explicitly requested by client and in
// that case client is free not to retry.
REASON_RANGEFEED_CLOSED = 7;
}
optional Reason reason = 1 [(gogoproto.nullable) = false];
}
Expand Down
11 changes: 5 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1636,12 +1636,11 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
f := n.stores.RangeFeed(req, &sink)
f.WhenReady(func(err error) {
if err == nil {
// RangeFeed usually finishes with an error. However, if future
// completes with nil error (which could happen e.g. during processor
// shutdown), treat it as a normal stream termination so that the caller
// restarts.
// TODO(101330): Add an explicit retry reason instead of REPLICA_REMOVED.
err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
cause := kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED
if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) {
cause = kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED
}
err = kvpb.NewRangeFeedRetryError(cause)
}

e := &kvpb.MuxRangeFeedEvent{
Expand Down

0 comments on commit 3ff847a

Please sign in to comment.