Skip to content

Commit

Permalink
kvserver: replace multiTestContext with TestCluster/TestServer in cli…
Browse files Browse the repository at this point in the history
…ent split/merge/status tests

Makes progress on #8299

multiTestContext is a legacy construct that is deprecated in favor of running
tests via TestCluster. This is one PR out of many to remove the usage of
multiTestContext in client_status_test, client_merge_test, client_split_test.

To support these changes the following enhancements are introduced
- Allow a test to provide a ProtectedTimestampCache to the TestServer/TestCluster
- Refactor TestServer.ScratchRangeWithExpirationLease to expose the descriptors
  for the split.
- Add a convinience method to TestCluster to manually hearbeat node liveliness.

Release note: None
  • Loading branch information
lunevalex committed Jan 29, 2021
1 parent 03e1493 commit 78078b8
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 412 deletions.
516 changes: 292 additions & 224 deletions pkg/kv/kvserver/client_merge_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3432,7 +3432,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {

// 2. Wait for all nodes to process the increment (and therefore the
// new lease).
tc.WaitForValues(t, key, []int64{value, value, value})
tc.WaitForValues(t, key, []int64{0, value, value, value})

// 3. Wait for the lease holder to obtain raft leadership too.
testutils.SucceedsSoon(t, func() error {
Expand Down
265 changes: 162 additions & 103 deletions pkg/kv/kvserver/client_split_test.go

Large diffs are not rendered by default.

31 changes: 15 additions & 16 deletions pkg/kv/kvserver/client_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,37 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestComputeStatsForKeySpan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
mtc := &multiTestContext{}
defer mtc.Stop()
mtc.Start(t, 3)

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)
// Create a number of ranges using splits.
splitKeys := []string{"a", "c", "e", "g", "i"}
for _, k := range splitKeys {
key := roachpb.Key(k)
repl := mtc.stores[0].LookupReplica(roachpb.RKey(key))
args := adminSplitArgs(key)
header := roachpb.Header{
RangeID: repl.RangeID,
}
if _, err := kv.SendWrappedWith(context.Background(), mtc.stores[0], header, args); err != nil {
t.Fatal(err)
}
_, _, err := s.SplitRange(roachpb.Key(k))
require.NoError(t, err)
}

// Wait for splits to finish.
testutils.SucceedsSoon(t, func() error {
repl := mtc.stores[0].LookupReplica(roachpb.RKey("z"))
repl := store.LookupReplica(roachpb.RKey("z"))
if actualRSpan := repl.Desc().RSpan(); !actualRSpan.Key.Equal(roachpb.RKey("i")) {
return errors.Errorf("expected range %s to begin at key 'i'", repl)
}
Expand All @@ -55,7 +54,7 @@ func TestComputeStatsForKeySpan(t *testing.T) {
// Create some keys across the ranges.
incKeys := []string{"b", "bb", "bbb", "d", "dd", "h"}
for _, k := range incKeys {
if _, err := mtc.dbs[0].Inc(context.Background(), []byte(k), 5); err != nil {
if _, err := store.DB().Inc(context.Background(), []byte(k), 5); err != nil {
t.Fatal(err)
}
}
Expand All @@ -73,7 +72,7 @@ func TestComputeStatsForKeySpan(t *testing.T) {
{"e", "i", 2, 1},
} {
start, end := tcase.startKey, tcase.endKey
result, err := mtc.stores[0].ComputeStatsForKeySpan(
result, err := store.ComputeStatsForKeySpan(
roachpb.RKey(start), roachpb.RKey(end))
if err != nil {
t.Fatal(err)
Expand Down
48 changes: 0 additions & 48 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,54 +498,6 @@ func (m *multiTestContext) Stop() {
}
}

// gossipStores forces each store to gossip its store descriptor and then
// blocks until all nodes have received these updated descriptors.
func (m *multiTestContext) gossipStores() {
timestamps := make(map[string]int64)
for i := 0; i < len(m.stores); i++ {
<-m.gossips[i].Connected
if err := m.stores[i].GossipStore(context.Background(), false /* useCached */); err != nil {
m.t.Fatal(err)
}
infoStatus := m.gossips[i].GetInfoStatus()
storeKey := gossip.MakeStoreKey(m.stores[i].Ident.StoreID)
timestamps[storeKey] = infoStatus.Infos[storeKey].OrigStamp
}
// Wait until all stores know about each other.
testutils.SucceedsSoon(m.t, func() error {
for i := 0; i < len(m.stores); i++ {
nodeID := m.stores[i].Ident.NodeID
infoStatus := m.gossips[i].GetInfoStatus()
for storeKey, timestamp := range timestamps {
info, ok := infoStatus.Infos[storeKey]
if !ok {
return errors.Errorf("node %d does not have a storeDesc for %s yet", nodeID, storeKey)
}
if info.OrigStamp < timestamp {
return errors.Errorf("node %d's storeDesc for %s is not up to date", nodeID, storeKey)
}
}
}
return nil
})
}

// initGossipNetwork gossips all store descriptors and waits until all
// storePools have received those descriptors.
func (m *multiTestContext) initGossipNetwork() {
m.gossipStores()
testutils.SucceedsSoon(m.t, func() error {
for i := 0; i < len(m.stores); i++ {
if _, alive, _ := m.storePools[i].GetStoreList(); alive != len(m.stores) {
return errors.Errorf("node %d's store pool only has %d alive stores, expected %d",
m.stores[i].Ident.NodeID, alive, len(m.stores))
}
}
return nil
})
log.Info(context.Background(), "gossip network initialized")
}

type multiTestContextKVTransport struct {
mtc *multiTestContext
idx int
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *Store) SetSplitQueueActive(active bool) {
s.setSplitQueueActive(active)
}

// SetMergeQueueActive enables or disables the split queue.
// SetMergeQueueActive enables or disables the merge queue.
func (s *Store) SetMergeQueueActive(active bool) {
s.setMergeQueueActive(active)
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/kv/kvserver/protectedts/ptprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,13 @@ func New(cfg Config) (protectedts.Provider, error) {
}
storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor)
verifier := ptverifier.New(cfg.DB, storage)
cache := ptcache.New(ptcache.Config{
DB: cfg.DB,
Storage: storage,
Settings: cfg.Settings,
})
return &provider{
Storage: storage,
Cache: cache,
Storage: storage,
Cache: ptcache.New(ptcache.Config{
DB: cfg.DB,
Storage: storage,
Settings: cfg.Settings,
}),
Verifier: verifier,
}, nil
}
Expand All @@ -76,5 +75,8 @@ func validateConfig(cfg Config) error {
}

func (p *provider) Start(ctx context.Context, stopper *stop.Stopper) error {
return p.Cache.(*ptcache.Cache).Start(ctx, stopper)
if cache, ok := p.Cache.(*ptcache.Cache); ok {
return cache.Start(ctx, stopper)
}
return nil
}
1 change: 0 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
user security.SQLUsername) (cloud.ExternalStorage, error) {
return externalStorageBuilder.makeExternalStorageFromURI(ctx, uri, user)
}

protectedtsProvider, err := ptprovider.New(ptprovider.Config{
DB: db,
InternalExecutor: internalExecutor,
Expand Down
22 changes: 16 additions & 6 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,16 +1386,26 @@ func (ts *TestServer) ScratchRange() (roachpb.Key, error) {
return desc.StartKey.AsRawKey(), nil
}

// ScratchRangeWithExpirationLease is like ScratchRange but creates a range with
// an expiration based lease.
// ScratchRangeWithExpirationLease is like ScratchRangeWithExpirationLeaseEx but
// returns a key for the RHS ranges, instead of both descriptors from the split.
func (ts *TestServer) ScratchRangeWithExpirationLease() (roachpb.Key, error) {
scratchKey := roachpb.Key(bytes.Join([][]byte{keys.SystemPrefix,
roachpb.RKey("\x00aaa-testing")}, nil))
_, _, err := ts.SplitRange(scratchKey)
_, desc, err := ts.ScratchRangeWithExpirationLeaseEx()
if err != nil {
return nil, err
}
return scratchKey, nil
return desc.StartKey.AsRawKey(), nil
}

// ScratchRangeWithExpirationLeaseEx is like ScratchRange but creates a range with
// an expiration based lease.
func (ts *TestServer) ScratchRangeWithExpirationLeaseEx() (
roachpb.RangeDescriptor,
roachpb.RangeDescriptor,
error,
) {
scratchKey := roachpb.Key(bytes.Join([][]byte{keys.SystemPrefix,
roachpb.RKey("\x00aaa-testing")}, nil))
return ts.SplitRange(scratchKey)
}

// MetricsRecorder periodically records node-level and store-level metrics.
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/testcluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc",
Expand Down
24 changes: 20 additions & 4 deletions pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -1065,8 +1066,8 @@ func (tc *TestCluster) ToggleReplicateQueues(active bool) {
// from all configured engines, filling in zeros when the value is not
// found.
func (tc *TestCluster) readIntFromStores(key roachpb.Key) []int64 {
results := make([]int64, 0, len(tc.Servers))
for _, server := range tc.Servers {
results := make([]int64, len(tc.Servers))
for i, server := range tc.Servers {
err := server.Stores().VisitStores(func(s *kvserver.Store) error {
val, _, err := storage.MVCCGet(context.Background(), s.Engine(), key,
server.Clock().Now(), storage.MVCCGetOptions{})
Expand All @@ -1075,11 +1076,10 @@ func (tc *TestCluster) readIntFromStores(key roachpb.Key) []int64 {
} else if val == nil {
log.VEventf(context.Background(), 1, "store %d: missing key %s", s.StoreID(), key)
} else {
result, err := val.GetInt()
results[i], err = val.GetInt()
if err != nil {
log.Errorf(context.Background(), "store %d: error decoding %s from key %s: %+v", s.StoreID(), val, key, err)
}
results = append(results, result)
}
return nil
})
Expand Down Expand Up @@ -1265,6 +1265,22 @@ func (tc *TestCluster) GetStatusClient(
return serverpb.NewStatusClient(cc)
}

// HeartbeatLiveness sends a liveness heartbeat on a particular store.
func (tc *TestCluster) HeartbeatLiveness(ctx context.Context, storeIdx int) error {
nl := tc.Servers[storeIdx].NodeLiveness().(*liveness.NodeLiveness)
l, ok := nl.Self()
if !ok {
return errors.New("liveness not found")
}
var err error
for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); {
if err = nl.Heartbeat(ctx, l); !errors.Is(err, liveness.ErrEpochIncremented) {
break
}
}
return err
}

type testClusterFactoryImpl struct{}

// TestClusterFactory can be passed to serverutils.InitTestClusterFactory
Expand Down

0 comments on commit 78078b8

Please sign in to comment.