kvcoord: Rework error propagation in mux rangefeed
Prior to this change, there were cases where the future used
to wait for a single rangefeed's completion could be completed
multiple times, or where a message about rangefeed termination
could be sent multiple times on a single mux rangefeed stream.

One such case was the check for `ensureClosedTimestampStarted`.
If this method returned an error, we would immediately send
the error on the RPC stream, and then complete the future
with a nil error.

Another instance was when the registry would `DisconnectWithErr` --
in that case, we would first complete the future in this method,
and then complete it again later.

It appears that completing a future multiple times should be
okay; however, it is still a bit worrisome.  The deadlocks observed
were all in the local RPC bypass (`rpc/context.go`), and it's
not a stretch to imagine that as soon as the first error
(e.g. from `ensureClosedTimestampStarted`) is returned, the
goroutine reading these messages terminates, causing the
subsequent attempt to send the error to deadlock.
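
As a toy illustration of that hypothesis (not CockroachDB code --
a bare channel below stands in for the local stream, and the error
strings are made up), a second blocking send has nowhere to go once
the reader has exited after the first error:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Unbuffered channel standing in for the local stream.
	stream := make(chan error)

	// "Client" goroutine: reads exactly one termination message, then exits.
	go func() {
		fmt.Println("client saw:", <-stream)
	}()

	// The first send (e.g. the ensureClosedTimestampStarted error) is delivered.
	stream <- fmt.Errorf("closed timestamp not started")

	// A second termination message has no reader left; without the timeout
	// below, this send would block forever.
	select {
	case stream <- fmt.Errorf("rangefeed completed"):
		fmt.Println("second termination delivered")
	case <-time.After(time.Second):
		fmt.Println("second termination would block forever")
	}
}
```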

Another hypothetical issue is how mux rangefeed sent the error
when the future completed.  Prior to this change, this happened
inline (via the `WhenReady` closure).  This is dangerous since the
closure may run while important locks (such as raftMu) are held.
What could happen is that mux rangefeed encounters a retryable
error.  The future is completed with the error value, which causes
an error to be sent to the client.  This happens with some lock
held.  The client notices the error and attempts to restart the
rangefeed -- against the same server -- and that could block, at
least in theory.  Regardless, performing IO while locks could
potentially be held is not a good idea.  This PR fixes the problem
by shunting the logical rangefeed completion notification to a
dedicated goroutine.
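
A simplified sketch of that pattern (names are illustrative; the
actual implementation is `newMuxRangeFeedCompletionWatcher` in
`pkg/server/node.go` below): completion callbacks only append to an
unbounded in-memory queue, and a dedicated goroutine drains the queue
and performs the potentially blocking send.

```go
package main

import (
	"fmt"
	"sync"
)

// completionQueue decouples "a rangefeed completed" notifications from
// the potentially blocking send to the client.
type completionQueue struct {
	mu      sync.Mutex
	cv      *sync.Cond
	stopped bool
	pending []string // stands in for *kvpb.MuxRangeFeedEvent
}

func newCompletionQueue() *completionQueue {
	q := &completionQueue{}
	q.cv = sync.NewCond(&q.mu)
	return q
}

// enqueue never blocks on IO, so it is safe to call from a completion
// callback that may run with locks held.
func (q *completionQueue) enqueue(ev string) {
	q.mu.Lock()
	q.pending = append(q.pending, ev)
	q.cv.Signal()
	q.mu.Unlock()
}

// stop tells the forwarding goroutine to drain what is pending and exit.
func (q *completionQueue) stop() {
	q.mu.Lock()
	q.stopped = true
	q.cv.Broadcast()
	q.mu.Unlock()
}

// forward runs on a dedicated goroutine and performs the sends outside
// any critical section.
func (q *completionQueue) forward(send func(string) error) {
	for {
		q.mu.Lock()
		for len(q.pending) == 0 && !q.stopped {
			q.cv.Wait()
		}
		toSend, stopped := q.pending, q.stopped
		q.pending = nil
		q.mu.Unlock()

		for _, ev := range toSend {
			if err := send(ev); err != nil {
				return // the stream is broken anyway
			}
		}
		if stopped && len(toSend) == 0 {
			return
		}
	}
}

func main() {
	q := newCompletionQueue()
	done := make(chan struct{})
	go func() {
		defer close(done)
		q.forward(func(ev string) error { fmt.Println("sent:", ev); return nil })
	}()

	q.enqueue("rangefeed r1 completed")
	q.enqueue("rangefeed r2 completed")
	q.stop()
	<-done
}
```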

Informs #99560
Informs #99640
Informs #99214
Informs #98925
Informs #99092
Informs #99212
Informs #99910

Release note: None
Yevgeniy Miretskiy committed Apr 7, 2023
1 parent 8e6f530 commit acc84e4
Showing 5 changed files with 137 additions and 21 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -509,6 +509,11 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e
return rangefeedErrorInfo{}, nil
}

if err == io.EOF {
// If we got an EOF, treat it as a signal to restart single range feed.
return rangefeedErrorInfo{}, nil
}

switch {
case errors.HasType(err, (*kvpb.StoreNotFoundError)(nil)) ||
errors.HasType(err, (*kvpb.NodeUnavailableError)(nil)):
@@ -751,9 +756,6 @@ func (ds *DistSender) singleRangeFeed(
return err
}); err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)
if err == io.EOF {
return args.Timestamp, nil
}
if stuckWatcher.stuck() {
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -428,7 +428,11 @@ func waitReplicaRangeFeed(
if ctxErr != nil {
return ctxErr
}
return rfErr
var event kvpb.RangeFeedEvent
event.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(rfErr),
})
return stream.Send(&event)
}

// This test verifies that RangeFeed bypasses the circuit breaker. When the
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -503,10 +503,8 @@ func (reg *registry) Disconnect(span roachpb.Span) {
// DisconnectWithErr disconnects all registrations that overlap the specified
// span with the provided error.
func (reg *registry) DisconnectWithErr(span roachpb.Span, pErr *kvpb.Error) {
err := pErr.GoError()
reg.forOverlappingRegs(span, func(r *registration) (bool, *kvpb.Error) {
r.done.Set(err)
return true, pErr
return true /* disconnect */, pErr
})
}

7 changes: 1 addition & 6 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -159,12 +159,7 @@ func (r *Replica) RangeFeed(
}

if err := r.ensureClosedTimestampStarted(ctx); err != nil {
if err := stream.Send(&kvpb.RangeFeedEvent{Error: &kvpb.RangeFeedError{
Error: *err,
}}); err != nil {
return future.MakeCompletedErrorFuture(err)
}
return future.MakeCompletedErrorFuture(nil)
return future.MakeCompletedErrorFuture(err.GoError())
}

// If the RangeFeed is performing a catch-up scan then it will observe all
133 changes: 125 additions & 8 deletions pkg/server/node.go
@@ -17,6 +17,7 @@ import (
"net"
"sort"
"strings"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -1514,9 +1515,108 @@ func (s *lockedMuxStream) Send(e *kvpb.MuxRangeFeedEvent) error {
return s.wrapped.Send(e)
}

// newMuxRangeFeedCompletionWatcher returns 2 functions: one to forward mux
// rangefeed completion events to the sender, and a cleanup function. Mux
// rangefeed completion events can be triggered at any point, and we would like
// to avoid blocking on IO (sender.Send) during potentially critical areas.
// Thus, the forwarding should happen on a dedicated goroutine.
// However, some simple solutions might not work well:
// 1. spinning a dedicated goroutine whenever rangefeed completes is
// undesirable because we could wind up spinning thousands of those goroutines
// in short burst.
// 2. having a single goroutine process completion requests would require the
// use of a channel; but it's not clear what the channel size should be --
// sufficiently sized bursts of termination would fill this channel, leading
// to wait time -- potentially during critical section, which is something we
// want to avoid.
//
// The solution presented by this function, adds an unbounded queue of
// completion requests, serviced by a dedicated goroutine.
func newMuxRangeFeedCompletionWatcher(
ctx context.Context, stopper *stop.Stopper, sender *lockedMuxStream,
) (doneFn func(event *kvpb.MuxRangeFeedEvent), cleanup func(), _ error) {
// structure to help coordination of event forwarding and shutdown.
var fin = struct {
syncutil.Mutex
cv *sync.Cond
stop bool
completed []*kvpb.MuxRangeFeedEvent
}{}
fin.cv = sync.NewCond(&fin)

// stopForwarder signals completion forwarder to terminate whenever
// context or stopper is signaled.
stopForwarder := func(ctx context.Context) {
select {
case <-ctx.Done():
case <-stopper.ShouldQuiesce():
}
fin.Lock()
fin.stop = true
fin.cv.Broadcast()
fin.Unlock()
}

// forwardCompletion listens to completion notifications and forwards
// them to the sender.
forwardCompletion := func(ctx context.Context) {
for {
var toSend []*kvpb.MuxRangeFeedEvent
fin.Lock()
for toSend == nil {
if len(fin.completed) == 0 {
if fin.stop {
fin.Unlock()
return
}
fin.cv.Wait()
}
toSend = fin.completed
fin.completed = nil
}
fin.Unlock()
for _, e := range toSend {
// Sending could fail, but if it did, the stream is broken anyway, so
// nothing we can do with this error.
if err := sender.Send(e); err != nil {
return
}
}
}
}

var wg sync.WaitGroup
wg.Add(1)
if err := stopper.RunAsyncTask(ctx, "mux-term-stopper", stopForwarder); err != nil {
return nil, nil, err
}

if err := stopper.RunAsyncTask(ctx, "mux-term-forwarder", func(ctx context.Context) {
defer wg.Done()
forwardCompletion(ctx)
}); err != nil {
return nil, nil, err
}

addCompleted := func(event *kvpb.MuxRangeFeedEvent) {
fin.Lock()
fin.completed = append(fin.completed, event)
fin.cv.Signal()
fin.Unlock()
}
return addCompleted, wg.Wait, nil
}

// MuxRangeFeed implements the roachpb.InternalServer interface.
func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
muxStream := &lockedMuxStream{wrapped: stream}

rangefeedCompleted, cleanup, err := newMuxRangeFeedCompletionWatcher(stream.Context(), n.stopper, muxStream)
if err != nil {
return err
}
defer cleanup()

for {
req, err := stream.Recv()
if err != nil {
@@ -1537,15 +1637,32 @@ func (n *Node) MuxRangeFeed(stream kvpb.Internal_MuxRangeFeedServer) error {
// TODO(yevgeniy): Add observability into actively running rangefeeds.
f := n.stores.RangeFeed(req, &sink)
f.WhenReady(func(err error) {
if err != nil {
var event kvpb.RangeFeedEvent
event.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(err),
})
// Sending could fail, but if it did, the stream is broken anyway, so
// nothing we can do with this error.
_ = sink.Send(&event)
if err == nil {
// RangeFeed usually finishes with an error. However, if future
// completes with nil error (which could happen e.g. during processor
// shutdown), treat it as a normal stream termination so that the caller
// restarts.
// TODO(yevgeniy): Add an explicit retry reason instead of REPLICA_REMOVED.
err = kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_REPLICA_REMOVED)
}

// When rangefeed completes, we must notify the client about that.
//
// NB: even though calling sink.Send() to send notification might seem
// correct, it is also unsafe. This future may be completed at any point,
// including during critical section when some important lock (such as
// raftMu in processor) may be held. Issuing potentially blocking IO
// during that time is not a good idea. Thus, we shunt the notification to
// a dedicated goroutine.
e := &kvpb.MuxRangeFeedEvent{
RangeID: req.RangeID,
StreamID: req.StreamID,
}

e.SetValue(&kvpb.RangeFeedError{
Error: *kvpb.NewError(err),
})
rangefeedCompleted(e)
})
}
}
