Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
100189:  kvcoord: Restart ranges on a dedicated goroutine. r=miretskiy a=miretskiy

Restart ranges on a dedicated goroutine (if needed).
Fix logic bug in stuck range handling.
Increase verbosity of logging to help debug mux rangefeed issues.

Informs #99560
Informs #99640
Informs #99214
Informs #98925
Informs #99092
Informs #99212
Informs #99910
Informs #99560

Release note: None

100525: rpc: Handle closed error r=erikgrinaker a=andrewbaptist

We close the listener before closing the connection. This can result in a spurious failure due to the Listener also closing our connection.

Epic: none
Fixes: #100391
Fixes: #77754
Informs: #80034

Release note: None

100528: sql: fix flaky TestSQLStatsCompactor r=j82w a=j82w

The test failure is showing more total wide scans
than expected. Change the compact stats job to run
once a year to avoid it running at the same time
as the test.

The interceptor is disabled right after delete
reducing the possibility of another operation
causing a conflict.

Epic: none
closes: #99653

Release note: none

100589: allocator: deflake full disk test r=andrewbaptist a=kvoli

In #97409 we introduced cluster settings to control the disk fullness threshold for rebalancing towards a store and shedding replicas off of the store. The `TestAllocatorFullDisks` assumes the total number of range bytes is equal or less than the rebalance threshold of the nodes, however the test was updated to use the shed threshold instead. This caused the test to flake occasionally as there was more than the expected amount of total range bytes.

This patch changes the ranges per node calculation to use the rebalance threshold again, instead of the shed threshold

```
dev test pkg/kv/kvserver/allocator/allocatorimpl -f TestAllocatorFullDisks -v --stress
...
15714 runs so far, 0 failures, over 39m45s
```

Fixes: #100033

Release note: None

100610: roachtest: set config.Quiet to true r=herkolategan a=srosenberg

After refactoring in [1], the default of config.Quiet was set to false since the roachprod CLI option is intended to set it to true. This resulted in an unwanted side-effect, namely roachtests running with the new default. Consequently, test_runner's log ended up with a bunch of (terminal) escape codes due to (status) spinner.

This change ensures roachtest explicitly sets config.Quiet to true.

[1] #99133

Epic: none

Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: j82w <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Stan Rosenberg <[email protected]>
  • Loading branch information
6 people committed Apr 4, 2023
6 parents 0d15e0d + ad6b430 + 41288e1 + 097fe1b + 3b94693 + 6c4d605 commit 76956df
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 70 deletions.
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ runner itself.
fmt.Fprintf(os.Stderr, "unable to lookup current user: %s\n", err)
os.Exit(1)
}
// Disable spinners and other fancy status messages since all IO is non-interactive.
config.Quiet = true

if err := roachprod.InitDirs(); err != nil {
fmt.Fprintf(os.Stderr, "%s\n", err)
Expand Down
97 changes: 56 additions & 41 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func muxRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
eventCh chan<- RangeFeedMessage,
) (retErr error) {
if log.V(1) {
// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
if log.V(0) {
log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
start := timeutil.Now()
defer func() {
Expand Down Expand Up @@ -198,7 +199,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
var err error
ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
if !rangecache.IsRangeLookupErrorRetryable(err) {
return err
}
Expand All @@ -208,11 +209,11 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
}

// Establish a RangeFeed for a single Range.
log.VEventf(ctx, 1, "MuxRangeFeed starting for range %s@%s (rangeID %d)",
span, startAfter, token.Desc().RangeID)
log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
if err != nil {
log.VErrEventf(ctx, 1, "Failed to create transport for %s ", token.String())
log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
continue
}
releaseTransport = transport.Release
Expand All @@ -224,7 +225,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(

rpcClient, err := transport.NextInternalClient(ctx)
if err != nil {
log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
continue
}

Expand All @@ -234,7 +235,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
}

if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
log.VErrEventf(ctx, 1,
log.VErrEventf(ctx, 0,
"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
continue
}
Expand Down Expand Up @@ -292,7 +293,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer restore()

if log.V(1) {
if log.V(0) {
log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
start := timeutil.Now()
defer func() {
Expand Down Expand Up @@ -331,9 +332,17 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed(
recvErr = nil
}

return ms.closeWithRestart(ctx, recvErr, func(a *activeMuxRangeFeed) error {
return m.restartActiveRangeFeed(ctx, a, recvErr)
})
// make sure that the underlying error is not fatal. If it is, there is no
// reason to restart each rangefeed, so just bail out.
if _, err := handleRangefeedError(ctx, recvErr); err != nil {
return err
}

toRestart := ms.close()
if log.V(0) {
log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
}
return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
}

return nil
Expand Down Expand Up @@ -374,7 +383,7 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
// which one executes is a coin flip) and so it is possible that we may see
// additional event(s) arriving for a stream that is no longer active.
if active == nil {
if log.V(1) {
if log.V(0) {
log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
}
continue
Expand All @@ -401,9 +410,12 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
m.ds.metrics.RangefeedErrorCatchup.Inc(1)
}
ms.deleteStream(event.StreamID)
if err := m.restartActiveRangeFeed(ctx, active, t.Error.GoError()); err != nil {
return err
}
// Restart rangefeed on another goroutine. Restart might be a bit
// expensive, particularly if we have to resolve span. We do not want
// to block receiveEventsFromNode for too long.
m.g.GoCtx(func(ctx context.Context) error {
return m.restartActiveRangeFeed(ctx, active, t.Error.GoError())
})
continue
}

Expand All @@ -420,26 +432,42 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
// NB: this does not notify the server in any way. We may have to add
// a more complex protocol -- or better yet, figure out why ranges
// get stuck in the first place.
if timeutil.Now().Before(nextStuckCheck) {
if timeutil.Now().After(nextStuckCheck) {
if threshold := stuckThreshold(); threshold > 0 {
for _, a := range ms.purgeStuckStreams(threshold) {
if err := m.restartActiveRangeFeed(ctx, a, errRestartStuckRange); err != nil {
return err
}
// Restart rangefeed on another goroutine. Restart might be a bit
// expensive, particularly if we have to resolve span. We do not want
// to block receiveEventsFromNode for too long.
toRestart := ms.purgeStuckStreams(threshold)
if len(toRestart) > 0 {
m.g.GoCtx(func(ctx context.Context) error {
return m.restartActiveRangeFeeds(ctx, errRestartStuckRange, toRestart)
})
}
}
nextStuckCheck = timeutil.Now().Add(stuckCheckFreq())
}
}
}

// restarActiveRangeFeeds restarts one or more rangefeeds.
func (m *rangefeedMuxer) restartActiveRangeFeeds(
ctx context.Context, reason error, toRestart []*activeMuxRangeFeed,
) error {
for _, active := range toRestart {
if err := m.restartActiveRangeFeed(ctx, active, reason); err != nil {
return err
}
}
return nil
}

// restartActiveRangeFeed restarts rangefeed after it encountered "reason" error.
func (m *rangefeedMuxer) restartActiveRangeFeed(
ctx context.Context, active *activeMuxRangeFeed, reason error,
) error {
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(1) {
if log.V(0) {
log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
}
Expand Down Expand Up @@ -521,29 +549,16 @@ func (c *muxStream) deleteStream(streamID int64) {
c.mu.Unlock()
}

func (c *muxStream) closeWithRestart(
ctx context.Context, reason error, restartFn func(a *activeMuxRangeFeed) error,
) error {
// close closes mux stream returning the list of active range feeds.
func (c *muxStream) close() []*activeMuxRangeFeed {
c.mu.Lock()
c.mu.closed = true
toRestart := c.mu.streams
toRestart := make([]*activeMuxRangeFeed, 0, len(c.mu.streams))
for _, a := range c.mu.streams {
toRestart = append(toRestart, a)
}
c.mu.streams = nil
c.mu.Unlock()

// make sure that the underlying error is not fatal. If it is, there is no
// reason to restart each rangefeed, so just bail out.
if _, err := handleRangefeedError(ctx, reason); err != nil {
return err
}

for _, a := range toRestart {
if err := restartFn(a); err != nil {
return err
}
}

if log.V(1) {
log.Infof(ctx, "mux to node %d restarted %d streams", c.nodeID, len(toRestart))
}
return nil
return toRestart
}
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8475,7 +8475,14 @@ func TestAllocatorFullDisks(t *testing.T) {

do := makeDiskCapacityOptions(&st.SV)

rangesPerNode := int(math.Floor(capacity * do.ShedAndBlockAllThreshold / rangeSize))
// Each range is equally sized (16mb), we want the number of ranges per node,
// when their size is added, to be no greater than the full disk rebalance
// threshold (0.925%) e.g for below:
// capacity = 1024mb
// rangeSize = 16mb
// threshold = 0.925
// rangesPerNode = ⌊1024mb * 0.925 / 16mb⌋ = 59
rangesPerNode := int(math.Floor(capacity * do.RebalanceToThreshold / rangeSize))
rangesToAdd := rangesPerNode * nodes

// Initialize testStores.
Expand Down
12 changes: 10 additions & 2 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,9 @@ func TestHeartbeatHealthTransport(t *testing.T) {
defer mu.Unlock()
n := len(mu.conns)
for i := n - 1; i >= 0; i-- {
if err := mu.conns[i].Close(); err != nil {
// This can spuriously return ErrClosed since the listener is closed
// before us.
if err := mu.conns[i].Close(); err != nil && !errors.Is(err, net.ErrClosed) {
return 0, err
}
mu.conns = mu.conns[:i]
Expand Down Expand Up @@ -1272,10 +1274,16 @@ func TestHeartbeatHealthTransport(t *testing.T) {
return nil
})

// TODO(baptist): Better understand when this happens. It appears we can get
// spurious connections to other tests on a stress run. This has been
// happening for a while, but only comes out rarely when this package is
// stressed. This test is very aggressive since it is calling GRPCDialNode in
// a busy loop for 50ms.
expected := "doesn't match server cluster ID"
// Should stay unhealthy despite reconnection attempts.
for then := timeutil.Now(); timeutil.Since(then) < 50*clientCtx.Config.RPCHeartbeatTimeout; {
err := clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
if !isUnhealthy(err) {
if !isUnhealthy(err) && !testutils.IsError(err, expected) {
t.Fatal(err)
}
}
Expand Down
52 changes: 26 additions & 26 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func TestSQLStatsCompactor(t *testing.T) {

// Disable automatic flush since the test will handle the flush manually.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")
// Change the automatic compaction job to avoid it running during the test.
// Test creates a new compactor and calls it directly.
sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = '@yearly';")

for _, tc := range testCases {
t.Run(fmt.Sprintf("stmtCount=%d/maxPersistedRowLimit=%d/rowsDeletePerTxn=%d",
Expand Down Expand Up @@ -206,32 +209,29 @@ func TestSQLStatsCompactor(t *testing.T) {
require.Equal(t, tc.maxPersistedRowLimit, len(expectedDeletedStmtFingerprints))
require.Equal(t, tc.maxPersistedRowLimit, len(expectedDeletedTxnFingerprints))

run := func() {
// The two interceptors (kvInterceptor and cleanupInterceptor) are
// injected into kvserver and StatsCompactor respectively.
// The cleanupInterceptor calculates the number of expected "wide scan"
// that should be issued by the StatsCompactor.
// The kvInterceptor counts the number of actual "wide scan" KV Request
// issued.
kvInterceptor.reset()
cleanupInterceptor.reset()
kvInterceptor.enable()
defer kvInterceptor.disable()

err := statsCompactor.DeleteOldestEntries(ctx)
require.NoError(t, err)

expectedNumberOfWideScans := cleanupInterceptor.getExpectedNumberOfWideScans()
actualNumberOfWideScans := kvInterceptor.getTotalWideScans()

require.Equal(t,
expectedNumberOfWideScans,
actualNumberOfWideScans,
"expected %d number of wide scans issued, but %d number of "+
"wide scan issued", expectedNumberOfWideScans, actualNumberOfWideScans,
)
}
run()
// The two interceptors (kvInterceptor and cleanupInterceptor) are
// injected into kvserver and StatsCompactor respectively.
// The cleanupInterceptor calculates the number of expected "wide scan"
// that should be issued by the StatsCompactor.
// The kvInterceptor counts the number of actual "wide scan" KV Request
// issued.
kvInterceptor.reset()
cleanupInterceptor.reset()
kvInterceptor.enable()

err = statsCompactor.DeleteOldestEntries(ctx)
kvInterceptor.disable()
require.NoError(t, err)

expectedNumberOfWideScans := cleanupInterceptor.getExpectedNumberOfWideScans()
actualNumberOfWideScans := kvInterceptor.getTotalWideScans()

require.Equal(t,
expectedNumberOfWideScans,
actualNumberOfWideScans,
"expected %d number of wide scans issued, but %d number of "+
"wide scan issued", expectedNumberOfWideScans, actualNumberOfWideScans,
)

actualStmtFingerprints, actualTxnFingerprints :=
getTopSortedFingerprints(t, sqlConn, 0 /* limit */)
Expand Down

0 comments on commit 76956df

Please sign in to comment.