Skip to content

Commit

Permalink
kvserver: convert the last set of multiTestContext tests to use TestC…
Browse files Browse the repository at this point in the history
…luster

Makes progress on #8299

This change converts the last set of tests in client_metrics_test,
client_raft_test, client_raft_log_queue_test to use TestCluster instead
of multiTestContext.

Release note: None
  • Loading branch information
lunevalex committed Feb 6, 2021
1 parent 222cded commit dd8197f
Show file tree
Hide file tree
Showing 7 changed files with 651 additions and 606 deletions.
180 changes: 79 additions & 101 deletions pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package kvserver_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand All @@ -23,7 +22,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -47,23 +48,15 @@ func checkGauge(t *testing.T, id string, g gaugeValuer, e int64) {
}
}

func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) {
// verifyStats checks a set of stats on the specified list of servers. This method
// may produce false negatives when executed against a running server that has
// live traffic on it.
func verifyStats(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int) {
t.Helper()
var stores []*kvserver.Store
var wg sync.WaitGroup

mtc.mu.RLock()
numStores := len(mtc.stores)
// We need to stop the stores at the given indexes, while keeping the reference to the
// store objects. ComputeMVCCStats() still works on a stopped store (it needs
// only the engine, which is still open), and the most recent stats are still
// available on the stopped store object; however, no further information can
// be committed to the store while it is stopped, preventing any races during
// verification.
for _, storeIdx := range storeIdxSlice {
stores = append(stores, mtc.stores[storeIdx])
stores = append(stores, tc.GetFirstStoreFromServer(t, storeIdx))
}
mtc.mu.RUnlock()

// Sanity regression check for bug #4624: ensure intent count is zero.
// This may not be true immediately due to the asynchronous nature of
Expand All @@ -78,20 +71,6 @@ func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) {
})
}

wg.Add(numStores)
// We actually stop *all* of the Stores. Stopping only a few is riddled
// with deadlocks since operations can span nodes, but stoppers don't
// know about this - taking all of them down at the same time is the
// only sane way of guaranteeing that nothing interesting happens, at
// least when bringing down the nodes jeopardizes majorities.
for i := 0; i < numStores; i++ {
go func(i int) {
defer wg.Done()
mtc.stopStore(i)
}(i)
}
wg.Wait()

for _, s := range stores {
idString := s.Ident.String()
m := s.Metrics()
Expand Down Expand Up @@ -128,11 +107,6 @@ func verifyStats(t *testing.T, mtc *multiTestContext, storeIdxSlice ...int) {
if t.Failed() {
t.Fatalf("verifyStats failed, aborting test.")
}

// Restart all Stores.
for i := 0; i < numStores; i++ {
mtc.restartStore(i)
}
}

func verifyRocksDBStats(t *testing.T, s *kvserver.Store) {
Expand Down Expand Up @@ -179,13 +153,16 @@ func TestStoreResolveMetrics(t *testing.T) {
}
}

mtc := &multiTestContext{}
defer mtc.Stop()
mtc.Start(t, 1)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)

span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}
key, err := s.ScratchRange()
require.NoError(t, err)
span := roachpb.Span{Key: key, EndKey: key.Next()}

txn := roachpb.MakeTransaction("foo", span.Key, roachpb.MinUserPriority, hlc.Timestamp{WallTime: 123}, 999)

Expand All @@ -195,7 +172,7 @@ func TestStoreResolveMetrics(t *testing.T) {

var ba roachpb.BatchRequest
{
repl := mtc.stores[0].LookupReplica(keys.MustAddr(span.Key))
repl := store.LookupReplica(keys.MustAddr(span.Key))
var err error
if ba.Replica, err = repl.GetReplicaDescriptor(); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -231,17 +208,17 @@ func TestStoreResolveMetrics(t *testing.T) {
add(roachpb.ABORTED, false, resolveAbortCount)
add(roachpb.ABORTED, true, resolvePoisonCount)

if _, pErr := mtc.senders[0].Send(ctx, ba); pErr != nil {
if _, pErr := store.TestSender().Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
}

if exp, act := resolveCommitCount, mtc.stores[0].Metrics().ResolveCommitCount.Count(); act < exp || act > exp+50 {
if exp, act := resolveCommitCount, store.Metrics().ResolveCommitCount.Count(); act < exp || act > exp+50 {
t.Errorf("expected around %d intent commits, saw %d", exp, act)
}
if exp, act := resolveAbortCount, mtc.stores[0].Metrics().ResolveAbortCount.Count(); act < exp || act > exp+50 {
if exp, act := resolveAbortCount, store.Metrics().ResolveAbortCount.Count(); act < exp || act > exp+50 {
t.Errorf("expected around %d intent aborts, saw %d", exp, act)
}
if exp, act := resolvePoisonCount, mtc.stores[0].Metrics().ResolvePoisonCount.Count(); act < exp || act > exp+50 {
if exp, act := resolvePoisonCount, store.Metrics().ResolvePoisonCount.Count(); act < exp || act > exp+50 {
t.Errorf("expected arounc %d abort span poisonings, saw %d", exp, act)
}
}
Expand All @@ -250,65 +227,69 @@ func TestStoreMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

storeCfg := kvserver.TestStoreConfig(nil /* clock */)
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableReplicateQueue = true
mtc := &multiTestContext{
storeConfig: &storeCfg,
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been updated to take that into
// account.
startWithSingleRange: true,
}
defer mtc.Stop()
mtc.Start(t, 3)
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
// Specify a size to trigger the BlockCache in Pebble.
Size: base.SizeSpec{
InBytes: 1 << 20,
},
},
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableRaftLogQueue: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)

// Flush RocksDB memtables, so that RocksDB begins using block-based tables.
// Flush Pebble memtables, so that Pebble begins using block-based tables.
// This is useful, because most of the stats we track don't apply to
// memtables.
if err := mtc.stores[0].Engine().Flush(); err != nil {
t.Fatal(err)
}
if err := mtc.stores[1].Engine().Flush(); err != nil {
t.Fatal(err)
}

// Disable the raft log truncation which confuses this test.
for _, s := range mtc.stores {
s.SetRaftLogQueueActive(false)
for i := range tc.Servers {
if err := tc.GetFirstStoreFromServer(t, i).Engine().Flush(); err != nil {
t.Fatal(err)
}
}

// Perform a split, which has special metrics handling.
splitArgs := adminSplitArgs(roachpb.Key("m"))
if _, err := kv.SendWrapped(context.Background(), mtc.stores[0].TestSender(), splitArgs); err != nil {
initialCount := tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount.Value()
key := tc.ScratchRange(t)
if _, err := tc.GetFirstStoreFromServer(t, 0).DB().Inc(ctx, key, 10); err != nil {
t.Fatal(err)
}

// Verify range count is as expected
checkGauge(t, "store 0", mtc.stores[0].Metrics().ReplicaCount, 2)

// Verify all stats on store0 after split.
verifyStats(t, mtc, 0)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1)

// Replicate the "right" range to the other stores.
replica := mtc.stores[0].LookupReplica(roachpb.RKey("z"))
mtc.replicateRange(replica.RangeID, 1, 2)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 2)...))

// Verify stats on store1 after replication.
verifyStats(t, mtc, 1)
verifyStats(t, tc, 1)

// Add some data to the "right" range.
dataKey := []byte("z")
if _, err := mtc.dbs[0].Inc(context.Background(), dataKey, 5); err != nil {
dataKey := key.Next()
if _, err := tc.GetFirstStoreFromServer(t, 0).DB().Inc(ctx, dataKey, 5); err != nil {
t.Fatal(err)
}
mtc.waitForValues(roachpb.Key("z"), []int64{5, 5, 5})
tc.WaitForValues(t, dataKey, []int64{5, 5, 5})

// Verify all stats on stores after addition.
verifyStats(t, mtc, 0, 1, 2)
// We skip verifying stats on Server[0] because there is no reliable way to
// do that given all of the system table activity generated by the TestCluster.
// We use Servers[1] and Servers[2] instead, since we can control the traffic
// on those servers.
verifyStats(t, tc, 1, 2)

// Create a transaction statement that fails. Regression test for #4969.
if err := mtc.dbs[0].Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
if err := tc.GetFirstStoreFromServer(t, 0).DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
var expVal roachpb.Value
expVal.SetInt(6)
Expand All @@ -319,34 +300,31 @@ func TestStoreMetrics(t *testing.T) {
}

// Verify stats after addition.
verifyStats(t, mtc, 0, 1, 2)
checkGauge(t, "store 0", mtc.stores[0].Metrics().ReplicaCount, 2)
verifyStats(t, tc, 1, 2)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1)

// Unreplicate range from the first store.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(1))
tc.RemoveVotersOrFatal(t, key, tc.Target(0))
testutils.SucceedsSoon(t, func() error {
// This statement can fail if store 0 is not the leaseholder.
if err := mtc.transferLeaseNonFatal(context.Background(), replica.RangeID, 0, 1); err != nil {
t.Log(err)
_, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
if err == nil {
return fmt.Errorf("replica still exists on dest 0")
} else if errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)) {
return nil
}
// This statement will fail if store 0 IS the leaseholder. This can happen
// even after the previous statement.
return mtc.unreplicateRangeNonFatal(replica.RangeID, 0)
return err
})

// Wait until we're sure that store 0 has successfully processed its removal.
require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0))

mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5})
tc.WaitForValues(t, dataKey, []int64{0, 5, 5})

// Verify range count is as expected.
checkGauge(t, "store 0", mtc.stores[0].Metrics().ReplicaCount, 1)
checkGauge(t, "store 1", mtc.stores[1].Metrics().ReplicaCount, 1)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount)
checkGauge(t, "store 1", tc.GetFirstStoreFromServer(t, 1).Metrics().ReplicaCount, 1)

// Verify all stats on all stores after range is removed.
verifyStats(t, mtc, 0, 1, 2)
verifyStats(t, tc, 1, 2)

verifyRocksDBStats(t, mtc.stores[0])
verifyRocksDBStats(t, mtc.stores[1])
verifyRocksDBStats(t, tc.GetFirstStoreFromServer(t, 1))
verifyRocksDBStats(t, tc.GetFirstStoreFromServer(t, 2))
}

// TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases ensures that the metric
Expand Down
Loading

0 comments on commit dd8197f

Please sign in to comment.