kvserver: refactor tests to avoid reading from stopped servers' engines
Epic: none
Release note: none
jbowens committed Aug 4, 2023
1 parent c5c821a commit 2f79490
Showing 8 changed files with 69 additions and 146 deletions.
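The core of the refactor: instead of stopping servers and then reading stats out of their (no longer safely accessible) engines, the tests take a read-only handle on a running store's engine and pin its state, so stats are computed against a consistent snapshot while the server stays up. A minimal sketch of that pattern, assuming the storage and testing APIs used in the diff below (store and t stand in for a test's *kvserver.Store and *testing.T):

	// Pin a consistent view of the running store's engine instead of
	// stopping the server and reopening its engine.
	ro := store.TODOEngine().NewReadOnly(storage.StandardDurability)
	defer ro.Close()
	require.NoError(t, ro.PinEngineStateForIterators())

	// Compute "real" MVCC stats against the pinned reader and compare
	// them with the store's tracked stats.
	realStats, err := store.ComputeMVCCStats(ro)
	require.NoError(t, err)
	_ = realStats // used in assertions in the actual test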
10 changes: 0 additions & 10 deletions pkg/cli/democluster/demo_cluster.go
@@ -307,16 +307,6 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
}
rpcAddrReadyChs[i] = rpcAddrReady
}

// Ensure we close all sticky stores we've created when the stopper
// instructs the entire cluster to stop. We do this only here
// because we want this closer to be registered after all the
// individual servers' Stop() methods have been registered
// via createAndAddNode() above.
c.stopper.AddCloser(stop.CloserFn(func() {
c.stickyVFSRegistry.CloseAllEngines()
}))

// Start the remaining nodes asynchronously.
for i := 1; i < c.demoCtx.NumNodes; i++ {
if err := c.startNodeAsync(ctx, i, errCh, timeoutCh); err != nil {
61 changes: 27 additions & 34 deletions pkg/kv/kvserver/client_metrics_test.go
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -50,14 +49,11 @@ func checkGauge(t *testing.T, id string, g gaugeValuer, e int64) {
}
}

// verifyStatsOnStoppedServer checks a set of stats on the specified list of servers. This method
// may produce false negatives when executed against a running server that has
// live traffic on it.
func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int) {
// verifyStatsOnServers checks a set of stats on the specified
// list of servers.
func verifyStatsOnServers(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int) {
t.Helper()
var stores []*kvserver.Store
var wg sync.WaitGroup

for _, storeIdx := range storeIdxSlice {
stores = append(stores, tc.GetFirstStoreFromServer(t, storeIdx))
}
@@ -75,21 +71,25 @@ func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, store
})
}

wg.Add(len(storeIdxSlice))
// We actually stop *all* of the Servers. Stopping only a few is riddled
// with deadlocks since operations can span nodes, but stoppers don't
// know about this - taking all of them down at the same time is the
// only sane way of guaranteeing that nothing interesting happens, at
// least when bringing down the nodes jeopardizes majorities.
for _, storeIdx := range storeIdxSlice {
go func(i int) {
defer wg.Done()
tc.StopServer(i)
}(storeIdx)
// Acquire readers into all three stores.
// TODO(jackson): This still leaves an opening for a flake; I
// believe the previous code that stopped all servers also was
// susceptible to flakes.
consistentIters := make([]storage.Reader, len(stores))
defer func() {
for i := range consistentIters {
if consistentIters[i] != nil {
consistentIters[i].Close()
}
}
}()
for i, s := range stores {
ro := s.TODOEngine().NewReadOnly(storage.StandardDurability)
require.NoError(t, ro.PinEngineStateForIterators())
consistentIters[i] = ro
}
wg.Wait()

for _, s := range stores {
for i, s := range stores {
idString := s.Ident.String()
m := s.Metrics()

@@ -100,7 +100,7 @@ func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, store
}

// Compute real total MVCC statistics from store.
realStats, err := s.ComputeMVCCStats()
realStats, err := s.ComputeMVCCStats(consistentIters[i])
if err != nil {
t.Fatal(err)
}
@@ -127,12 +127,7 @@ func verifyStatsOnStoppedServer(t *testing.T, tc *testcluster.TestCluster, store
}

if t.Failed() {
t.Fatalf("verifyStatsOnStoppedServer failed, aborting test.")
}

// Restart all Stores.
for _, storeIdx := range storeIdxSlice {
require.NoError(t, tc.RestartServer(storeIdx))
t.Fatalf("verifyStatsOnServers failed, aborting test.")
}
}

@@ -265,8 +260,6 @@ func TestStoreMetrics(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated)
defer stickyVFSRegistry.CloseAllEngines()
const numServers int = 3
stickyServerArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numServers; i++ {
@@ -284,7 +277,7 @@ },
},
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyVFSRegistry: stickyVFSRegistry,
StickyVFSRegistry: server.NewStickyVFSRegistry(),
},
Store: &kvserver.StoreTestingKnobs{
DisableRaftLogQueue: true,
@@ -322,7 +315,7 @@ func TestStoreMetrics(t *testing.T) {
require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...))

// Verify stats on store1 after replication.
verifyStatsOnStoppedServer(t, tc, 1)
verifyStatsOnServers(t, tc, 1)

// Add some data to the "right" range.
rangeKeyStart, rangeKeyEnd := key, key.Next()
@@ -340,7 +333,7 @@ func TestStoreMetrics(t *testing.T) {
// do that given all of the system table activity generated by the TestCluster.
// We use Servers[1] and Servers[2] instead, since we can control the traffic
// on those servers.
verifyStatsOnStoppedServer(t, tc, 1, 2)
verifyStatsOnServers(t, tc, 1, 2)

// Create a transaction statement that fails. Regression test for #4969.
if err := tc.GetFirstStoreFromServer(t, 0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
@@ -354,7 +347,7 @@ func TestStoreMetrics(t *testing.T) {
}

// Verify stats after addition.
verifyStatsOnStoppedServer(t, tc, 1, 2)
verifyStatsOnServers(t, tc, 1, 2)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1)
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0), tc.Target(1))
testutils.SucceedsSoon(t, func() error {
@@ -373,7 +366,7 @@ func TestStoreMetrics(t *testing.T) {
checkGauge(t, "store 1", tc.GetFirstStoreFromServer(t, 1).Metrics().ReplicaCount, 1)

// Verify all stats on all stores after range is removed.
verifyStatsOnStoppedServer(t, tc, 1, 2)
verifyStatsOnServers(t, tc, 1, 2)

verifyStorageStats(t, tc.GetFirstStoreFromServer(t, 1))
verifyStorageStats(t, tc.GetFirstStoreFromServer(t, 2))
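The commit also retires the shared, engine-reusing sticky VFS registry (server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) plus a deferred CloseAllEngines) in favor of a plain per-test registry handed to each server through the testing knobs; this is presumably also why demo_cluster.go above no longer registers a CloseAllEngines closer. A hedged sketch of the new wiring, limited to the knob visible in this diff:

	knobs := base.TestingKnobs{
		Server: &server.TestingKnobs{
			// A per-test sticky VFS registry; no cluster-wide
			// CloseAllEngines closer is registered anymore.
			StickyVFSRegistry: server.NewStickyVFSRegistry(),
		},
	}
	_ = knobs // passed to the cluster via base.TestServerArgs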
(diffs for the remaining 6 changed files are not shown)
