kvcoord: Restart ranges on a dedicated goroutine.
Restart ranges on a dedicated goroutine (if needed), so that potentially
expensive restarts (which may require re-resolving spans) do not block the
goroutine receiving mux rangefeed events.
Fix a logic bug in stuck range handling: the periodic stuck-range check ran
when the current time was still *before* the next scheduled check, rather
than after it.
Increase verbosity of logging to help debug mux rangefeed issues.

Informs cockroachdb#99560
Informs cockroachdb#99640
Informs cockroachdb#99214
Informs cockroachdb#98925
Informs cockroachdb#99092
Informs cockroachdb#99212
Informs cockroachdb#99910
Informs cockroachdb#100468

Release note: None
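Below is a minimal, self-contained sketch (not CockroachDB code) of the pattern this commit adopts: the loop receiving events hands potentially expensive restart work to another goroutine in the same task group, so a slow restart cannot stall event delivery. It uses golang.org/x/sync/errgroup in place of CockroachDB's ctxgroup wrapper; feedEvent and restartFeed are hypothetical stand-ins for the types in the diff below.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// feedEvent stands in for an error event that forces a range restart.
type feedEvent struct{ streamID int64 }

// restartFeed stands in for restartActiveRangeFeed: potentially slow work
// (e.g. re-resolving spans and routing) that must not block the receive loop.
func restartFeed(ctx context.Context, ev feedEvent) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulate an expensive restart
	case <-ctx.Done():
		return ctx.Err()
	}
	fmt.Printf("restarted stream %d\n", ev.streamID)
	return nil
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	events := make(chan feedEvent)

	// Producer: stands in for the per-node mux event stream.
	g.Go(func() error {
		defer close(events)
		for i := int64(0); i < 3; i++ {
			events <- feedEvent{streamID: i}
		}
		return nil
	})

	// Receive loop: instead of restarting inline (and stalling event
	// delivery), it spawns each restart on a dedicated goroutine in the same
	// group, mirroring m.g.GoCtx(...) in the diff below.
	g.Go(func() error {
		for ev := range events {
			ev := ev // capture the loop variable (pre-Go 1.22 semantics)
			g.Go(func() error { return restartFeed(ctx, ev) })
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("rangefeed error:", err)
	}
}
```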
Yevgeniy Miretskiy committed Apr 4, 2023
1 parent fd972b2 commit ad6b430
Showing 1 changed file with 56 additions and 41 deletions.
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -72,7 +72,8 @@ func muxRangeFeed(
 	catchupSem *limit.ConcurrentRequestLimiter,
 	eventCh chan<- RangeFeedMessage,
 ) (retErr error) {
-	if log.V(1) {
+	// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
+	if log.V(0) {
 		log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
 		start := timeutil.Now()
 		defer func() {
@@ -198,7 +199,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
 	var err error
 	ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
 	if err != nil {
-		log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
+		log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
 		if !rangecache.IsRangeLookupErrorRetryable(err) {
 			return err
 		}
@@ -208,11 +209,11 @@
 	}

 	// Establish a RangeFeed for a single Range.
-	log.VEventf(ctx, 1, "MuxRangeFeed starting for range %s@%s (rangeID %d)",
-		span, startAfter, token.Desc().RangeID)
+	log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
+		span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
 	transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
 	if err != nil {
-		log.VErrEventf(ctx, 1, "Failed to create transport for %s ", token.String())
+		log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
 		continue
 	}
 	releaseTransport = transport.Release
@@ -224,7 +225,7 @@

 	rpcClient, err := transport.NextInternalClient(ctx)
 	if err != nil {
-		log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
+		log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
 		continue
 	}

@@ -234,7 +235,7 @@
 	}

 	if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
-		log.VErrEventf(ctx, 1,
+		log.VErrEventf(ctx, 0,
 			"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
 		continue
 	}
@@ -292,7 +293,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
 	ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
 	defer restore()

-	if log.V(1) {
+	if log.V(0) {
 		log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
 		start := timeutil.Now()
 		defer func() {
@@ -331,9 +332,17 @@
 		recvErr = nil
 	}

-	return ms.closeWithRestart(ctx, recvErr, func(a *activeMuxRangeFeed) error {
-		return m.restartActiveRangeFeed(ctx, a, recvErr)
-	})
+	// make sure that the underlying error is not fatal. If it is, there is no
+	// reason to restart each rangefeed, so just bail out.
+	if _, err := handleRangefeedError(ctx, recvErr); err != nil {
+		return err
+	}
+
+	toRestart := ms.close()
+	if log.V(0) {
+		log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
+	}
+	return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
 }

 return nil
@@ -374,7 +383,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
 		// which one executes is a coin flip) and so it is possible that we may see
 		// additional event(s) arriving for a stream that is no longer active.
 		if active == nil {
-			if log.V(1) {
+			if log.V(0) {
 				log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
 			}
 			continue
@@ -401,9 +410,12 @@
 			m.ds.metrics.RangefeedErrorCatchup.Inc(1)
 		}
 		ms.deleteStream(event.StreamID)
-		if err := m.restartActiveRangeFeed(ctx, active, t.Error.GoError()); err != nil {
-			return err
-		}
+		// Restart the rangefeed on another goroutine. Restart might be a bit
+		// expensive, particularly if we have to resolve the span. We do not
+		// want to block receiveEventsFromNode for too long.
+		m.g.GoCtx(func(ctx context.Context) error {
+			return m.restartActiveRangeFeed(ctx, active, t.Error.GoError())
+		})
 		continue
 	}

@@ -420,26 +432,42 @@
 		// NB: this does not notify the server in any way. We may have to add
 		// a more complex protocol -- or better yet, figure out why ranges
 		// get stuck in the first place.
-		if timeutil.Now().Before(nextStuckCheck) {
+		if timeutil.Now().After(nextStuckCheck) {
 			if threshold := stuckThreshold(); threshold > 0 {
-				for _, a := range ms.purgeStuckStreams(threshold) {
-					if err := m.restartActiveRangeFeed(ctx, a, errRestartStuckRange); err != nil {
-						return err
-					}
+				// Restart stuck rangefeeds on another goroutine. Restart might be a
+				// bit expensive, particularly if we have to resolve the span. We do
+				// not want to block receiveEventsFromNode for too long.
+				toRestart := ms.purgeStuckStreams(threshold)
+				if len(toRestart) > 0 {
+					m.g.GoCtx(func(ctx context.Context) error {
+						return m.restartActiveRangeFeeds(ctx, errRestartStuckRange, toRestart)
+					})
 				}
 			}
 			nextStuckCheck = timeutil.Now().Add(stuckCheckFreq())
 		}
 	}
 }

+// restartActiveRangeFeeds restarts one or more rangefeeds.
+func (m *rangefeedMuxer) restartActiveRangeFeeds(
+	ctx context.Context, reason error, toRestart []*activeMuxRangeFeed,
+) error {
+	for _, active := range toRestart {
+		if err := m.restartActiveRangeFeed(ctx, active, reason); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // restartActiveRangeFeed restarts a rangefeed after it encountered the "reason" error.
 func (m *rangefeedMuxer) restartActiveRangeFeed(
 	ctx context.Context, active *activeMuxRangeFeed, reason error,
 ) error {
 	m.ds.metrics.RangefeedRestartRanges.Inc(1)

-	if log.V(1) {
+	if log.V(0) {
 		log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
 			active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
 	}
@@ -521,29 +549,16 @@ func (c *muxStream) deleteStream(streamID int64) {
 	c.mu.Unlock()
 }

-func (c *muxStream) closeWithRestart(
-	ctx context.Context, reason error, restartFn func(a *activeMuxRangeFeed) error,
-) error {
+// close closes the mux stream, returning the list of active range feeds.
+func (c *muxStream) close() []*activeMuxRangeFeed {
 	c.mu.Lock()
 	c.mu.closed = true
-	toRestart := c.mu.streams
+	toRestart := make([]*activeMuxRangeFeed, 0, len(c.mu.streams))
+	for _, a := range c.mu.streams {
+		toRestart = append(toRestart, a)
+	}
 	c.mu.streams = nil
 	c.mu.Unlock()

-	// make sure that the underlying error is not fatal. If it is, there is no
-	// reason to restart each rangefeed, so just bail out.
-	if _, err := handleRangefeedError(ctx, reason); err != nil {
-		return err
-	}
-
-	for _, a := range toRestart {
-		if err := restartFn(a); err != nil {
-			return err
-		}
-	}
-
-	if log.V(1) {
-		log.Infof(ctx, "mux to node %d restarted %d streams", c.nodeID, len(toRestart))
-	}
-	return nil
+	return toRestart
 }
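The new muxStream.close above snapshots the stream map into a slice while holding the mutex and returns it, so the caller can restart feeds without holding the lock. A minimal, self-contained sketch of that snapshot-under-lock idiom, with assumed stand-in types (activeFeed, muxStreamLite) rather than the actual CockroachDB structs:

```go
package main

import (
	"fmt"
	"sync"
)

// activeFeed is a stand-in for activeMuxRangeFeed.
type activeFeed struct{ streamID int64 }

// muxStreamLite mimics the locking layout of muxStream in the diff.
type muxStreamLite struct {
	mu struct {
		sync.Mutex
		closed  bool
		streams map[int64]*activeFeed
	}
}

// close marks the stream closed and returns the feeds that still need to be
// restarted. The map is snapshotted into a slice under the lock; the caller
// then restarts feeds without holding the mutex.
func (s *muxStreamLite) close() []*activeFeed {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.closed = true
	toRestart := make([]*activeFeed, 0, len(s.mu.streams))
	for _, a := range s.mu.streams {
		toRestart = append(toRestart, a)
	}
	s.mu.streams = nil
	return toRestart
}

func main() {
	s := &muxStreamLite{}
	s.mu.streams = map[int64]*activeFeed{1: {streamID: 1}, 2: {streamID: 2}}

	// Slow restart work happens here, outside the critical section.
	for _, a := range s.close() {
		fmt.Println("restarting stream", a.streamID)
	}
}
```

The design choice mirrors the diff: keep the critical section tiny (mark closed, copy, nil out the map) and push all slow or fallible work, such as restarting rangefeeds, outside the lock.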
