From 41288e1fb83cb29f034f6628b2887de3059b654f Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Mon, 3 Apr 2023 17:01:28 -0400
Subject: [PATCH 1/5] rpc: Handle closed error

We close the listener before closing the connection. This can result in
a spurious failure due to the Listener also closing our connection.

Epic: none
Fixes: #100391
Fixes: #77754
Informs: #80034

Release note: None
---
 pkg/rpc/context_test.go | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index f1e6efb1044d..26f9de8bfd4a 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -1176,7 +1176,9 @@ func TestHeartbeatHealthTransport(t *testing.T) {
 		defer mu.Unlock()
 		n := len(mu.conns)
 		for i := n - 1; i >= 0; i-- {
-			if err := mu.conns[i].Close(); err != nil {
+			// This can spuriously return ErrClosed since the listener is closed
+			// before us.
+			if err := mu.conns[i].Close(); err != nil && !errors.Is(err, net.ErrClosed) {
 				return 0, err
 			}
 			mu.conns = mu.conns[:i]
@@ -1272,10 +1274,16 @@ func TestHeartbeatHealthTransport(t *testing.T) {
 		return nil
 	})
 
+	// TODO(baptist): Better understand when this happens. It appears we can get
+	// spurious connections to other tests on a stress run. This has been
+	// happening for a while, but only comes out rarely when this package is
+	// stressed. This test is very aggressive since it is calling GRPCDialNode in
+	// a busy loop for 50ms.
+	expected := "doesn't match server cluster ID"
 	// Should stay unhealthy despite reconnection attempts.
 	for then := timeutil.Now(); timeutil.Since(then) < 50*clientCtx.Config.RPCHeartbeatTimeout; {
 		err := clientCtx.TestingConnHealth(remoteAddr, serverNodeID)
-		if !isUnhealthy(err) {
+		if !isUnhealthy(err) && !testutils.IsError(err, expected) {
 			t.Fatal(err)
 		}
 	}

From 3b94693a9451690eede167bc8f0bd5e43ba015ef Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Tue, 4 Apr 2023 13:48:32 +0000
Subject: [PATCH 2/5] allocator: deflake full disk test

In #97409 we introduced cluster settings to control the disk fullness
threshold for rebalancing towards a store and shedding replicas off of
the store. The `TestAllocatorFullDisks` test assumes the total number of
range bytes is equal to or less than the rebalance threshold of the
nodes; however, the test was updated to use the shed threshold instead.
This caused the test to flake occasionally, as there was more than the
expected amount of total range bytes.
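To make the arithmetic concrete, here is a standalone sketch using the
example numbers from the comment added in the diff below (the 0.95 shed
default is an assumption; only the 0.925 rebalance-to threshold appears
in this patch):

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        const capacity, rangeSize = 1024.0, 16.0 // mb, as in the test comment
        const shed, rebalanceTo = 0.95, 0.925    // shed default assumed here
        fmt.Println(int(math.Floor(capacity * shed / rangeSize)))        // 60
        fmt.Println(int(math.Floor(capacity * rebalanceTo / rangeSize))) // 59
    }

Sizing against the shed threshold admits one extra range per node, which
is enough to push a store past the rebalance threshold and flake the test.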
This patch changes the ranges per node calculation to use the rebalance
threshold again, instead of the shed threshold.

Fixes: #100033

Release note: None
---
 .../kvserver/allocator/allocatorimpl/allocator_test.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
index fc8bac15aa1e..c4c43afcd5d7 100644
--- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
+++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -8475,7 +8475,14 @@ func TestAllocatorFullDisks(t *testing.T) {
 
 	do := makeDiskCapacityOptions(&st.SV)
 
-	rangesPerNode := int(math.Floor(capacity * do.ShedAndBlockAllThreshold / rangeSize))
+	// Each range is equally sized (16mb). We want the number of ranges per node,
+	// when their sizes are added, to be no greater than the full disk rebalance
+	// threshold (0.925), e.g. for the values below:
+	// capacity = 1024mb
+	// rangeSize = 16mb
+	// threshold = 0.925
+	// rangesPerNode = ⌊1024mb * 0.925 / 16mb⌋ = 59
+	rangesPerNode := int(math.Floor(capacity * do.RebalanceToThreshold / rangeSize))
 	rangesToAdd := rangesPerNode * nodes
 
 	// Initialize testStores.

From ad6b4303d65934dc24990106aa80ad1983148d94 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Thu, 30 Mar 2023 14:40:09 -0400
Subject: [PATCH 3/5] kvcoord: Restart ranges on a dedicated goroutine.

Restart ranges on a dedicated goroutine (if needed).
Fix logic bug in stuck range handling.
Increase verbosity of logging to help debug mux rangefeed issues.

Informs #99560
Informs #99640
Informs #99214
Informs #98925
Informs #99092
Informs #99212
Informs #99910
Informs #100468

Release note: None
---
 .../kvcoord/dist_sender_mux_rangefeed.go      | 97 +++++++++++--------
 1 file changed, 56 insertions(+), 41 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
index 7d8778618e61..3ddc2e98d072 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
@@ -72,7 +72,8 @@ func muxRangeFeed(
 	catchupSem *limit.ConcurrentRequestLimiter,
 	eventCh chan<- RangeFeedMessage,
 ) (retErr error) {
-	if log.V(1) {
+	// TODO(yevgeniy): Undo log.V(0) and replace with log.V(1) once stuck rangefeed issue resolved.
+	if log.V(0) {
 		log.Infof(ctx, "Establishing MuxRangeFeed (%s...; %d spans)", spans[0], len(spans))
 		start := timeutil.Now()
 		defer func() {
@@ -198,7 +199,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed(
 			var err error
 			ri, err := m.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
 			if err != nil {
-				log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
+				log.VErrEventf(ctx, 0, "range descriptor re-lookup failed: %s", err)
 				if !rangecache.IsRangeLookupErrorRetryable(err) {
 					return err
 				}
@@ -208,11 +209,11 @@
 		}
 
 		// Establish a RangeFeed for a single Range.
-		log.VEventf(ctx, 1, "MuxRangeFeed starting for range %s@%s (rangeID %d)",
-			span, startAfter, token.Desc().RangeID)
+		log.VEventf(ctx, 0, "MuxRangeFeed starting for range %s@%s (rangeID %d, attempt %d)",
+			span, startAfter, token.Desc().RangeID, r.CurrentAttempt())
 		transport, err := newTransportForRange(ctx, token.Desc(), m.ds)
 		if err != nil {
-			log.VErrEventf(ctx, 1, "Failed to create transport for %s ", token.String())
+			log.VErrEventf(ctx, 0, "Failed to create transport for %s (err=%s) ", token.String(), err)
 			continue
 		}
 		releaseTransport = transport.Release
@@ -224,7 +225,7 @@
 
 		rpcClient, err := transport.NextInternalClient(ctx)
 		if err != nil {
-			log.VErrEventf(ctx, 1, "RPC error connecting to replica %s: %s", args.Replica, err)
+			log.VErrEventf(ctx, 0, "RPC error connecting to replica %s: %s", args.Replica, err)
 			continue
 		}
 
@@ -234,7 +235,7 @@
 		}
 
 		if err := conn.startRangeFeed(streamID, stream, &args); err != nil {
-			log.VErrEventf(ctx, 1,
+			log.VErrEventf(ctx, 0,
 				"RPC error establishing mux rangefeed to replica %s: %s", args.Replica, err)
 			continue
 		}
@@ -292,7 +293,7 @@
 	ctx, restore := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
 	defer restore()
 
-	if log.V(1) {
+	if log.V(0) {
 		log.Infof(ctx, "Establishing MuxRangeFeed to node %d", nodeID)
 		start := timeutil.Now()
 		defer func() {
@@ -331,9 +332,17 @@
 			recvErr = nil
 		}
 
-		return ms.closeWithRestart(ctx, recvErr, func(a *activeMuxRangeFeed) error {
-			return m.restartActiveRangeFeed(ctx, a, recvErr)
-		})
+		// make sure that the underlying error is not fatal. If it is, there is no
+		// reason to restart each rangefeed, so just bail out.
+		if _, err := handleRangefeedError(ctx, recvErr); err != nil {
+			return err
+		}
+
+		toRestart := ms.close()
+		if log.V(0) {
+			log.Infof(ctx, "mux to node %d restarted %d streams", ms.nodeID, len(toRestart))
+		}
+		return m.restartActiveRangeFeeds(ctx, recvErr, toRestart)
 	}
 
 	return nil
@@ -374,7 +383,7 @@
 		// which one executes is a coin flip) and so it is possible that we may see
 		// additional event(s) arriving for a stream that is no longer active.
 		if active == nil {
-			if log.V(1) {
+			if log.V(0) {
 				log.Infof(ctx, "received stray event stream %d: %v", event.StreamID, event)
 			}
 			continue
@@ -401,9 +410,12 @@
 			m.ds.metrics.RangefeedErrorCatchup.Inc(1)
 		}
 		ms.deleteStream(event.StreamID)
-		if err := m.restartActiveRangeFeed(ctx, active, t.Error.GoError()); err != nil {
-			return err
-		}
+		// Restart rangefeed on another goroutine. Restart might be a bit
+		// expensive, particularly if we have to resolve span. We do not want
+		// to block receiveEventsFromNode for too long.
+		m.g.GoCtx(func(ctx context.Context) error {
+			return m.restartActiveRangeFeed(ctx, active, t.Error.GoError())
+		})
 		continue
 	}
 
@@ -420,12 +432,16 @@
 		// NB: this does not notify the server in any way. We may have to add
 		// a more complex protocol -- or better yet, figure out why ranges
 		// get stuck in the first place.
-		if timeutil.Now().Before(nextStuckCheck) {
+		if timeutil.Now().After(nextStuckCheck) {
 			if threshold := stuckThreshold(); threshold > 0 {
-				for _, a := range ms.purgeStuckStreams(threshold) {
-					if err := m.restartActiveRangeFeed(ctx, a, errRestartStuckRange); err != nil {
-						return err
-					}
+				// Restart rangefeed on another goroutine. Restart might be a bit
+				// expensive, particularly if we have to resolve span. We do not want
+				// to block receiveEventsFromNode for too long.
+				toRestart := ms.purgeStuckStreams(threshold)
+				if len(toRestart) > 0 {
+					m.g.GoCtx(func(ctx context.Context) error {
+						return m.restartActiveRangeFeeds(ctx, errRestartStuckRange, toRestart)
+					})
 				}
 			}
 			nextStuckCheck = timeutil.Now().Add(stuckCheckFreq())
 		}
 	}
 }
 
+// restartActiveRangeFeeds restarts one or more rangefeeds.
+func (m *rangefeedMuxer) restartActiveRangeFeeds(
+	ctx context.Context, reason error, toRestart []*activeMuxRangeFeed,
+) error {
+	for _, active := range toRestart {
+		if err := m.restartActiveRangeFeed(ctx, active, reason); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // restartActiveRangeFeed restarts rangefeed after it encountered "reason" error.
 func (m *rangefeedMuxer) restartActiveRangeFeed(
 	ctx context.Context, active *activeMuxRangeFeed, reason error,
 ) error {
 	m.ds.metrics.RangefeedRestartRanges.Inc(1)
-	if log.V(1) {
+	if log.V(0) {
 		log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
 			active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
 	}
@@ -521,29 +549,16 @@ func (c *muxStream) deleteStream(streamID int64) {
 	c.mu.Unlock()
 }
 
-func (c *muxStream) closeWithRestart(
-	ctx context.Context, reason error, restartFn func(a *activeMuxRangeFeed) error,
-) error {
+// close closes the mux stream, returning the list of active range feeds.
+func (c *muxStream) close() []*activeMuxRangeFeed {
 	c.mu.Lock()
 	c.mu.closed = true
-	toRestart := c.mu.streams
+	toRestart := make([]*activeMuxRangeFeed, 0, len(c.mu.streams))
+	for _, a := range c.mu.streams {
+		toRestart = append(toRestart, a)
+	}
 	c.mu.streams = nil
 	c.mu.Unlock()
 
-	// make sure that the underlying error is not fatal. If it is, there is no
-	// reason to restart each rangefeed, so just bail out.
-	if _, err := handleRangefeedError(ctx, reason); err != nil {
-		return err
-	}
-
-	for _, a := range toRestart {
-		if err := restartFn(a); err != nil {
-			return err
-		}
-	}
-
-	if log.V(1) {
-		log.Infof(ctx, "mux to node %d restarted %d streams", c.nodeID, len(toRestart))
-	}
-	return nil
+	return toRestart
 }

From 6c4d605e4cc38350e31983c8dead6c6390fcf406 Mon Sep 17 00:00:00 2001
From: Stan Rosenberg
Date: Tue, 4 Apr 2023 13:26:55 -0400
Subject: [PATCH 4/5] roachtest: set config.Quiet to true

After refactoring in [1], the default of config.Quiet was set to false
since the roachprod CLI option is intended to set it to true. This
resulted in an unwanted side effect, namely roachtests running with the
new default. Consequently, test_runner's log ended up with a bunch of
(terminal) escape codes due to the (status) spinner. This change ensures
roachtest explicitly sets config.Quiet to true.

[1] https://github.com/cockroachdb/cockroach/pull/99133
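As an illustration only (this is not roachprod code, and the message text
is invented), the difference between the spinner-style output that was
polluting test_runner's log and the plain output that config.Quiet
restores:

    package main

    import (
        "fmt"
        "os"
    )

    func main() {
        // Spinner-style status update: carriage return plus an ANSI erase-line
        // sequence, which shows up as literal escape bytes in a captured log.
        fmt.Fprint(os.Stdout, "\r\x1b[2Kcreating cluster... 3/4 nodes")
        // Quiet-style status update: a plain, append-only log line.
        fmt.Fprintln(os.Stdout, "creating cluster: 3/4 nodes ready")
    }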
Epic: none

Release note: None
---
 pkg/cmd/roachtest/main.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pkg/cmd/roachtest/main.go b/pkg/cmd/roachtest/main.go
index 1d270989fa49..20b47966081f 100644
--- a/pkg/cmd/roachtest/main.go
+++ b/pkg/cmd/roachtest/main.go
@@ -353,6 +353,8 @@ runner itself.
 		fmt.Fprintf(os.Stderr, "unable to lookup current user: %s\n", err)
 		os.Exit(1)
 	}
+	// Disable spinners and other fancy status messages since all IO is non-interactive.
+	config.Quiet = true
 
 	if err := roachprod.InitDirs(); err != nil {
 		fmt.Fprintf(os.Stderr, "%s\n", err)

From 097fe1b86c0d745c5a47b449b5efb1744d7e8021 Mon Sep 17 00:00:00 2001
From: j82w
Date: Mon, 3 Apr 2023 17:58:11 -0400
Subject: [PATCH 5/5] sql: fix flaky TestSQLStatsCompactor

The test failure shows more total wide scans than expected. Change the
compact stats job to run once a year to avoid it running at the same
time as the test. The interceptor is now disabled right after the
delete, reducing the possibility of another operation causing a
conflict.

Epic: none
closes: #99653

Release note: none
---
 .../persistedsqlstats/compaction_test.go      | 52 +++++++++----------
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
index 5c0d30a024db..2021952cf073 100644
--- a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
+++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
@@ -143,6 +143,9 @@ func TestSQLStatsCompactor(t *testing.T) {
 
 	// Disable automatic flush since the test will handle the flush manually.
 	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.flush.interval = '24h'")
+	// Change the automatic compaction job to avoid it running during the test.
+	// The test creates a new compactor and calls it directly.
+	sqlConn.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = '@yearly';")
 
 	for _, tc := range testCases {
 		t.Run(fmt.Sprintf("stmtCount=%d/maxPersistedRowLimit=%d/rowsDeletePerTxn=%d",
@@ -206,32 +209,29 @@ func TestSQLStatsCompactor(t *testing.T) {
 				require.Equal(t, tc.maxPersistedRowLimit, len(expectedDeletedStmtFingerprints))
 				require.Equal(t, tc.maxPersistedRowLimit, len(expectedDeletedTxnFingerprints))
 
-				run := func() {
-					// The two interceptors (kvInterceptor and cleanupInterceptor) are
-					// injected into kvserver and StatsCompactor respectively.
-					// The cleanupInterceptor calculates the number of expected "wide scan"
-					// that should be issued by the StatsCompactor.
-					// The kvInterceptor counts the number of actual "wide scan" KV Request
-					// issued.
-					kvInterceptor.reset()
-					cleanupInterceptor.reset()
-					kvInterceptor.enable()
-					defer kvInterceptor.disable()
-
-					err := statsCompactor.DeleteOldestEntries(ctx)
-					require.NoError(t, err)
-
-					expectedNumberOfWideScans := cleanupInterceptor.getExpectedNumberOfWideScans()
-					actualNumberOfWideScans := kvInterceptor.getTotalWideScans()
-
-					require.Equal(t,
-						expectedNumberOfWideScans,
-						actualNumberOfWideScans,
-						"expected %d number of wide scans issued, but %d number of "+
-							"wide scan issued", expectedNumberOfWideScans, actualNumberOfWideScans,
-					)
-				}
-				run()
+				// The two interceptors (kvInterceptor and cleanupInterceptor) are
+				// injected into kvserver and StatsCompactor respectively.
+				// The cleanupInterceptor calculates the number of expected "wide scan"
+				// that should be issued by the StatsCompactor.
+				// The kvInterceptor counts the number of actual "wide scan" KV Request
+				// issued.
+				kvInterceptor.reset()
+				cleanupInterceptor.reset()
+				kvInterceptor.enable()
+
+				err = statsCompactor.DeleteOldestEntries(ctx)
+				kvInterceptor.disable()
+				require.NoError(t, err)
+
+				expectedNumberOfWideScans := cleanupInterceptor.getExpectedNumberOfWideScans()
+				actualNumberOfWideScans := kvInterceptor.getTotalWideScans()
+
+				require.Equal(t,
+					expectedNumberOfWideScans,
+					actualNumberOfWideScans,
+					"expected %d number of wide scans issued, but %d number of "+
+						"wide scan issued", expectedNumberOfWideScans, actualNumberOfWideScans,
+				)
 
 				actualStmtFingerprints, actualTxnFingerprints := getTopSortedFingerprints(t, sqlConn, 0 /* limit */)