From 634286a6a52d05a96b1a6522a31797b1896678cf Mon Sep 17 00:00:00 2001
From: Alex Lunev
Date: Fri, 15 Jan 2021 10:02:19 -0800
Subject: [PATCH] testcluster: Introduce a HybridManualClock and wire it into
 TestCluster

Makes progress on #8299

This commit introduces a new type of manual clock, the HybridManualClock,
and wires it into TestCluster. This clock follows the physical wall time
of a regular clock, but allows the developer to move it forward. This is
needed to test functionality around lease expiration and other time-based
mechanisms.

To verify that the clock is useful in tests, a single test in
client_merge_test.go is converted to use TestCluster with a
HybridManualClock. To make the test possible, we also need a simple way
to create a range with an expiration-based lease. This is done through
the new TestCluster.ScratchRangeWithExpirationLease function.

Release note: None
---
 pkg/kv/kvserver/client_merge_test.go          | 170 +++++++-----------
 pkg/kv/kvserver/store.go                      |   6 +-
 pkg/server/server.go                          |   4 +
 pkg/server/testing_knobs.go                   |   3 +
 pkg/server/testserver.go                      |  16 +-
 pkg/testutils/testcluster/testcluster.go      |  24 ++-
 pkg/testutils/testcluster/testcluster_test.go |  22 +++
 pkg/util/hlc/BUILD.bazel                      |   1 +
 pkg/util/hlc/hlc.go                           |  25 +++
 pkg/util/hlc/hlc_test.go                      |  19 ++
 10 files changed, 174 insertions(+), 116 deletions(-)

diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index 787e579cf00f..def8f53ea915 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -243,14 +244,11 @@ func TestStoreRangeMergeWithData(t *testing.T) {

 func mergeWithData(t *testing.T, retries int64) {
 	ctx := context.Background()
-	storeCfg := kvserver.TestStoreConfig(nil)
-	storeCfg.TestingKnobs.DisableReplicateQueue = true
-	storeCfg.TestingKnobs.DisableMergeQueue = true
-	storeCfg.Clock = nil // manual clock
+	manualClock := hlc.NewHybridManualClock()
+	var store *kvserver.Store

 	// Maybe inject some retryable errors when the merge transaction commits.
-	var mtc *multiTestContext
-	storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+	testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
 		for _, req := range ba.Requests {
 			if et := req.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil {
 				if atomic.AddInt64(&retries, -1) >= 0 {
@@ -263,148 +261,114 @@ func mergeWithData(t *testing.T, retries int64) {
 				// Subsume can execute. This triggers an unusual code path where the
 				// lease acquisition, not Subsume, notices the merge and installs a
 				// mergeComplete channel on the replica.
-				mtc.advanceClock(ctx)
+				manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
 			}
 		}
 		return nil
 	}

-	mtc = &multiTestContext{
-		storeConfig: &storeCfg,
-		// This test was written before the multiTestContext started creating many
-		// system ranges at startup, and hasn't been update to take that into
-		// account.
-		startWithSingleRange: true,
-	}
+	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Store: &kvserver.StoreTestingKnobs{
+				DisableMergeQueue:    true,
+				DisableSplitQueue:    true,
+				TestingRequestFilter: testingRequestFilter,
+			},
+			Server: &server.TestingKnobs{
+				ClockSource: manualClock.UnixNano,
+			},
+		},
+	})
+	s := serv.(*server.TestServer)
+	defer s.Stopper().Stop(ctx)
+	store, err := s.Stores().GetStore(s.GetFirstStoreID())
+	require.NoError(t, err)

-	var store1, store2 *kvserver.Store
-	mtc.Start(t, 1)
-	store1, store2 = mtc.stores[0], mtc.stores[0]
-	defer mtc.Stop()
+	scratchKey, err := s.ScratchRangeWithExpirationLease()
+	require.NoError(t, err)
+	repl := store.LookupReplica(roachpb.RKey(scratchKey))

-	lhsDesc, rhsDesc, pErr := createSplitRanges(ctx, store1)
-	if pErr != nil {
-		t.Fatal(pErr)
-	}
+	lhsDesc, rhsDesc, pErr := s.SplitRange(scratchKey.Next().Next())
+	require.NoError(t, pErr)

 	content := []byte("testing!")

 	// Write some values left and right of the proposed split key.
-	pArgs := putArgs(roachpb.Key("aaa"), content)
-	if _, pErr := kv.SendWrapped(ctx, store1.TestSender(), pArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
-	pArgs = putArgs(roachpb.Key("ccc"), content)
-	if _, pErr := kv.SendWrappedWith(ctx, store2.TestSender(), roachpb.Header{
-		RangeID: rhsDesc.RangeID,
-	}, pArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
-	// Confirm the values are there.
-	gArgs := getArgs(roachpb.Key("aaa"))
-	if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
-		t.Fatal(pErr)
-	} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
-		t.Fatal(err)
-	} else if !bytes.Equal(replyBytes, content) {
-		t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
+	put := func(key roachpb.Key, rangeID roachpb.RangeID, value []byte) {
+		pArgs := putArgs(key, value)
+		if _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{
+			RangeID: rangeID,
+		}, pArgs); pErr != nil {
+			t.Fatal(pErr)
+		}
 	}
-	gArgs = getArgs(roachpb.Key("ccc"))
-	if reply, pErr := kv.SendWrappedWith(ctx, store2.TestSender(), roachpb.Header{
-		RangeID: rhsDesc.RangeID,
-	}, gArgs); pErr != nil {
-		t.Fatal(pErr)
-	} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
-		t.Fatal(err)
-	} else if !bytes.Equal(replyBytes, content) {
-		t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
+
+	verify := func(key roachpb.Key, rangeID roachpb.RangeID, value []byte) {
+		// Confirm the values are there.
+		gArgs := getArgs(key)
+		if reply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{
+			RangeID: rangeID,
+		}, gArgs); pErr != nil {
+			t.Fatal(pErr)
+		} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
+			t.Fatal(err)
+		} else if !bytes.Equal(replyBytes, value) {
+			t.Fatalf("actual value %q did not match expected value %q", replyBytes, value)
+		}
 	}

+	put(lhsDesc.StartKey.Next().AsRawKey(), lhsDesc.RangeID, content)
+	put(rhsDesc.StartKey.Next().AsRawKey(), rhsDesc.RangeID, content)
+
+	verify(lhsDesc.StartKey.Next().AsRawKey(), lhsDesc.RangeID, content)
+	verify(rhsDesc.StartKey.Next().AsRawKey(), rhsDesc.RangeID, content)
+
-	// Merge the b range back into the a range.
+	// Merge the rhs range back into the lhs range.
 	args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
-	if _, pErr := kv.SendWrapped(ctx, store1.TestSender(), args); pErr != nil {
+	if _, pErr := kv.SendWrapped(ctx, store.TestSender(), args); pErr != nil {
 		t.Fatal(pErr)
 	}

 	// Verify no intents remains on range descriptor keys.
 	for _, key := range []roachpb.Key{keys.RangeDescriptorKey(lhsDesc.StartKey), keys.RangeDescriptorKey(rhsDesc.StartKey)} {
 		if _, _, err := storage.MVCCGet(
-			ctx, store1.Engine(), key, store1.Clock().Now(), storage.MVCCGetOptions{},
+			ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
 		); err != nil {
 			t.Fatal(err)
 		}
 	}

 	// Verify the merge by looking up keys from both ranges.
-	lhsRepl := store1.LookupReplica(roachpb.RKey("a"))
-	rhsRepl := store1.LookupReplica(roachpb.RKey("c"))
+	lhsRepl := store.LookupReplica(lhsDesc.StartKey.Next())
+	rhsRepl := store.LookupReplica(rhsDesc.StartKey.Next())

 	if lhsRepl != rhsRepl {
 		t.Fatalf("ranges were not merged %+v=%+v", lhsRepl.Desc(), rhsRepl.Desc())
 	}
-	if startKey := lhsRepl.Desc().StartKey; !bytes.Equal(startKey, roachpb.RKeyMin) {
-		t.Fatalf("The start key is not equal to KeyMin %q=%q", startKey, roachpb.RKeyMin)
+	if startKey := lhsRepl.Desc().StartKey; !bytes.Equal(startKey, repl.Desc().StartKey) {
+		t.Fatalf("the start key %q is not equal to the scratch range's start key %q", startKey, repl.Desc().StartKey)
 	}
-	if endKey := rhsRepl.Desc().EndKey; !bytes.Equal(endKey, roachpb.RKeyMax) {
-		t.Fatalf("The end key is not equal to KeyMax %q=%q", endKey, roachpb.RKeyMax)
+	if endKey := rhsRepl.Desc().EndKey; !bytes.Equal(endKey, repl.Desc().EndKey) {
+		t.Fatalf("the end key %q is not equal to the scratch range's end key %q", endKey, repl.Desc().EndKey)
 	}

-	// Try to get values from after the merge.
-	gArgs = getArgs(roachpb.Key("aaa"))
-	if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
-		t.Fatal(pErr)
-	} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
-		t.Fatal(err)
-	} else if !bytes.Equal(replyBytes, content) {
-		t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
-	}
-	gArgs = getArgs(roachpb.Key("ccc"))
-	if reply, pErr := kv.SendWrappedWith(ctx, store1.TestSender(), roachpb.Header{
-		RangeID: rhsRepl.RangeID,
-	}, gArgs); pErr != nil {
-		t.Fatal(pErr)
-	} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
-		t.Fatal(err)
-	} else if !bytes.Equal(replyBytes, content) {
-		t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
-	}
+	// Try to get values from after the merge.
+	verify(lhsDesc.StartKey.Next().AsRawKey(), lhsRepl.RangeID, content)
+	verify(rhsDesc.StartKey.Next().AsRawKey(), rhsRepl.RangeID, content)

+	newContent := []byte("testing!better!")
 	// Put new values after the merge on both sides.
-	pArgs = putArgs(roachpb.Key("aaaa"), content)
-	if _, pErr := kv.SendWrapped(ctx, store1.TestSender(), pArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
-	pArgs = putArgs(roachpb.Key("cccc"), content)
-	if _, pErr := kv.SendWrappedWith(ctx, store1.TestSender(), roachpb.Header{
-		RangeID: rhsRepl.RangeID,
-	}, pArgs); pErr != nil {
-		t.Fatal(pErr)
-	}
+	put(lhsDesc.StartKey.Next().AsRawKey(), lhsRepl.RangeID, newContent)
+	put(rhsDesc.StartKey.Next().AsRawKey(), rhsRepl.RangeID, newContent)

 	// Try to get the newly placed values.
-	gArgs = getArgs(roachpb.Key("aaaa"))
-	if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
-		t.Fatal(pErr)
-	} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
-		t.Fatal(err)
-	} else if !bytes.Equal(replyBytes, content) {
-		t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
-	}
-	gArgs = getArgs(roachpb.Key("cccc"))
-	if reply, pErr := kv.SendWrapped(ctx, store1.TestSender(), gArgs); pErr != nil {
-		t.Fatal(pErr)
-	} else if replyBytes, err := reply.(*roachpb.GetResponse).Value.GetBytes(); err != nil {
-		t.Fatal(err)
-	} else if !bytes.Equal(replyBytes, content) {
-		t.Fatalf("actual value %q did not match expected value %q", replyBytes, content)
-	}
+	verify(lhsDesc.StartKey.Next().AsRawKey(), lhsRepl.RangeID, newContent)
+	verify(rhsDesc.StartKey.Next().AsRawKey(), rhsRepl.RangeID, newContent)

-	gArgs = getArgs(roachpb.Key("cccc"))
-	if _, pErr := kv.SendWrappedWith(ctx, store2, roachpb.Header{
+	gArgs := getArgs(lhsDesc.StartKey.Next().AsRawKey())
+	if _, pErr := kv.SendWrappedWith(ctx, store, roachpb.Header{
 		RangeID: rhsDesc.RangeID,
 	}, gArgs); !testutils.IsPError(
-		pErr, `r2 was not found`,
+		pErr, `was not found on s`,
 	) {
 		t.Fatalf("expected get on rhs to fail after merge, but got err=%v", pErr)
 	}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index c083288c1fbd..c69976da7030 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -791,6 +791,11 @@ func (sc *StoreConfig) SetDefaults() {
 	}
 }

+// GetStoreConfig exposes the config used for this store.
+func (s *Store) GetStoreConfig() *StoreConfig {
+	return &s.cfg
+}
+
 // LeaseExpiration returns an int64 to increment a manual clock with to
 // make sure that all active range leases expire.
 func (sc *StoreConfig) LeaseExpiration() int64 {
@@ -1656,7 +1661,6 @@ func (s *Store) startGossip() {
 		_, pErr := repl.getLeaseForGossip(ctx)
 		return pErr.GoError()
 	}
-
 	gossipFns := []struct {
 		key roachpb.Key
 		fn  func(context.Context, *Replica) error
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 8b5615dcf586..8d2c710cb8cf 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -232,6 +232,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			return nil, errors.Wrap(err, "instantiating clock source")
 		}
 		clock = hlc.NewClock(clockSrc.UnixNano, time.Duration(cfg.MaxOffset))
+	} else if cfg.TestingKnobs.Server != nil &&
+		cfg.TestingKnobs.Server.(*TestingKnobs).ClockSource != nil {
+		clock = hlc.NewClock(cfg.TestingKnobs.Server.(*TestingKnobs).ClockSource,
+			time.Duration(cfg.MaxOffset))
 	} else {
 		clock = hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
 	}
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index 2a6bb4046ec3..53169381e152 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -80,6 +80,9 @@ type TestingKnobs struct {
 	// StickyEngineRegistry manages the lifecycle of sticky in memory engines,
 	// which can be enabled via base.StoreSpec.StickyInMemoryEngineID.
 	StickyEngineRegistry StickyInMemEnginesRegistry
+	// ClockSource is used to inject a custom clock for testing the server. It
+	// is typically the UnixNano method of an hlc.HybridManualClock or ManualClock.
+	ClockSource func() int64
 }

 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go
index d3c6ee200c2d..5609ef7b3334 100644
--- a/pkg/server/testserver.go
+++ b/pkg/server/testserver.go
@@ -11,6 +11,7 @@
 package server

 import (
+	"bytes"
 	"context"
 	"fmt"
 	"net/http"
@@ -1338,9 +1339,6 @@ func (ts *TestServer) ForceTableGC(

 // ScratchRangeEx splits off a range suitable to be used as KV scratch space.
 // (it doesn't overlap system spans or SQL tables).
-//
-// Calling this multiple times is undefined (but see TestCluster.ScratchRange()
-// which is idempotent).
 func (ts *TestServer) ScratchRangeEx() (roachpb.RangeDescriptor, error) {
 	scratchKey := keys.TableDataMax
 	_, rngDesc, err := ts.SplitRange(scratchKey)
@@ -1360,6 +1358,18 @@ func (ts *TestServer) ScratchRange() (roachpb.Key, error) {
 	return desc.StartKey.AsRawKey(), nil
 }

+// ScratchRangeWithExpirationLease is like ScratchRange but creates a range
+// with an expiration-based lease.
+func (ts *TestServer) ScratchRangeWithExpirationLease() (roachpb.Key, error) {
+	scratchKey := roachpb.Key(bytes.Join([][]byte{keys.SystemPrefix,
+		roachpb.RKey("\x00aaa-testing")}, nil))
+	_, _, err := ts.SplitRange(scratchKey)
+	if err != nil {
+		return nil, err
+	}
+	return scratchKey, nil
+}
+
 // MetricsRecorder periodically records node-level and store-level metrics.
 func (ts *TestServer) MetricsRecorder() *status.MetricsRecorder {
 	return ts.node.recorder
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 072732d5ffa5..fef7e20bd830 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -48,11 +48,10 @@ import (
 // analogous to TestServer, but with control over range replication and join
 // flags.
 type TestCluster struct {
-	Servers         []*server.TestServer
-	Conns           []*gosql.DB
-	stopper         *stop.Stopper
-	scratchRangeKey roachpb.Key
-	mu              struct {
+	Servers []*server.TestServer
+	Conns   []*gosql.DB
+	stopper *stop.Stopper
+	mu      struct {
 		syncutil.Mutex
 		serverStoppers []*stop.Stopper
 	}
@@ -856,14 +855,21 @@ func (tc *TestCluster) FindRangeLeaseHolder(
 // kv scratch space (it doesn't overlap system spans or SQL tables). The range
 // is lazily split off on the first call to ScratchRange.
 func (tc *TestCluster) ScratchRange(t testing.TB) roachpb.Key {
-	if tc.scratchRangeKey != nil {
-		return tc.scratchRangeKey
-	}
 	scratchKey, err := tc.Servers[0].ScratchRange()
 	if err != nil {
 		t.Fatal(err)
 	}
-	tc.scratchRangeKey = scratchKey
 	return scratchKey
 }
+
+// ScratchRangeWithExpirationLease returns the start key of a span of keyspace
+// suitable for use as kv scratch space and that has an expiration-based
+// lease. The range is lazily split off on the first call to
+// ScratchRangeWithExpirationLease.
+func (tc *TestCluster) ScratchRangeWithExpirationLease(t testing.TB) roachpb.Key {
+	scratchKey, err := tc.Servers[0].ScratchRangeWithExpirationLease()
+	if err != nil {
+		t.Fatal(err)
+	}
+	return scratchKey
+}
diff --git a/pkg/testutils/testcluster/testcluster_test.go b/pkg/testutils/testcluster/testcluster_test.go
index 346e25ed8105..9947a9abca4d 100644
--- a/pkg/testutils/testcluster/testcluster_test.go
+++ b/pkg/testutils/testcluster/testcluster_test.go
@@ -331,3 +331,25 @@ func TestRestart(t *testing.T) {

 	// Verify we can still read data.
 	tc.WaitForValues(t, roachpb.Key("b"), []int64{9, 9, 9})
 }
+
+func TestExpirationBasedLeases(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := StartTestCluster(t, 1,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	key := tc.ScratchRangeWithExpirationLease(t)
+	repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(key))
+	lease, _ := repl.GetLease()
+	require.NotNil(t, lease.Expiration)
+
+	// Verify the idempotence of ScratchRangeWithExpirationLease.
+	keyAgain := tc.ScratchRangeWithExpirationLease(t)
+	replAgain := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(keyAgain))
+	require.Equal(t, repl, replAgain)
+}
diff --git a/pkg/util/hlc/BUILD.bazel b/pkg/util/hlc/BUILD.bazel
index b214591093f0..977ba40a672b 100644
--- a/pkg/util/hlc/BUILD.bazel
+++ b/pkg/util/hlc/BUILD.bazel
@@ -35,6 +35,7 @@ go_test(
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//assert",
+        "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/pkg/util/hlc/hlc.go b/pkg/util/hlc/hlc.go
index b870292e0b7a..b4049d69dcee 100644
--- a/pkg/util/hlc/hlc.go
+++ b/pkg/util/hlc/hlc.go
@@ -124,6 +124,31 @@ func (m *ManualClock) Set(nanos int64) {
 	atomic.StoreInt64(&m.nanos, nanos)
 }

+// HybridManualClock is a convenience type to facilitate creating a hybrid
+// logical clock whose physical clock ticks with the wall clock, but that can
+// be moved arbitrarily into the future. HybridManualClock is thread safe.
+type HybridManualClock struct {
+	nanos         int64
+	physicalClock func() int64
+}
+
+// NewHybridManualClock returns a new instance whose manual offset is
+// initialized to zero, so it initially tracks the physical clock.
+func NewHybridManualClock() *HybridManualClock {
+	return &HybridManualClock{nanos: 0, physicalClock: UnixNano}
+}
+
+// UnixNano returns the hybrid manual clock's timestamp: the current physical
+// clock reading plus the manually added offset.
+func (m *HybridManualClock) UnixNano() int64 {
+	return atomic.LoadInt64(&m.nanos) + m.physicalClock()
+}
+
+// Increment atomically increments the hybrid manual clock's offset.
+func (m *HybridManualClock) Increment(incr int64) {
+	atomic.AddInt64(&m.nanos, incr)
+}
+
 // UnixNano returns the local machine's physical nanosecond
 // unix epoch timestamp as a convenience to create a HLC via
 // c := hlc.NewClock(hlc.UnixNano, ...).
diff --git a/pkg/util/hlc/hlc_test.go b/pkg/util/hlc/hlc_test.go
index bf2c07b01ffe..b11a61587726 100644
--- a/pkg/util/hlc/hlc_test.go
+++ b/pkg/util/hlc/hlc_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )

 type Event uint8
@@ -358,6 +359,24 @@ func TestExampleManualClock(t *testing.T) {
 	}
 }

+// TestHybridManualClock tests the basic functionality of the
+// HybridManualClock.
+func TestHybridManualClock(t *testing.T) {
+	m := NewHybridManualClock()
+	c := NewClock(m.UnixNano, time.Nanosecond)
+
+	// We do a two-sided test to make sure that the physical clock matches
+	// the hybrid value. Since we can't pull a value off both clocks at the
+	// same time, we use two LessOrEqual comparisons in reverse order to
+	// establish that the values are roughly equal.
+ require.LessOrEqual(t, c.Now().WallTime, UnixNano()) + require.LessOrEqual(t, UnixNano(), c.Now().WallTime) + + m.Increment(10) + require.LessOrEqual(t, c.Now().WallTime, UnixNano()+10) + require.LessOrEqual(t, UnixNano()+10, c.Now().WallTime) +} + func TestHLCMonotonicityCheck(t *testing.T) { m := NewManualClock(100000) c := NewClock(m.UnixNano, 100*time.Nanosecond)
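
For reference, a minimal sketch of how a test can combine the pieces this
patch adds: the ClockSource knob, TestCluster.ScratchRangeWithExpirationLease,
and Store.GetStoreConfig().LeaseExpiration(). The test name is hypothetical
and the assertions are illustrative; everything else uses APIs added or
exercised above.

package kvserver_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/stretchr/testify/require"
)

// TestLeaseExpirationSketch is a hypothetical example, not part of this patch.
func TestLeaseExpirationSketch(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	manualClock := hlc.NewHybridManualClock()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					// Feed the hybrid clock into the server via the new knob.
					ClockSource: manualClock.UnixNano,
				},
			},
		},
	})
	defer tc.Stopper().Stop(ctx)

	// Split off a range whose lease is expiration-based rather than epoch-based.
	key := tc.ScratchRangeWithExpirationLease(t)
	store := tc.GetFirstStoreFromServer(t, 0)
	repl := store.LookupReplica(roachpb.RKey(key))
	lease, _ := repl.GetLease()
	require.NotNil(t, lease.Expiration)

	// Jump the clock far enough ahead that any active expiration-based lease
	// has expired; the next request to the range must reacquire it. This
	// mirrors what the converted merge test does inside its request filter.
	manualClock.Increment(store.GetStoreConfig().LeaseExpiration())
}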