release-23.1.0: kvcoord: Rework error propagation in mux rangefeed #101406

Merged: 1 commit, Apr 13, 2023
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -509,6 +509,9 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
}

switch {
case errors.Is(err, io.EOF):
// If we got an EOF, treat it as a signal to restart the single-range feed.
return rangefeedErrorInfo{}, nil
case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) ||
errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
// These errors are likely to be unique to the replica that
@@ -750,9 +753,6 @@ func (ds *DistSender) singleRangeFeed(
return err
}); err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
if err == io.EOF {
return args.Timestamp, nil
}
if stuckWatcher.stuck() {
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
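The EOF handling above moves from the RPC call site in singleRangeFeed into the shared handleRangefeedError classifier, and switches from a direct err == io.EOF comparison to errors.Is. A minimal, standalone sketch (not CockroachDB code) of why that matters: errors.Is also matches an EOF that arrives wrapped with additional context.

package main

import (
	"errors"
	"fmt"
	"io"
)

// classify mirrors the shape of the handleRangefeedError switch: it reports
// whether the caller should simply restart the single-range feed.
func classify(err error) (restart bool) {
	switch {
	case errors.Is(err, io.EOF):
		// Treat EOF as a signal to restart the single-range feed.
		return true
	default:
		return false
	}
}

func main() {
	wrapped := fmt.Errorf("recv on stream: %w", io.EOF)
	fmt.Println(wrapped == io.EOF) // false: a direct comparison misses the wrapped EOF
	fmt.Println(classify(wrapped)) // true: errors.Is unwraps and matches
}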
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -428,7 +428,11 @@ func waitReplicaRangeFeed(
if ctxErr != nil {
return ctxErr
}
return rfErr
var event kvpb.RangeFeedEvent
event.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(rfErr),
})
return stream.Send(&event)
}

// This test verifies that RangeFeed bypasses the circuit breaker. When the
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -503,10 +503,8 @@ func (reg *registry) Disconnect(span roachpb.Span) {
// DisconnectWithErr disconnects all registrations that overlap the specified
// span with the provided error.
func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *kvpb.Error) {
err := pErr.GoError()
reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
r.done.Set(err)
return true, pErr
return true /* disconnected */, pErr
})
}

7 changes: 1 addition & 6 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -146,12 +146,7 @@ func (r *Replica) RangeFeed(
}

if err := r.ensureClosedTimestampStarted(ctx); err != nil {
if err := stream.Send(&kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{
Error: *err,
}}); err != nil {
return future.MakeCompletedErrorFuture(err)
}
return future.MakeCompletedErrorFuture(nil)
return future.MakeCompletedErrorFuture(err.GoError())
}

// If the RangeFeed is performing a catch-up scan then it will observe all
111 changes: 103 additions & 8 deletions pkg/server/node.go
@@ -17,6 +17,7 @@ import (
"net"
"sort"
"strings"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -1514,9 +1515,85 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
return s.wrapped.Send(e)
}

// newMuxRangeFeedCompletionWatcher returns two functions: one to forward mux
// rangefeed completion events to the sender, and a cleanup function. Mux
// rangefeed completion events can be triggered at any point, and we would like
// to avoid blocking on IO (sender.Send) in potentially critical sections.
// Thus, the forwarding happens on a dedicated goroutine.
func newMuxRangeFeedCompletionWatcher(
ctx context.Context, stopper *stop.Stopper, sender *lockedMuxStream,
) (doneFn func(event *kvpb.MuxRangeFeedEvent), cleanup func(), _ error) {
// Structure to help coordinate event forwarding and shutdown.
var fin = struct {
syncutil.Mutex
completed []*kvpb.MuxRangeFeedEvent
signalC chan struct{}
}{
// NB: a buffer of 1 ensures we can always send a signal when rangefeed completes.
signalC: make(chan struct{}, 1),
}

// forwardCompletion listens to completion notifications and forwards
// them to the sender.
forwardCompletion := func(ctx context.Context) {
for {
select {
case <-fin.signalC:
var toSend []*kvpb.MuxRangeFeedEvent
fin.Lock()
toSend, fin.completed = fin.completed, nil
fin.Unlock()
for _, e := range toSend {
if err := sender.Send(e); err != nil {
// If we failed to send, there is nothing else we can do.
// The stream is broken anyway.
return
}
}
case <-sender.wrapped.Context().Done():
return
case <-ctx.Done():
return
case <-stopper.ShouldQuiesce():
// There is nothing we can do here; stream cancellation is usually
// triggered by the client. We don't have access to the stream cancellation
// function, so just let things proceed until the server shuts down.
return
}
}
}

var wg sync.WaitGroup
wg.Add(1)
if err := stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
defer wg.Done()
forwardCompletion(ctx)
}); err != nil {
return nil, nil, err
}

addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
fin.Lock()
fin.completed = append(fin.completed, event)
fin.Unlock()
select {
case fin.signalC <- struct{}{}:
default:
}
}
return addCompleted, wg.Wait, nil
}

// MuxRangeFeed implements the roachpb.InternalServer interface.
func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
muxStream := &lockedMuxStream{wrapped: stream}

rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(stream.Context(), n.stopper, muxStream)
if err != nil {
return err
}
defer cleanup()

for {
req, err := stream.Recv()
if err != nil {
@@ -1537,15 +1614,33 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
// TODO(yevgeniy): Add observability into actively running rangefeeds.
f := n.stores.RangeFeed(req, &sink)
f.WhenReady(func(err error) {
if err != nil {
var event kvpb.RangeFeedEvent
event.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(err),
})
// Sending could fail, but if it did, the stream is broken anyway, so
// nothing we can do with this error.
_ = sink.Send(&event)
if err == nil {
// RangeFeed usually finishes with an error. However, if the future
// completes with a nil error (which could happen, e.g., during processor
// shutdown), treat it as a normal stream termination so that the caller
// restarts.
// TODO(101330): Add an explicit retry reason instead of REPLICA_REMOVED.
err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
}

e := &kvpb.MuxRangeFeedEvent{
RangeID: req.RangeID,
StreamID: req.StreamID,
}

e.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(err),
})

// When the rangefeed completes, we must notify the client.
//
// NB: even though calling sink.Send() to send the notification might seem
// correct, it is unsafe. This future may be completed at any point,
// including during a critical section when an important lock (such as
// raftMu in the processor) may be held. Issuing potentially blocking IO
// during that time is not a good idea. Thus, we shunt the notification to
// a dedicated goroutine.
rangefeedCompleted(e)
})
}
}
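The completion-watcher machinery above boils down to a small pattern: completion events are appended to a mutex-protected slice, a one-slot signal channel is poked without blocking, and a dedicated goroutine drains the slice and performs the potentially blocking send. A distilled, standard-library-only sketch of that pattern (names such as watcher and enqueue are illustrative, not the CockroachDB API):

package main

import (
	"context"
	"fmt"
	"sync"
)

// watcher collects completion events without blocking the caller and hands
// them to a single draining goroutine that performs the blocking sends.
type watcher struct {
	mu      sync.Mutex
	pending []string
	signal  chan struct{} // buffer of 1: a wake-up is never lost
}

func newWatcher() *watcher {
	return &watcher{signal: make(chan struct{}, 1)}
}

// enqueue never blocks, so it is safe to call while other locks are held.
func (w *watcher) enqueue(ev string) {
	w.mu.Lock()
	w.pending = append(w.pending, ev)
	w.mu.Unlock()
	select {
	case w.signal <- struct{}{}:
	default: // a wake-up is already queued; the drainer will pick this event up too
	}
}

// run drains pending events on its own goroutine and performs the
// potentially blocking send there, not on the enqueuer's goroutine.
func (w *watcher) run(ctx context.Context, send func(string)) {
	for {
		select {
		case <-w.signal:
			w.mu.Lock()
			toSend := w.pending
			w.pending = nil
			w.mu.Unlock()
			for _, ev := range toSend {
				send(ev)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	w := newWatcher()
	delivered := make(chan string, 2)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		w.run(ctx, func(ev string) { delivered <- ev })
	}()

	// Enqueue from the "critical" path; neither call blocks.
	w.enqueue("rangefeed 1 completed")
	w.enqueue("rangefeed 2 completed")
	fmt.Println(<-delivered)
	fmt.Println(<-delivered)

	cancel()
	wg.Wait()
}

Because enqueue never blocks, it is safe to call from a WhenReady callback that may fire while locks such as raftMu are held, and the buffered channel guarantees the drainer wakes up even if several completions arrive before it gets scheduled.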
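For the error-to-event conversion itself, here is a hedged sketch built only from identifiers visible in this diff; the errorEvent helper name is hypothetical. It isolates the logic of the WhenReady callback: a nil completion is upgraded to a retryable error before being wrapped into the RangeFeedError that the completion watcher forwards to the client.

package server // sketch placed alongside node.go, hence the same kvpb import

import "github.com/cockroachdb/cockroach/pkg/kv/kvpb"

// errorEvent is a hypothetical helper that converts a rangefeed completion
// error into the RangeFeedError payload attached to the MuxRangeFeedEvent.
func errorEvent(err error) *kvpb.RangeFeedError {
	if err == nil {
		// Treat a nil completion as a normal termination that the client
		// should retry (see the TODO referencing #101330 above).
		err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
	}
	return &kvpb.RangeFeedError{Error: *kvpb.NewError(err)}
}

// Usage, mirroring the callback above:
//   e := &kvpb.MuxRangeFeedEvent{RangeID: req.RangeID, StreamID: req.StreamID}
//   e.SetValue(errorEvent(err))
//   rangefeedCompleted(e)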