Skip to content

Commit

Permalink
Merge #59059
Browse files Browse the repository at this point in the history
59059: testcluster: Introduce a HybridManualClock and wire it into TestCluster r=lunevalex a=lunevalex

Makes progress on #8299

This commit introduces a new type of ManualClock a HybridManualClock.
and wires it into the TestCluster.  This clock follows the physical
wall time of a regular clock, but allows the developer to move it
forward. This is needed to be able to test functionality around
lease expiration and other time based mechanisms.

To verify that the clock is usefull in tests, a single test in
client_merge_tests.go is converted to use TestCluster with a
HybridManualClock. To make the test possible we also need a
simple way to create a range with an expiration based lease. This
is done through the new TestCluster.ScratchRangeWithExpirationLease
function.

Release note: None

Co-authored-by: Alex Lunev <[email protected]>
  • Loading branch information
craig[bot] and lunevalex committed Jan 21, 2021
2 parents c54d450 + 634286a commit 3b6a17d
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 116 deletions.
170 changes: 67 additions & 103 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -243,14 +244,11 @@ func TestStoreRangeMergeWithData(t *testing.T) {

func mergeWithData(t *testing.T, retries int64) {
ctx := context.Background()
storeCfg := kvserver.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.Clock = nil // manual clock

manualClock := hlc.NewHybridManualClock()
var store *kvserver.Store
// Maybe inject some retryable errors when the merge transaction commits.
var mtc *multiTestContext
storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
if et := req.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil {
if atomic.AddInt64(&retries, -1) >= 0 {
Expand All @@ -263,148 +261,114 @@ func mergeWithData(t *testing.T, retries int64) {
// Subsume can execute. This triggers an unusual code path where the
// lease acquisition, not Subsume, notices the merge and installs a
// mergeComplete channel on the replica.
mtc.advanceClock(ctx)
manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
}
}
return nil
}

mtc = &multiTestContext{
storeConfig: &storeCfg,
// This test was written before the multiTestContext started creating many
// system ranges at startup, and hasn't been update to take that into
// account.
startWithSingleRange: true,
}
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingRequestFilter: testingRequestFilter,
},
Server: &server.TestingKnobs{
ClockSource: manualClock.UnixNano,
},
},
})
s := serv.(*server.TestServer)
defer s.Stopper().Stop(ctx)
store, err := s.Stores().GetStore(s.GetFirstStoreID())
require.NoError(t, err)

var store1, store2 *kvserver.Store
mtc.Start(t, 1)
store1, store2 = mtc.stores[0], mtc.stores[0]
defer mtc.Stop()
scratchKey, err := s.ScratchRangeWithExpirationLease()
repl := store.LookupReplica(roachpb.RKey(scratchKey))
require.NoError(t, err)

lhsDesc, rhsDesc, pErr := createSplitRanges(ctx, store1)
if pErr != nil {
t.Fatal(pErr)
}
lhsDesc, rhsDesc, pErr := s.SplitRange(scratchKey.Next().Next())
require.NoError(t, pErr)

content := []byte("testing!")

// Write some values left and right of the proposed split key.
pArgs := putArgs(roachpb.Key("aaa"), content)
if _, pErr := kv.SendWrapped(ctx, store1.TestSender(), pArgs); pErr != nil {
t.Fatal(pErr)
}
pArgs = putArgs(roachpb.Key("ccc"), content)
if _, pErr := kv.SendWrappedWith(ctx, store2.TestSender(), roachpb.Header{
RangeID: rhsDesc.RangeID,
}, pArgs); pErr != nil {
t.Fatal(pErr)
}

// Confirm the values are there.
gArgs := getArgs(roachpb.Key("aaa"))
if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
t.Fatal(pErr)
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, content) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
put := func(key roachpb.Key, rangeID roachpb.RangeID, value []byte) {
pArgs := putArgs(key, value)
if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{
RangeID: rangeID,
}, pArgs); pErr != nil {
t.Fatal(pErr)
}
}
gArgs = getArgs(roachpb.Key("ccc"))
if reply, pErr := kv.SendWrappedWith(ctx, store2.TestSender(), roachpb.Header{
RangeID: rhsDesc.RangeID,
}, gArgs); pErr != nil {
t.Fatal(pErr)
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, content) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)

verify := func(key roachpb.Key, rangeID roachpb.RangeID, value []byte) {
// Confirm the values are there.
gArgs := getArgs(key)
if reply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{
RangeID: rangeID,
}, gArgs); pErr != nil {
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, value) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
}
}

put(lhsDesc.StartKey.Next().AsRawKey(), lhsDesc.RangeID, content)
put(rhsDesc.StartKey.Next().AsRawKey(), rhsDesc.RangeID, content)

verify(lhsDesc.StartKey.Next().AsRawKey(), lhsDesc.RangeID, content)
verify(rhsDesc.StartKey.Next().AsRawKey(), rhsDesc.RangeID, content)

// Merge the b range back into the a range.
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
if _, pErr := kv.SendWrapped(ctx, store1.TestSender(), args); pErr != nil {
if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil {
t.Fatal(pErr)
}

// Verify no intents remains on range descriptor keys.
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(lhsDesc.StartKey), keys.RangeDescriptorKey(rhsDesc.StartKey)} {
if _, _, err := storage.MVCCGet(
ctx, store1.Engine(), key, store1.Clock().Now(), storage.MVCCGetOptions{},
ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
}
}

// Verify the merge by looking up keys from both ranges.
lhsRepl := store1.LookupReplica(roachpb.RKey("a"))
rhsRepl := store1.LookupReplica(roachpb.RKey("c"))
lhsRepl := store.LookupReplica(lhsDesc.StartKey.Next())
rhsRepl := store.LookupReplica(rhsDesc.StartKey.Next())

if lhsRepl != rhsRepl {
t.Fatalf("ranges were not merged %+v=%+v", lhsRepl.Desc(), rhsRepl.Desc())
}
if startKey := lhsRepl.Desc().StartKey; !bytes.Equal(startKey, roachpb.RKeyMin) {
if startKey := lhsRepl.Desc().StartKey; !bytes.Equal(startKey, repl.Desc().StartKey) {
t.Fatalf("The start key is not equal to KeyMin %q=%q", startKey, roachpb.RKeyMin)
}
if endKey := rhsRepl.Desc().EndKey; !bytes.Equal(endKey, roachpb.RKeyMax) {
if endKey := rhsRepl.Desc().EndKey; !bytes.Equal(endKey, repl.Desc().EndKey) {
t.Fatalf("The end key is not equal to KeyMax %q=%q", endKey, roachpb.RKeyMax)
}

// Try to get values from after the merge.
gArgs = getArgs(roachpb.Key("aaa"))
if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
t.Fatal(pErr)
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, content) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
}
gArgs = getArgs(roachpb.Key("ccc"))
if reply, pErr := kv.SendWrappedWith(ctx, store1.TestSender(), roachpb.Header{
RangeID: rhsRepl.RangeID,
}, gArgs); pErr != nil {
t.Fatal(pErr)
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, content) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
}
verify(lhsDesc.StartKey.Next().AsRawKey(), lhsRepl.RangeID, content)
verify(rhsDesc.StartKey.Next().AsRawKey(), rhsRepl.RangeID, content)

newContent := []byte("testing!better!")
// Put new values after the merge on both sides.
pArgs = putArgs(roachpb.Key("aaaa"), content)
if _, pErr := kv.SendWrapped(ctx, store1.TestSender(), pArgs); pErr != nil {
t.Fatal(pErr)
}
pArgs = putArgs(roachpb.Key("cccc"), content)
if _, pErr := kv.SendWrappedWith(ctx, store1.TestSender(), roachpb.Header{
RangeID: rhsRepl.RangeID,
}, pArgs); pErr != nil {
t.Fatal(pErr)
}
put(lhsDesc.StartKey.Next().AsRawKey(), lhsRepl.RangeID, newContent)
put(rhsDesc.StartKey.Next().AsRawKey(), rhsRepl.RangeID, newContent)

// Try to get the newly placed values.
gArgs = getArgs(roachpb.Key("aaaa"))
if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
t.Fatal(pErr)
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, content) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
}
gArgs = getArgs(roachpb.Key("cccc"))
if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
t.Fatal(pErr)
} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
t.Fatal(err)
} else if !bytes.Equal(replyBytes, content) {
t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
}
verify(lhsDesc.StartKey.Next().AsRawKey(), lhsRepl.RangeID, newContent)
verify(rhsDesc.StartKey.Next().AsRawKey(), rhsRepl.RangeID, newContent)

gArgs = getArgs(roachpb.Key("cccc"))
if _, pErr := kv.SendWrappedWith(ctx, store2, roachpb.Header{
gArgs := getArgs(lhsDesc.StartKey.Next().AsRawKey())
if _, pErr := kv.SendWrappedWith(ctx, store, roachpb.Header{
RangeID: rhsDesc.RangeID,
}, gArgs); !testutils.IsPError(
pErr, `r2 was not found`,
pErr, `was not found on s`,
) {
t.Fatalf("expected get on rhs to fail after merge, but got err=%v", pErr)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,11 @@ func (sc *StoreConfig) SetDefaults() {
}
}

// GetStoreConfig exposes the config used for this store.
func (s *Store) GetStoreConfig() *StoreConfig {
return &s.cfg
}

// LeaseExpiration returns an int64 to increment a manual clock with to
// make sure that all active range leases expire.
func (sc *StoreConfig) LeaseExpiration() int64 {
Expand Down Expand Up @@ -1656,7 +1661,6 @@ func (s *Store) startGossip() {
_, pErr := repl.getLeaseForGossip(ctx)
return pErr.GoError()
}

gossipFns := []struct {
key roachpb.Key
fn func(context.Context, *Replica) error
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
return nil, errors.Wrap(err, "instantiating clock source")
}
clock = hlc.NewClock(clockSrc.UnixNano, time.Duration(cfg.MaxOffset))
} else if cfg.TestingKnobs.Server != nil &&
cfg.TestingKnobs.Server.(*TestingKnobs).ClockSource != nil {
clock = hlc.NewClock(cfg.TestingKnobs.Server.(*TestingKnobs).ClockSource,
time.Duration(cfg.MaxOffset))
} else {
clock = hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type TestingKnobs struct {
// StickyEngineRegistry manages the lifecycle of sticky in memory engines,
// which can be enabled via base.StoreSpec.StickyInMemoryEngineID.
StickyEngineRegistry StickyInMemEnginesRegistry
// Clock Source used to an inject a custom clock for testing the server. It is
// typically either an hlc.HybridManualClock or hlc.ManualClock.
ClockSource func() int64
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
16 changes: 13 additions & 3 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package server

import (
"bytes"
"context"
"fmt"
"net/http"
Expand Down Expand Up @@ -1338,9 +1339,6 @@ func (ts *TestServer) ForceTableGC(

// ScratchRangeEx splits off a range suitable to be used as KV scratch space.
// (it doesn't overlap system spans or SQL tables).
//
// Calling this multiple times is undefined (but see TestCluster.ScratchRange()
// which is idempotent).
func (ts *TestServer) ScratchRangeEx() (roachpb.RangeDescriptor, error) {
scratchKey := keys.TableDataMax
_, rngDesc, err := ts.SplitRange(scratchKey)
Expand All @@ -1360,6 +1358,18 @@ func (ts *TestServer) ScratchRange() (roachpb.Key, error) {
return desc.StartKey.AsRawKey(), nil
}

// ScratchRangeWithExpirationLease is like ScratchRange but creates a range with
// an expiration based lease.
func (ts *TestServer) ScratchRangeWithExpirationLease() (roachpb.Key, error) {
scratchKey := roachpb.Key(bytes.Join([][]byte{keys.SystemPrefix,
roachpb.RKey("\x00aaa-testing")}, nil))
_, _, err := ts.SplitRange(scratchKey)
if err != nil {
return nil, err
}
return scratchKey, nil
}

// MetricsRecorder periodically records node-level and store-level metrics.
func (ts *TestServer) MetricsRecorder() *status.MetricsRecorder {
return ts.node.recorder
Expand Down
24 changes: 15 additions & 9 deletions pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ import (
// analogous to TestServer, but with control over range replication and join
// flags.
type TestCluster struct {
Servers []*server.TestServer
Conns []*gosql.DB
stopper *stop.Stopper
scratchRangeKey roachpb.Key
mu struct {
Servers []*server.TestServer
Conns []*gosql.DB
stopper *stop.Stopper
mu struct {
syncutil.Mutex
serverStoppers []*stop.Stopper
}
Expand Down Expand Up @@ -856,14 +855,21 @@ func (tc *TestCluster) FindRangeLeaseHolder(
// kv scratch space (it doesn't overlap system spans or SQL tables). The range
// is lazily split off on the first call to ScratchRange.
func (tc *TestCluster) ScratchRange(t testing.TB) roachpb.Key {
if tc.scratchRangeKey != nil {
return tc.scratchRangeKey
}
scratchKey, err := tc.Servers[0].ScratchRange()
if err != nil {
t.Fatal(err)
}
tc.scratchRangeKey = scratchKey
return scratchKey
}

// ScratchRangeWithExpirationLease returns the start key of a span of keyspace
// suitable for use as kv scratch space and that has an expiration based lease.
// The range is lazily split off on the first call to ScratchRangeWithExpirationLease.
func (tc *TestCluster) ScratchRangeWithExpirationLease(t testing.TB) roachpb.Key {
scratchKey, err := tc.Servers[0].ScratchRangeWithExpirationLease()
if err != nil {
t.Fatal(err)
}
return scratchKey
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/testutils/testcluster/testcluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,25 @@ func TestRestart(t *testing.T) {
// Verify we can still read data.
tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9})
}

func TestExpirationBasedLeases(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := StartTestCluster(t, 1,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

key := tc.ScratchRangeWithExpirationLease(t)
repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(key))
lease, _ := repl.GetLease()
require.NotNil(t, lease.Expiration)

// Verify idempotence of ScratchRangeWithExpirationLease
keyAgain := tc.ScratchRangeWithExpirationLease(t)
replAgain := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(keyAgain))
require.Equal(t, repl, replAgain)
}
1 change: 1 addition & 0 deletions pkg/util/hlc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_test(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

Expand Down
Loading

0 comments on commit 3b6a17d

Please sign in to comment.