diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index c4c43afcd5d7..970c0721b23b 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -531,7 +531,7 @@ func mockStorePool( for _, storeID := range suspectedStoreIDs { liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE detail := storePool.GetStoreDetailLocked(storeID) - detail.LastUnavailable = storePool.Clock().Now().GoTime() + detail.LastUnavailable = storePool.Clock().Now() detail.Desc = &roachpb.StoreDescriptor{ StoreID: storeID, Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, @@ -541,7 +541,7 @@ func mockStorePool( // Set the node liveness function using the set we constructed. // TODO(sarkesian): This override needs to be fixed to stop exporting this field. storePool.NodeLivenessFn = - func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus { + func(nodeID roachpb.NodeID, now hlc.Timestamp, threshold time.Duration) livenesspb.NodeLivenessStatus { if status, ok := liveNodeSet[nodeID]; ok { return status } @@ -841,7 +841,7 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(storeDescriptors, t) // Override liveness of n3 to decommissioning so the only available target is s4. - oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if nid == roachpb.NodeID(3) { return livenesspb.NodeLivenessStatus_DECOMMISSIONING } @@ -899,7 +899,7 @@ func TestAllocatorReplaceFailsOnConstrainedDecommissioningReplica(t *testing.T) gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) // Override liveness of n3 to decommissioning so the only available target is s4. - oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if nid == roachpb.NodeID(3) { return livenesspb.NodeLivenessStatus_DECOMMISSIONING } @@ -6828,7 +6828,7 @@ func TestAllocatorComputeActionWithStorePoolRemoveDead(t *testing.T) { // Mark all dead nodes as alive, so we can override later. all := append(tcase.live, tcase.dead...) mockStorePool(sp, all, nil, nil, nil, nil, nil) - oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { for _, deadStoreID := range tcase.dead { if nid == roachpb.NodeID(deadStoreID) { return livenesspb.NodeLivenessStatus_DEAD @@ -8502,7 +8502,7 @@ func TestAllocatorFullDisks(t *testing.T) { for i := 0; i < generations; i++ { // First loop through test stores and randomly add data. 
for j := 0; j < len(testStores); j++ { - if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) == livenesspb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), hlc.Timestamp{}, 0) == livenesspb.NodeLivenessStatus_DEAD { continue } ts := &testStores[j] @@ -8529,7 +8529,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Loop through each store a number of times and maybe rebalance. for j := 0; j < 10; j++ { for k := 0; k < len(testStores); k++ { - if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(k), time.Time{}, 0) == livenesspb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(k), hlc.Timestamp{}, 0) == livenesspb.NodeLivenessStatus_DEAD { continue } ts := &testStores[k] @@ -8566,7 +8566,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Simulate rocksdb compactions freeing up disk space. for j := 0; j < len(testStores); j++ { - if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) != livenesspb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), hlc.Timestamp{}, 0) != livenesspb.NodeLivenessStatus_DEAD { ts := &testStores[j] if ts.Capacity.Available <= 0 { t.Errorf("testStore %d ran out of space during generation %d: %+v", j, i, ts.Capacity) diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 73c8adc436fc..cac55e29e712 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -48,7 +48,6 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_kr_pretty//:pretty", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index cc6338fc5700..339ba7657cd5 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -50,7 +50,7 @@ var _ AllocatorStorePool = &OverrideStorePool{} func OverrideNodeLivenessFunc( overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, realNodeLivenessFunc NodeLivenessFunc, ) NodeLivenessFunc { - return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + return func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if override, ok := overrides[nid]; ok { return override } diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go index 310043395c1a..3876afa3149a 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" @@ -42,7 +43,7 @@ func TestOverrideStorePoolStatusString(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) - sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, 
now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if overriddenLiveness, ok := livenessOverrides[nid]; ok { return overriddenLiveness } @@ -123,7 +124,7 @@ func TestOverrideStorePoolDecommissioningReplicas(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) - sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if overriddenLiveness, ok := livenessOverrides[nid]; ok { return overriddenLiveness } @@ -240,7 +241,7 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) - sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if overriddenLiveness, ok := livenessOverrides[nid]; ok { return overriddenLiveness } @@ -335,7 +336,7 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) { // Set suspectedStore as suspected. testStorePool.DetailsMu.Lock() - testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now().GoTime() + testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now() testStorePool.DetailsMu.Unlock() // No filter or limited set of store IDs. diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index d6ba4881ea47..c6bab4ecc590 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -113,7 +113,7 @@ type NodeCountFunc func() int // not the node is live. A node is considered dead if its liveness record has // expired by more than TimeUntilStoreDead. type NodeLivenessFunc func( - nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration, + nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration, ) livenesspb.NodeLivenessStatus // MakeStorePoolNodeLivenessFunc returns a function which determines @@ -121,7 +121,7 @@ type NodeLivenessFunc func( // NodeLiveness. func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLivenessFunc { return func( - nodeID roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration, + nodeID roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration, ) livenesspb.NodeLivenessStatus { liveness, ok := nodeLiveness.GetLiveness(nodeID) if !ok { @@ -160,7 +160,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLive // ideally we should remove usage of NodeLivenessStatus altogether. See #50707 // for more details. func LivenessStatus( - l livenesspb.Liveness, now time.Time, deadThreshold time.Duration, + l livenesspb.Liveness, now hlc.Timestamp, deadThreshold time.Duration, ) livenesspb.NodeLivenessStatus { // If we don't have a liveness expiration time, treat the status as unknown. 
// This is different than unavailable as it doesn't transition through being @@ -195,16 +195,16 @@ type StoreDetail struct { Desc *roachpb.StoreDescriptor // ThrottledUntil is when a throttled store can be considered available again // due to a failed or declined snapshot. - ThrottledUntil time.Time + ThrottledUntil hlc.Timestamp // throttledBecause is set to the most recent reason for which a store was // marked as throttled. throttledBecause string // LastUpdatedTime is set when a store is first consulted and every time // gossip arrives for a store. - LastUpdatedTime time.Time + LastUpdatedTime hlc.Timestamp // LastUnavailable is set when it's detected that a store was unavailable, // i.e. failed liveness. - LastUnavailable time.Time + LastUnavailable hlc.Timestamp } // storeStatus is the current status of a store. @@ -238,7 +238,10 @@ const ( ) func (sd *StoreDetail) status( - now time.Time, deadThreshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration, + now hlc.Timestamp, + deadThreshold time.Duration, + nl NodeLivenessFunc, + suspectDuration time.Duration, ) storeStatus { // During normal operation, we expect the state transitions for stores to look like the following: // @@ -265,7 +268,7 @@ func (sd *StoreDetail) status( // within the liveness threshold. Note that LastUpdatedTime is set // when the store detail is created and will have a non-zero value // even before the first gossip arrives for a store. - deadAsOf := sd.LastUpdatedTime.Add(deadThreshold) + deadAsOf := sd.LastUpdatedTime.AddDuration(deadThreshold) if now.After(deadAsOf) { sd.LastUnavailable = now return storeStatusDead @@ -305,7 +308,7 @@ func (sd *StoreDetail) status( // Check whether the store is currently suspect. We measure that by // looking at the time it was last unavailable making sure we have not seen any // failures for a period of time defined by StoreSuspectDuration. 
- if sd.LastUnavailable.Add(suspectDuration).After(now) { + if sd.LastUnavailable.AddDuration(suspectDuration).After(now) { return storeStatusSuspect } @@ -443,7 +446,7 @@ type StorePool struct { gossip *gossip.Gossip nodeCountFn NodeCountFunc NodeLivenessFn NodeLivenessFunc - startTime time.Time + startTime hlc.Timestamp deterministic bool // We use separate mutexes for storeDetails and nodeLocalities because the @@ -492,7 +495,7 @@ func NewStorePool( gossip: g, nodeCountFn: nodeCountFn, NodeLivenessFn: nodeLivenessFn, - startTime: clock.PhysicalTime(), + startTime: clock.Now(), deterministic: deterministic, } sp.DetailsMu.StoreDetails = make(map[roachpb.StoreID]*StoreDetail) @@ -523,7 +526,7 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { sort.Sort(ids) var buf bytes.Buffer - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -538,9 +541,8 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { fmt.Fprintf(&buf, ": range-count=%d fraction-used=%.2f", detail.Desc.Capacity.RangeCount, detail.Desc.Capacity.FractionUsed()) } - throttled := detail.ThrottledUntil.Sub(now) - if throttled > 0 { - fmt.Fprintf(&buf, " [throttled=%.1fs]", throttled.Seconds()) + if detail.ThrottledUntil.After(now) { + fmt.Fprintf(&buf, " [throttled=%v]", detail.ThrottledUntil) } _, _ = buf.WriteString("\n") } @@ -569,7 +571,7 @@ func (sp *StorePool) storeDescriptorUpdate(storeDesc roachpb.StoreDescriptor) { storeID := storeDesc.StoreID curCapacity := storeDesc.Capacity - now := sp.clock.PhysicalTime() + now := sp.clock.Now() sp.DetailsMu.Lock() detail := sp.GetStoreDetailLocked(storeID) @@ -815,9 +817,9 @@ func (sp *StorePool) decommissioningReplicasWithLiveness( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // NB: We use clock.Now() instead of clock.PhysicalTime() in order to // take clock signals from remote nodes into consideration. - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -859,12 +861,12 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error if !ok { return false, 0, errors.Errorf("store %d was not found", storeID) } - // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // NB: We use clock.Now() instead of clock.PhysicalTime() in order to // take clock signals from remote nodes into consideration. 
- now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) - deadAsOf := sd.LastUpdatedTime.Add(timeUntilStoreDead) + deadAsOf := sd.LastUpdatedTime.AddDuration(timeUntilStoreDead) if now.After(deadAsOf) { return true, 0, nil } @@ -873,7 +875,7 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error if sd.Desc == nil { return false, 0, errors.Errorf("store %d status unknown, cant tell if it's dead or alive", storeID) } - return false, deadAsOf.Sub(now), nil + return false, deadAsOf.GoTime().Sub(now.GoTime()), nil } // IsUnknown returns true if the given store's status is `storeStatusUnknown` @@ -935,9 +937,9 @@ func (sp *StorePool) storeStatus( if !ok { return storeStatusUnknown, errors.Errorf("store %d was not found", storeID) } - // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // NB: We use clock.Now() instead of clock.PhysicalTime() in order to // take clock signals from remote nodes into consideration. - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) return sd.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect), nil @@ -971,7 +973,7 @@ func (sp *StorePool) liveAndDeadReplicasWithLiveness( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -1247,7 +1249,7 @@ func (sp *StorePool) getStoreListFromIDsLocked( var throttled ThrottledStoreReasons var storeDescriptors []roachpb.StoreDescriptor - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -1311,7 +1313,7 @@ func (sp *StorePool) Throttle(reason ThrottleReason, why string, storeID roachpb switch reason { case ThrottleFailed: timeout := FailedReservationsTimeout.Get(&sp.st.SV) - detail.ThrottledUntil = sp.clock.PhysicalTime().Add(timeout) + detail.ThrottledUntil = sp.clock.Now().AddDuration(timeout) if log.V(2) { ctx := sp.AnnotateCtx(context.TODO()) log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s", diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index 9c1910be9f84..b5ab8cd5bfc7 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/kr/pretty" "github.com/stretchr/testify/require" @@ -206,9 +205,9 @@ func TestStorePoolGetStoreList(t *testing.T) { mnl.SetNodeStatus(deadStore.Node.NodeID, livenesspb.NodeLivenessStatus_DEAD) sp.DetailsMu.Lock() // Set declinedStore as throttled. - sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.clock.Now().GoTime().Add(time.Hour) + sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.clock.Now().AddDuration(time.Hour) // Set suspectedStore as suspected. 
- sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.clock.Now().GoTime() + sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.clock.Now() sp.DetailsMu.Unlock() // No filter or limited set of store IDs. @@ -589,13 +588,13 @@ func TestStorePoolThrottle(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(uniqueStore, t) - expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV)) + expected := sp.clock.Now().AddDuration(FailedReservationsTimeout.Get(&sp.st.SV)) sp.Throttle(ThrottleFailed, "", 1) sp.DetailsMu.Lock() detail := sp.GetStoreDetailLocked(1) sp.DetailsMu.Unlock() - if !detail.ThrottledUntil.Equal(expected) { + if detail.ThrottledUntil.WallTime != expected.WallTime { t.Errorf("expected store to have been throttled to %v, found %v", expected, detail.ThrottledUntil) } @@ -614,7 +613,7 @@ func TestStorePoolSuspected(t *testing.T) { livenesspb.NodeLivenessStatus_DEAD) defer stopper.Stop(ctx) - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -622,7 +621,7 @@ func TestStorePoolSuspected(t *testing.T) { detail := sp.GetStoreDetailLocked(0) s := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusUnknown) - require.True(t, detail.LastUnavailable.IsZero()) + require.Equal(t, hlc.Timestamp{}, detail.LastUnavailable) // Now start gossiping the stores statuses. sg := gossiputil.NewStoreGossiper(g) @@ -637,13 +636,13 @@ func TestStorePoolSuspected(t *testing.T) { s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) - require.True(t, detail.LastUnavailable.IsZero()) + require.Equal(t, hlc.Timestamp{}, detail.LastUnavailable) // When the store transitions to unavailable, its status changes to temporarily unknown. mnl.SetNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_UNAVAILABLE) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusUnknown) - require.False(t, detail.LastUnavailable.IsZero()) + require.NotEqual(t, hlc.Timestamp{}, detail.LastUnavailable) // When the store transitions back to live, it passes through suspect for a period. mnl.SetNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) @@ -651,7 +650,7 @@ func TestStorePoolSuspected(t *testing.T) { require.Equal(t, s, storeStatusSuspect) // Once the window has passed, it will return to available. - now = now.Add(timeAfterStoreSuspect).Add(time.Millisecond) + now = now.AddDuration(timeAfterStoreSuspect).AddDuration(time.Millisecond) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) @@ -666,7 +665,7 @@ func TestStorePoolSuspected(t *testing.T) { require.Equal(t, s, storeStatusSuspect) // Verify it also returns correctly to available after suspect time. 
- now = now.Add(timeAfterStoreSuspect).Add(time.Millisecond) + now = now.AddDuration(timeAfterStoreSuspect).AddDuration(time.Millisecond) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) @@ -680,7 +679,7 @@ func TestStorePoolSuspected(t *testing.T) { s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusSuspect) - now = now.Add(timeAfterStoreSuspect).Add(time.Millisecond) + now = now.AddDuration(timeAfterStoreSuspect).AddDuration(time.Millisecond) mnl.SetNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) @@ -866,120 +865,95 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { func TestNodeLivenessLivenessStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - now := timeutil.Now() + now := hlc.Timestamp{ + WallTime: time.Date(2020, 0, 0, 0, 0, 0, 0, time.UTC).UnixNano(), + } threshold := 5 * time.Minute for _, tc := range []struct { + name string liveness livenesspb.Liveness expected livenesspb.NodeLivenessStatus }{ // Valid status. { + name: "Valid 5 min", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(5 * time.Minute).UnixNano(), - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(5 * time.Minute).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_LIVE, }, { + name: "Expires just slightly in the future", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - // Expires just slightly in the future. - WallTime: now.UnixNano() + 1, - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(time.Nanosecond).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_LIVE, }, - // Expired status. - { - liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - // Just expired. - WallTime: now.UnixNano(), - }, - Draining: false, - }, - expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, - }, - // Expired status. { + name: "Expired status", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.UnixNano(), - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, - // Max bound of expired. { + name: "Max bound of expired", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano() + 1, - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).AddDuration(time.Nanosecond).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, - // Dead status. { + name: "Dead", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano(), - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_DEAD, }, - // Decommissioning. 
{ + name: "Decommissioning", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(time.Second).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(time.Second).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONING, Draining: false, }, expected: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, - // Decommissioning + expired. { + name: "Decommissioning + expired", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONING, Draining: false, }, expected: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, - // Decommissioned + live. { + name: "Decommissioned + live", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(time.Second).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(time.Second).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONED, Draining: false, }, @@ -988,60 +962,52 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { // "Decommissioning". See #50707 for more details. expected: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, - // Decommissioned + expired. { + name: "Decommissioned + expired", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONED, Draining: false, }, expected: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, - // Draining { + name: "Draining", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(5 * time.Minute).UnixNano(), - }, - Draining: true, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(5 * time.Minute).ToLegacyTimestamp(), + Draining: true, }, expected: livenesspb.NodeLivenessStatus_DRAINING, }, - // Decommissioning that is unavailable. { + name: "Decommissioning that is unavailable", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.ToLegacyTimestamp(), Draining: false, Membership: livenesspb.MembershipStatus_DECOMMISSIONING, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, - // Draining that is unavailable. 
{ + name: "Draining that is unavailable", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.UnixNano(), - }, - Draining: true, + NodeID: 1, + Epoch: 1, + Expiration: now.ToLegacyTimestamp(), + Draining: true, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, } { - t.Run("", func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { if a, e := LivenessStatus(tc.liveness, now, threshold), tc.expected; a != e { - t.Errorf("liveness status was %s, wanted %s", a.String(), e.String()) + t.Errorf("liveness status was %s, wanted %s for %+v, %v", a.String(), e.String(), tc.liveness, now) } }) } diff --git a/pkg/kv/kvserver/allocator/storepool/test_helpers.go b/pkg/kv/kvserver/allocator/storepool/test_helpers.go index 85df6efef8a4..ede0acb0f52b 100644 --- a/pkg/kv/kvserver/allocator/storepool/test_helpers.go +++ b/pkg/kv/kvserver/allocator/storepool/test_helpers.go @@ -55,7 +55,7 @@ func (m *MockNodeLiveness) SetNodeStatus( // NodeLivenessFunc is the method that can be injected as part of store pool // construction to mock out node liveness, in tests. func (m *MockNodeLiveness) NodeLivenessFunc( - nodeID roachpb.NodeID, now time.Time, threshold time.Duration, + nodeID roachpb.NodeID, now hlc.Timestamp, threshold time.Duration, ) livenesspb.NodeLivenessStatus { m.Lock() defer m.Unlock() @@ -76,7 +76,8 @@ func CreateTestStorePool( defaultNodeStatus livenesspb.NodeLivenessStatus, ) (*stop.Stopper, *gossip.Gossip, *timeutil.ManualTime, *StorePool, *MockNodeLiveness) { stopper := stop.NewStopper() - mc := timeutil.NewManualTime(timeutil.Unix(0, 123)) + // Pick a random date that is "realistic" and far enough away from 0. + mc := timeutil.NewManualTime(time.Date(2020, 0, 0, 0, 0, 0, 0, time.UTC)) clock := hlc.NewClockForTesting(mc) ambientCtx := log.MakeTestingAmbientContext(stopper.Tracer()) g := gossip.NewTest(1, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index d3b31dc73d2a..15a1c318bb9e 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -419,7 +419,7 @@ func TestAllocatorThrottled(t *testing.T) { if !ok { t.Fatalf("store:%d was not found in the store pool", singleStore[0].StoreID) } - storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) + storeDetail.ThrottledUntil = hlc.Timestamp{WallTime: timeutil.Now().Add(24 * time.Hour).UnixNano()} sp.DetailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { diff --git a/pkg/kv/kvserver/asim/BUILD.bazel b/pkg/kv/kvserver/asim/BUILD.bazel index 55ddf0aca259..ea4178891f1c 100644 --- a/pkg/kv/kvserver/asim/BUILD.bazel +++ b/pkg/kv/kvserver/asim/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/storerebalancer", "//pkg/kv/kvserver/asim/workload", + "//pkg/util/hlc", ], ) diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index 2df673ad2834..56d7c9caef4f 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -22,13 +22,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/storerebalancer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // Simulator simulates an entire cluster, and 
runs the allocator of each store // in that cluster. type Simulator struct { - curr time.Time - end time.Time + curr hlc.Timestamp + end hlc.Timestamp // interval is the step between ticks for active simulaton components, such // as the queues, store rebalancer and state changers. It should be set // lower than the bgInterval, as updated occur more frequently. @@ -133,7 +134,7 @@ func NewSimulator( s := &Simulator{ curr: settings.StartTime, - end: settings.StartTime.Add(duration), + end: settings.StartTime.AddDuration(duration), interval: settings.TickInterval, generators: wgs, state: initialState, @@ -154,10 +155,10 @@ func NewSimulator( // GetNextTickTime returns a simulated tick time, or an indication that the // simulation is done. -func (s *Simulator) GetNextTickTime() (done bool, tick time.Time) { - s.curr = s.curr.Add(s.interval) +func (s *Simulator) GetNextTickTime() (done bool, tick hlc.Timestamp) { + s.curr = s.curr.AddDuration(s.interval) if s.curr.After(s.end) { - return true, time.Time{} + return true, hlc.Timestamp{} } return false, s.curr } @@ -217,7 +218,7 @@ func (s *Simulator) RunSim(ctx context.Context) { } // tickWorkload gets the next workload events and applies them to state. -func (s *Simulator) tickWorkload(ctx context.Context, tick time.Time) { +func (s *Simulator) tickWorkload(ctx context.Context, tick hlc.Timestamp) { s.shuffler( len(s.generators), func(i, j int) { s.generators[i], s.generators[j] = s.generators[j], s.generators[i] }, @@ -231,7 +232,7 @@ func (s *Simulator) tickWorkload(ctx context.Context, tick time.Time) { // tickStateChanges ticks atomic pending changes, in the changer. Then, for // each store ticks the pending operations such as relocate range and lease // transfers. -func (s *Simulator) tickStateChanges(ctx context.Context, tick time.Time) { +func (s *Simulator) tickStateChanges(ctx context.Context, tick hlc.Timestamp) { s.changer.Tick(tick, s.state) stores := s.state.Stores() s.shuffler(len(stores), func(i, j int) { stores[i], stores[j] = stores[j], stores[i] }) @@ -243,18 +244,18 @@ func (s *Simulator) tickStateChanges(ctx context.Context, tick time.Time) { // tickGossip puts the current tick store descriptors into the state // exchange. It then updates the exchanged descriptors for each store's store // pool. -func (s *Simulator) tickGossip(ctx context.Context, tick time.Time) { +func (s *Simulator) tickGossip(ctx context.Context, tick hlc.Timestamp) { s.gossip.Tick(ctx, tick, s.state) } -func (s *Simulator) tickStoreClocks(tick time.Time) { +func (s *Simulator) tickStoreClocks(tick hlc.Timestamp) { s.state.TickClock(tick) } // tickQueues iterates over the next replicas for each store to // consider. It then enqueues each of these and ticks the replicate queue for // processing. -func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state.State) { +func (s *Simulator) tickQueues(ctx context.Context, tick hlc.Timestamp, state state.State) { stores := s.state.Stores() s.shuffler(len(stores), func(i, j int) { stores[i], stores[j] = stores[j], stores[i] }) for _, store := range stores { @@ -299,7 +300,9 @@ func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state. // tickStoreRebalancers iterates over the store rebalancers in the cluster and // ticks their control loop. 
-func (s *Simulator) tickStoreRebalancers(ctx context.Context, tick time.Time, state state.State) { +func (s *Simulator) tickStoreRebalancers( + ctx context.Context, tick hlc.Timestamp, state state.State, +) { stores := s.state.Stores() s.shuffler(len(stores), func(i, j int) { stores[i], stores[j] = stores[j], stores[i] }) for _, store := range stores { @@ -308,6 +311,6 @@ func (s *Simulator) tickStoreRebalancers(ctx context.Context, tick time.Time, st } // tickMetrics prints the metrics up to the given tick. -func (s *Simulator) tickMetrics(ctx context.Context, tick time.Time) { +func (s *Simulator) tickMetrics(ctx context.Context, tick hlc.Timestamp) { s.metrics.Tick(ctx, tick, s.state) } diff --git a/pkg/kv/kvserver/asim/config/BUILD.bazel b/pkg/kv/kvserver/asim/config/BUILD.bazel index 0c0dee0f3484..cc8d333ce83d 100644 --- a/pkg/kv/kvserver/asim/config/BUILD.bazel +++ b/pkg/kv/kvserver/asim/config/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = ["settings.go"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config", visibility = ["//visibility:public"], + deps = ["//pkg/util/hlc"], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/asim/config/settings.go b/pkg/kv/kvserver/asim/config/settings.go index e3619afdf937..838bea6dcc8b 100644 --- a/pkg/kv/kvserver/asim/config/settings.go +++ b/pkg/kv/kvserver/asim/config/settings.go @@ -10,7 +10,11 @@ package config -import "time" +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) const ( defaultTickInteval = 500 * time.Millisecond @@ -38,8 +42,10 @@ const ( var ( // defaultStartTime is used as the default beginning time for simulation // runs. It isn't necessarily meaningful other than for logging and having - // "some" start time for components taking a time.Time. - defaultStartTime = time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC) + // "some" start time for components taking a hlc.Timestamp. + defaultStartTime = hlc.Timestamp{ + WallTime: time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC).UnixNano(), + } ) // SimulationSettings controls @@ -47,7 +53,7 @@ var ( type SimulationSettings struct { // StartTime is the time to start the simulation at. This is also used to // init the shared state simulation clock. - StartTime time.Time + StartTime hlc.Timestamp // TickInterval is the duration between simulator ticks. The lower this // setting, the higher resolution the simulation will be. A lower // TickInterval will also take longer to execute so a tradeoff exists. diff --git a/pkg/kv/kvserver/asim/gossip/BUILD.bazel b/pkg/kv/kvserver/asim/gossip/BUILD.bazel index f007187eb6e6..54408e7eadcd 100644 --- a/pkg/kv/kvserver/asim/gossip/BUILD.bazel +++ b/pkg/kv/kvserver/asim/gossip/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/state", "//pkg/roachpb", + "//pkg/util/hlc", "//pkg/util/protoutil", ], ) diff --git a/pkg/kv/kvserver/asim/gossip/exchange.go b/pkg/kv/kvserver/asim/gossip/exchange.go index 314e739ae6fd..24cd70c39059 100644 --- a/pkg/kv/kvserver/asim/gossip/exchange.go +++ b/pkg/kv/kvserver/asim/gossip/exchange.go @@ -12,16 +12,16 @@ package gossip import ( "sort" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // exchangeInfo contains the information of a gossiped store descriptor. 
type exchangeInfo struct { - created time.Time + created hlc.Timestamp desc roachpb.StoreDescriptor } @@ -34,7 +34,7 @@ type fixedDelayExchange struct { // put adds the given descriptors at the current tick into the exchange // network. -func (u *fixedDelayExchange) put(tick time.Time, descs ...roachpb.StoreDescriptor) { +func (u *fixedDelayExchange) put(tick hlc.Timestamp, descs ...roachpb.StoreDescriptor) { for _, desc := range descs { u.pending = append(u.pending, exchangeInfo{created: tick, desc: desc}) } @@ -42,11 +42,11 @@ func (u *fixedDelayExchange) put(tick time.Time, descs ...roachpb.StoreDescripto // updates returns back exchanged infos, wrapped as store details that have // completed between the last tick update was called and the tick given. -func (u *fixedDelayExchange) updates(tick time.Time) []*storepool.StoreDetail { - sort.Slice(u.pending, func(i, j int) bool { return u.pending[i].created.Before(u.pending[j].created) }) - ready := []*storepool.StoreDetail{} +func (u *fixedDelayExchange) updates(tick hlc.Timestamp) []*storepool.StoreDetail { + sort.Slice(u.pending, func(i, j int) bool { return u.pending[i].created.Less(u.pending[j].created) }) + var ready []*storepool.StoreDetail i := 0 - for ; i < len(u.pending) && !tick.Before(u.pending[i].created.Add(u.settings.StateExchangeDelay)); i++ { + for ; i < len(u.pending) && !tick.Less(u.pending[i].created.AddDuration(u.settings.StateExchangeDelay)); i++ { ready = append(ready, makeStoreDetail(&u.pending[i].desc, u.pending[i].created)) } u.pending = u.pending[i:] @@ -55,7 +55,7 @@ func (u *fixedDelayExchange) updates(tick time.Time) []*storepool.StoreDetail { // makeStoreDetail wraps a store descriptor into a storepool StoreDetail at the // given tick. -func makeStoreDetail(desc *roachpb.StoreDescriptor, tick time.Time) *storepool.StoreDetail { +func makeStoreDetail(desc *roachpb.StoreDescriptor, tick hlc.Timestamp) *storepool.StoreDetail { return &storepool.StoreDetail{ Desc: desc, LastUpdatedTime: tick, diff --git a/pkg/kv/kvserver/asim/gossip/exchange_test.go b/pkg/kv/kvserver/asim/gossip/exchange_test.go index 3fc7ca057b5b..f16f603d34c4 100644 --- a/pkg/kv/kvserver/asim/gossip/exchange_test.go +++ b/pkg/kv/kvserver/asim/gossip/exchange_test.go @@ -40,11 +40,11 @@ func TestFixedDelayExchange(t *testing.T) { require.Len(t, exchange.pending, 3) // There should be no updates until after the tick + state exchange delay. - halfTick := tick.Add(settings.StateExchangeDelay / 2) + halfTick := tick.AddDuration(settings.StateExchangeDelay / 2) require.Len(t, exchange.updates(halfTick), 0) // Update the tick to be >= tick + delay, there should be three updates. - tick = tick.Add(settings.StateExchangeDelay) + tick = tick.AddDuration(settings.StateExchangeDelay) require.Len(t, exchange.updates(tick), 3) require.Len(t, exchange.pending, 0) } diff --git a/pkg/kv/kvserver/asim/gossip/gossip.go b/pkg/kv/kvserver/asim/gossip/gossip.go index 28a3e6cfa2b6..50f2da0666ae 100644 --- a/pkg/kv/kvserver/asim/gossip/gossip.go +++ b/pkg/kv/kvserver/asim/gossip/gossip.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) @@ -28,7 +29,7 @@ import ( type Gossip interface { // Tick checks for completed gossip updates and triggers new gossip // updates if needed. 
- Tick(context.Context, time.Time, state.State) + Tick(context.Context, hlc.Timestamp, state.State) } // gossip is an implementation of the Gossip interface. It manages the @@ -46,14 +47,14 @@ var _ Gossip = &gossip{} // have been triggered by the underlying kvserver.StoreGossip component. type storeGossiper struct { local *kvserver.StoreGossip - lastIntervalGossip time.Time + lastIntervalGossip hlc.Timestamp descriptorGetter func(cached bool) roachpb.StoreDescriptor pendingOutbound *roachpb.StoreDescriptor } func newStoreGossiper(descriptorGetter func(cached bool) roachpb.StoreDescriptor) *storeGossiper { sg := &storeGossiper{ - lastIntervalGossip: time.Time{}, + lastIntervalGossip: hlc.Timestamp{}, descriptorGetter: descriptorGetter, } @@ -123,7 +124,7 @@ func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID) { // Tick checks for completed gossip updates and triggers new gossip // updates if needed. -func (g *gossip) Tick(ctx context.Context, tick time.Time, s state.State) { +func (g *gossip) Tick(ctx context.Context, tick hlc.Timestamp, s state.State) { stores := s.Stores() for _, store := range stores { var sg *storeGossiper @@ -139,7 +140,7 @@ func (g *gossip) Tick(ctx context.Context, tick time.Time, s state.State) { // shoud gossip. // NB: In the real code this is controlled by a gossip // ticker on the node that activates every 10 seconds. - if !tick.Before(sg.lastIntervalGossip.Add(g.settings.StateExchangeInterval)) { + if !tick.Less(sg.lastIntervalGossip.AddDuration(g.settings.StateExchangeInterval)) { sg.lastIntervalGossip = tick _ = sg.local.GossipStore(ctx, false /* useCached */) } @@ -182,7 +183,7 @@ func (g *gossip) NewCapacityNotify(capacity roachpb.StoreCapacity, storeID state } } -func (g *gossip) maybeUpdateState(tick time.Time, s state.State) { +func (g *gossip) maybeUpdateState(tick hlc.Timestamp, s state.State) { // NB: The updates function gives back all store descriptors which have // completed exchange. We apply the update to every stores state uniformly, // i.e. fixed delay. diff --git a/pkg/kv/kvserver/asim/gossip/gossip_test.go b/pkg/kv/kvserver/asim/gossip/gossip_test.go index 830482aaceec..46fb3b5b23c4 100644 --- a/pkg/kv/kvserver/asim/gossip/gossip_test.go +++ b/pkg/kv/kvserver/asim/gossip/gossip_test.go @@ -79,7 +79,7 @@ func TestGossip(t *testing.T) { // Add the delay interval and then assert that the storepools for each // store are populated. - tick = tick.Add(settings.StateExchangeDelay) + tick = tick.AddDuration(settings.StateExchangeDelay) gossip.Tick(ctx, tick, s) // The exchange component should now be empty, clearing the previous @@ -98,7 +98,7 @@ func TestGossip(t *testing.T) { require.Len(t, gossip.exchange.pending, 2) // Increment the tick and check that the updated lease count information // reached each storepool. 
- tick = tick.Add(settings.StateExchangeDelay) + tick = tick.AddDuration(settings.StateExchangeDelay) gossip.Tick(ctx, tick, s) require.Len(t, gossip.exchange.pending, 0) diff --git a/pkg/kv/kvserver/asim/metrics/BUILD.bazel b/pkg/kv/kvserver/asim/metrics/BUILD.bazel index 4fa11a10cdbb..95b4a4fdadfb 100644 --- a/pkg/kv/kvserver/asim/metrics/BUILD.bazel +++ b/pkg/kv/kvserver/asim/metrics/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//pkg/kv/kvserver/asim/state", "//pkg/util/encoding/csv", + "//pkg/util/hlc", "//pkg/util/log", ], ) diff --git a/pkg/kv/kvserver/asim/metrics/cluster_tracker.go b/pkg/kv/kvserver/asim/metrics/cluster_tracker.go index 2fac6985ca59..ce71396d0ec9 100644 --- a/pkg/kv/kvserver/asim/metrics/cluster_tracker.go +++ b/pkg/kv/kvserver/asim/metrics/cluster_tracker.go @@ -14,9 +14,9 @@ import ( "context" "fmt" "io" - "time" "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -68,7 +68,7 @@ func max(a, b int64) int64 { // Listen implements the StoreMetricsListener interface. func (m *ClusterMetricsTracker) Listen(ctx context.Context, sms []StoreMetrics) { var ( - tick time.Time + tick hlc.Timestamp totalRangeCount int64 totalLeaseTransfers int64 totalRebalances int64 @@ -100,7 +100,7 @@ func (m *ClusterMetricsTracker) Listen(ctx context.Context, sms []StoreMetrics) } record := make([]string, 0, 10) - record = append(record, tick.String()) + record = append(record, tick.GoTime().String()) record = append(record, fmt.Sprintf("%d", totalRangeCount)) record = append(record, fmt.Sprintf("%d", totalWriteKeys)) record = append(record, fmt.Sprintf("%d", totalWriteBytes)) diff --git a/pkg/kv/kvserver/asim/metrics/tracker.go b/pkg/kv/kvserver/asim/metrics/tracker.go index 8543cff3d42e..115bbb8fb5b3 100644 --- a/pkg/kv/kvserver/asim/metrics/tracker.go +++ b/pkg/kv/kvserver/asim/metrics/tracker.go @@ -17,12 +17,13 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // StoreMetrics tracks metrics per-store in a simulation run. Each metrics // struct is associated with a tick. type StoreMetrics struct { - Tick time.Time + Tick hlc.Timestamp StoreID int64 QPS int64 WriteKeys int64 @@ -52,7 +53,7 @@ type StoreMetricsListener interface { // StoreMetrics information when ticked. type Tracker struct { storeListeners []StoreMetricsListener - lastTick time.Time + lastTick hlc.Timestamp interval time.Duration } @@ -72,8 +73,8 @@ func (mt *Tracker) Register(listeners ...StoreMetricsListener) { // Tick updates all listeners attached to the metrics tracker with the state at // the tick given. -func (mt *Tracker) Tick(ctx context.Context, tick time.Time, s state.State) { - if mt.lastTick.Add(mt.interval).After(tick) { +func (mt *Tracker) Tick(ctx context.Context, tick hlc.Timestamp, s state.State) { + if mt.lastTick.AddDuration(mt.interval).After(tick) { // Nothing to do yet. 
return } diff --git a/pkg/kv/kvserver/asim/op/BUILD.bazel b/pkg/kv/kvserver/asim/op/BUILD.bazel index 0467780be8cb..1a95dd297d4c 100644 --- a/pkg/kv/kvserver/asim/op/BUILD.bazel +++ b/pkg/kv/kvserver/asim/op/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/state", "//pkg/roachpb", + "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/kv/kvserver/asim/op/controller.go b/pkg/kv/kvserver/asim/op/controller.go index 402ee7108eaf..bcb354428988 100644 --- a/pkg/kv/kvserver/asim/op/controller.go +++ b/pkg/kv/kvserver/asim/op/controller.go @@ -13,7 +13,6 @@ package op import ( "container/heap" "context" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" @@ -21,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -35,10 +35,10 @@ type Controller interface { // Dispatch enqueues an operation to be processed. It returns a ticket // associated with the operation that may be used to check on the operation // progress. - Dispatch(context.Context, time.Time, state.State, ControlledOperation) DispatchedTicket + Dispatch(context.Context, hlc.Timestamp, state.State, ControlledOperation) DispatchedTicket // Tick iterates through pending operations and processes them up to the // current tick. - Tick(context.Context, time.Time, state.State) + Tick(context.Context, hlc.Timestamp, state.State) // Check checks the progress of the operation associated with the ticket // given. If the ticket exists, it returns the operation and true, else // false. @@ -81,7 +81,7 @@ func NewController( // associated with the operation that may be used to check on the operation // progress. func (c *controller) Dispatch( - ctx context.Context, tick time.Time, state state.State, co ControlledOperation, + ctx context.Context, tick hlc.Timestamp, state state.State, co ControlledOperation, ) DispatchedTicket { c.ticketGen++ ticket := c.ticketGen @@ -95,7 +95,7 @@ func (c *controller) Dispatch( // Tick iterates through pending operations and processes them up to the // current tick. 
-func (c *controller) Tick(ctx context.Context, tick time.Time, state state.State) { +func (c *controller) Tick(ctx context.Context, tick hlc.Timestamp, state state.State) { for c.pending.Len() > 0 { i := heap.Pop(c.pending) qop, _ := i.(*queuedOp) @@ -128,7 +128,7 @@ func (c *controller) Check(ticket DispatchedTicket) (op ControlledOperation, ok } func (c *controller) process( - ctx context.Context, tick time.Time, state state.State, co ControlledOperation, + ctx context.Context, tick hlc.Timestamp, state state.State, co ControlledOperation, ) { switch op := co.(type) { case *RelocateRangeOp: @@ -149,7 +149,7 @@ func (c *controller) process( } func (c *controller) processRelocateRange( - ctx context.Context, tick time.Time, s state.State, ro *RelocateRangeOp, + ctx context.Context, tick hlc.Timestamp, s state.State, ro *RelocateRangeOp, ) error { rng := s.RangeFor(ro.key) options := SimRelocateOneOptions{allocator: c.allocator, storePool: c.storePool, state: s} @@ -229,7 +229,7 @@ func (c *controller) processRelocateRange( } func (c *controller) processTransferLease( - ctx context.Context, tick time.Time, s state.State, ro *TransferLeaseOp, + ctx context.Context, tick hlc.Timestamp, s state.State, ro *TransferLeaseOp, ) error { if store, ok := s.LeaseholderStore(ro.rangeID); ok && store.StoreID() == ro.target { ro.done = true @@ -255,6 +255,6 @@ func (c *controller) processTransferLease( ro.rangeID, ro.target) } - ro.next = tick.Add(delay) + ro.next = tick.AddDuration(delay) return nil } diff --git a/pkg/kv/kvserver/asim/op/operation.go b/pkg/kv/kvserver/asim/op/operation.go index 0a4acffc7e4a..a61e74bc2c18 100644 --- a/pkg/kv/kvserver/asim/op/operation.go +++ b/pkg/kv/kvserver/asim/op/operation.go @@ -11,8 +11,7 @@ package op import ( - "time" - + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -20,38 +19,38 @@ import ( // It maintains a set of methods to inspect the lifecycle of an operation. type ControlledOperation interface { // Next returns the next time that this operation should be checked. - Next() time.Time + Next() hlc.Timestamp // Done returns whether this controlled operation is done, if so, when the // operation finished. - Done() (bool, time.Time) + Done() (bool, hlc.Timestamp) // Errors returns any errors that were recorded for this operation, if any // exist, otherwise nil. Errors() error } type baseOp struct { - start, next, complete time.Time + start, next, complete hlc.Timestamp done bool errs []error } -func newBaseOp(tick time.Time) baseOp { +func newBaseOp(tick hlc.Timestamp) baseOp { return baseOp{ start: tick, next: tick, - complete: time.Time{}, + complete: hlc.Timestamp{}, errs: []error{}, } } // Next returns the next time that this operation should be checked. -func (bo baseOp) Next() time.Time { +func (bo baseOp) Next() hlc.Timestamp { return bo.next } // Done returns whether this controlled operation is done, if so, when the // operation finished. -func (bo baseOp) Done() (bool, time.Time) { +func (bo baseOp) Done() (bool, hlc.Timestamp) { return bo.done, bo.complete } diff --git a/pkg/kv/kvserver/asim/op/pq.go b/pkg/kv/kvserver/asim/op/pq.go index fa49d1fd31d9..4ec3e15f2eb4 100644 --- a/pkg/kv/kvserver/asim/op/pq.go +++ b/pkg/kv/kvserver/asim/op/pq.go @@ -24,10 +24,10 @@ type priorityQueue struct { // Less is part of the container.Heap interface. 
func (pq *priorityQueue) Less(i, j int) bool { a, b := pq.items[i], pq.items[j] - if a.Next().Equal(b.Next()) { + if a.Next().EqOrdering(b.Next()) { return a.seq < b.seq } - return a.Next().Before(b.Next()) + return a.Next().Less(b.Next()) } // Swap is part of the container.Heap interface. diff --git a/pkg/kv/kvserver/asim/op/relocate_range.go b/pkg/kv/kvserver/asim/op/relocate_range.go index 37b22cbfaba1..9da5613c4a5d 100644 --- a/pkg/kv/kvserver/asim/op/relocate_range.go +++ b/pkg/kv/kvserver/asim/op/relocate_range.go @@ -12,12 +12,12 @@ package op import ( "context" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -31,7 +31,7 @@ type RelocateRangeOp struct { // NewRelocateRangeOp returns a new NewRelocateRangeOp. func NewRelocateRangeOp( - tick time.Time, + tick hlc.Timestamp, key roachpb.Key, voterTargets, nonVoterTargets []roachpb.ReplicationTarget, transferLeaseToFirstVoter bool, diff --git a/pkg/kv/kvserver/asim/op/transfer_lease.go b/pkg/kv/kvserver/asim/op/transfer_lease.go index a742de381201..275886ac7ec1 100644 --- a/pkg/kv/kvserver/asim/op/transfer_lease.go +++ b/pkg/kv/kvserver/asim/op/transfer_lease.go @@ -11,11 +11,10 @@ package op import ( - "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -29,7 +28,7 @@ type TransferLeaseOp struct { // NewTransferLeaseOp returns a new TransferLeaseOp. func NewTransferLeaseOp( - tick time.Time, + tick hlc.Timestamp, rangeID roachpb.RangeID, source, target roachpb.StoreID, usage allocator.RangeUsageInfo, diff --git a/pkg/kv/kvserver/asim/queue/BUILD.bazel b/pkg/kv/kvserver/asim/queue/BUILD.bazel index 4873b80be382..2686c656d6fd 100644 --- a/pkg/kv/kvserver/asim/queue/BUILD.bazel +++ b/pkg/kv/kvserver/asim/queue/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/asim/state", "//pkg/roachpb", + "//pkg/util/hlc", "//pkg/util/log", ], ) diff --git a/pkg/kv/kvserver/asim/queue/pacer.go b/pkg/kv/kvserver/asim/queue/pacer.go index 1e45d6873196..9f917d4433df 100644 --- a/pkg/kv/kvserver/asim/queue/pacer.go +++ b/pkg/kv/kvserver/asim/queue/pacer.go @@ -14,12 +14,13 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // ReplicaPacer controls the speed of considering a replica. type ReplicaPacer interface { // Next returns the next replica for the current tick, if exists. - Next(tick time.Time) state.Replica + Next(tick hlc.Timestamp) state.Replica } // ReplicaScanner simulates store scanner replica pacing, iterating over @@ -30,8 +31,8 @@ type ReplicaScanner struct { // replicas in state. 
nextReplsFn func() []state.Replica repls []state.Replica - start time.Time - lastLoop time.Time + start hlc.Timestamp + lastLoop hlc.Timestamp targetLoopInterval time.Duration minIterInvterval time.Duration maxIterInvterval time.Duration @@ -73,7 +74,7 @@ func (rp ReplicaScanner) Swap(i, j int) { // resetPacerLoop collects the current replicas on the store and sets the // pacing interval to complete iteration over all replicas in exactly target // loop interval. -func (rp *ReplicaScanner) resetPacerLoop(tick time.Time) { +func (rp *ReplicaScanner) resetPacerLoop(tick hlc.Timestamp) { rp.repls = rp.nextReplsFn() // Avoid the same replicas being processed in the same order in each @@ -113,21 +114,21 @@ func (rp *ReplicaScanner) resetPacerLoop(tick time.Time) { // maybeResetPacerLoop checks whether we have completed iteration and resets // the pacing loop if so. -func (rp *ReplicaScanner) maybeResetPacerLoop(tick time.Time) { +func (rp *ReplicaScanner) maybeResetPacerLoop(tick hlc.Timestamp) { if rp.visited >= len(rp.repls) { rp.resetPacerLoop(tick) } } // Next returns the next replica for the current tick, if exists. -func (rp *ReplicaScanner) Next(tick time.Time) state.Replica { +func (rp *ReplicaScanner) Next(tick hlc.Timestamp) state.Replica { rp.maybeResetPacerLoop(tick) elapsed := tick.Sub(rp.start) if elapsed >= rp.iterInterval && len(rp.repls) > 0 { repl := rp.repls[rp.visited] rp.visited++ - rp.start = rp.start.Add(rp.iterInterval) + rp.start = rp.start.AddDuration(rp.iterInterval) return repl } return nil diff --git a/pkg/kv/kvserver/asim/queue/queue.go b/pkg/kv/kvserver/asim/queue/queue.go index d5e6f2b78efc..1751d957f561 100644 --- a/pkg/kv/kvserver/asim/queue/queue.go +++ b/pkg/kv/kvserver/asim/queue/queue.go @@ -12,10 +12,10 @@ package queue import ( "context" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // RangeQueue presents an interface to interact with a single consumer @@ -35,7 +35,7 @@ type RangeQueue interface { // processed at a time and the duration taken to process a replica depends // on the action taken. Replicas in the queue are processed in order of // priority, then in FIFO order on ties. - Tick(ctx context.Context, tick time.Time, state state.State) + Tick(ctx context.Context, tick hlc.Timestamp, state state.State) } // replicaItem represents an item in the replica queue. 
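The simulated queues above order pending work by an hlc.Timestamp rather than a time.Time: entries are popped in order of their next-processing timestamp, with a sequence number breaking ties so FIFO order is preserved, mirroring the Less method shown for op/pq.go. A minimal, self-contained sketch of that ordering rule (the item type and values here are illustrative, not part of the patch):

package main

import (
	"fmt"
	"sort"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// item mimics the shape of the queued entries in the diff: a next-processing
// timestamp plus a sequence number used to break ties in FIFO order.
type item struct {
	next hlc.Timestamp
	seq  int
}

// less orders items the same way op/pq.go does: earlier timestamps first,
// and by sequence number when the timestamps compare as equal.
func less(a, b item) bool {
	if a.next.EqOrdering(b.next) {
		return a.seq < b.seq
	}
	return a.next.Less(b.next)
}

func main() {
	start := hlc.Timestamp{WallTime: time.Date(2022, 3, 21, 11, 0, 0, 0, time.UTC).UnixNano()}
	items := []item{
		{next: start.AddDuration(2 * time.Second), seq: 1},
		{next: start, seq: 3},
		{next: start, seq: 2},
	}
	sort.Slice(items, func(i, j int) bool { return less(items[i], items[j]) })
	// The two items sharing `start` keep their FIFO order (seq 2 before seq 3).
	fmt.Println(items)
}
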
@@ -104,5 +104,5 @@ type baseQueue struct { priorityQueue storeID state.StoreID stateChanger state.Changer - next, lastTick time.Time + next, lastTick hlc.Timestamp } diff --git a/pkg/kv/kvserver/asim/queue/replicate_queue.go b/pkg/kv/kvserver/asim/queue/replicate_queue.go index 3e72ba8a7269..115ce204cde1 100644 --- a/pkg/kv/kvserver/asim/queue/replicate_queue.go +++ b/pkg/kv/kvserver/asim/queue/replicate_queue.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -36,7 +37,7 @@ func NewReplicateQueue( delay func(rangeSize int64, add bool) time.Duration, allocator allocatorimpl.Allocator, storePool storepool.AllocatorStorePool, - start time.Time, + start hlc.Timestamp, ) RangeQueue { return &replicateQueue{ baseQueue: baseQueue{ @@ -85,12 +86,12 @@ func (rq *replicateQueue) MaybeAdd( // supports processing ConsiderRebalance actions on replicas. // TODO(kvoli,lidorcarmel): Support taking additional actions, beyond consider // rebalance. -func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.State) { +func (rq *replicateQueue) Tick(ctx context.Context, tick hlc.Timestamp, s state.State) { if rq.lastTick.After(rq.next) { rq.next = rq.lastTick } - for !tick.Before(rq.next) && rq.priorityQueue.Len() != 0 { + for !tick.Less(rq.next) && rq.priorityQueue.Len() != 0 { item := heap.Pop(rq).(*replicaItem) if item == nil { return @@ -124,7 +125,7 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.Stat // changer and update the time to process the next replica, with the completion // time returned. func (rq *replicateQueue) considerRebalance( - ctx context.Context, tick time.Time, rng state.Range, s state.State, + ctx context.Context, tick hlc.Timestamp, rng state.Range, s state.State, ) { add, remove, _, ok := rq.allocator.RebalanceVoter( ctx, diff --git a/pkg/kv/kvserver/asim/queue/split_queue.go b/pkg/kv/kvserver/asim/queue/split_queue.go index 1cfd61cfd263..c7ca0a431f74 100644 --- a/pkg/kv/kvserver/asim/queue/split_queue.go +++ b/pkg/kv/kvserver/asim/queue/split_queue.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) type splitQueue struct { @@ -32,7 +33,7 @@ func NewSplitQueue( stateChanger state.Changer, delay func() time.Duration, splitThreshold int64, - start time.Time, + start hlc.Timestamp, ) RangeQueue { return &splitQueue{ baseQueue: baseQueue{ @@ -69,12 +70,12 @@ func (sq *splitQueue) MaybeAdd(ctx context.Context, replica state.Replica, state // taken. Replicas in the queue are processed in order of priority, then in // FIFO order on ties. The tick currently only considers size based range // splitting. 
-func (sq *splitQueue) Tick(ctx context.Context, tick time.Time, s state.State) { +func (sq *splitQueue) Tick(ctx context.Context, tick hlc.Timestamp, s state.State) { if sq.lastTick.After(sq.next) { sq.next = sq.lastTick } - for !tick.Before(sq.next) && sq.priorityQueue.Len() != 0 { + for !tick.Less(sq.next) && sq.priorityQueue.Len() != 0 { item := heap.Pop(sq).(*replicaItem) if item == nil { return @@ -115,7 +116,9 @@ func (sq *splitQueue) Tick(ctx context.Context, tick time.Time, s state.State) { // shouldSplit returns whether a range should be split into two. When the // floating point number returned is greater than or equal to 1, it should be // split with that priority, else it shouldn't. -func (sq *splitQueue) shouldSplit(tick time.Time, rangeID state.RangeID, s state.State) float64 { +func (sq *splitQueue) shouldSplit( + tick hlc.Timestamp, rangeID state.RangeID, s state.State, +) float64 { rng, ok := s.Range(rangeID) if !ok { return 0 @@ -136,7 +139,7 @@ func (sq *splitQueue) shouldSplit(tick time.Time, rangeID state.RangeID, s state // two. It will return the key that divides the range into an equal number of // keys on the lhs and rhs. func (sq *splitQueue) findKeySpanSplit( - tick time.Time, s state.State, rangeID state.RangeID, + tick hlc.Timestamp, s state.State, rangeID state.RangeID, ) (state.Key, bool) { // Try and use the split key suggested by the load based splitter, if one // exists. diff --git a/pkg/kv/kvserver/asim/state/change.go b/pkg/kv/kvserver/asim/state/change.go index 7a5223eb6f19..901c371f7048 100644 --- a/pkg/kv/kvserver/asim/state/change.go +++ b/pkg/kv/kvserver/asim/state/change.go @@ -13,6 +13,7 @@ package state import ( "time" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/google/btree" ) @@ -38,10 +39,10 @@ type Changer interface { // state change per Range() at any time. Push returns the time the state // change will apply, and true if this is satisfied; else it will return // false. - Push(tick time.Time, sc Change) (time.Time, bool) + Push(tick hlc.Timestamp, sc Change) (hlc.Timestamp, bool) // Tick updates state changer to apply any changes that have occurred // between the last tick and this one. - Tick(tick time.Time, state State) + Tick(tick hlc.Timestamp, state State) } // ReplicaChange contains information necessary to add, remove or move (both) a @@ -211,7 +212,7 @@ type replicaChanger struct { lastTicket int completeAt *btree.BTree pendingTickets map[int]Change - pendingTarget map[StoreID]time.Time + pendingTarget map[StoreID]hlc.Timestamp pendingRange map[RangeID]int } @@ -221,20 +222,20 @@ func NewReplicaChanger() Changer { return &replicaChanger{ completeAt: btree.New(8), pendingTickets: make(map[int]Change), - pendingTarget: make(map[StoreID]time.Time), + pendingTarget: make(map[StoreID]hlc.Timestamp), pendingRange: make(map[RangeID]int), } } type pendingChange struct { ticket int - completeAt time.Time + completeAt hlc.Timestamp } // Less is part of the btree.Item interface. func (pc *pendingChange) Less(than btree.Item) bool { // Targettal order on (completeAt, ticket) - return pc.completeAt.Before(than.(*pendingChange).completeAt) || + return pc.completeAt.Less(than.(*pendingChange).completeAt) || (pc.completeAt.Equal(than.(*pendingChange).completeAt) && pc.ticket < than.(*pendingChange).ticket) } @@ -242,7 +243,7 @@ func (pc *pendingChange) Less(than btree.Item) bool { // state change per Range() at any time. 
Push returns the time the state // change will apply, and true if this is satisfied; else it will return // false. -func (rc *replicaChanger) Push(tick time.Time, change Change) (time.Time, bool) { +func (rc *replicaChanger) Push(tick hlc.Timestamp, change Change) (hlc.Timestamp, bool) { // Allow at most one pending action per range at any point in time. if _, ok := rc.pendingRange[change.Range()]; ok { return tick, false @@ -264,7 +265,7 @@ func (rc *replicaChanger) Push(tick time.Time, change Change) (time.Time, bool) } completeAt = rc.pendingTarget[change.Target()] } - completeAt = completeAt.Add(change.Delay()) + completeAt = completeAt.AddDuration(change.Delay()) // Create a unique entry (completionTime, ticket) and append it to the // completion queue. @@ -276,12 +277,12 @@ func (rc *replicaChanger) Push(tick time.Time, change Change) (time.Time, bool) // Tick updates state changer to apply any changes that have occurred // between the last tick and this one. -func (rc *replicaChanger) Tick(tick time.Time, state State) { +func (rc *replicaChanger) Tick(tick hlc.Timestamp, state State) { changeList := make(map[int]*pendingChange) // NB: Add the smallest unit of time, in order to find all items in // [smallest, tick]. - pivot := &pendingChange{completeAt: tick.Add(time.Nanosecond)} + pivot := &pendingChange{completeAt: tick.AddDuration(time.Nanosecond)} rc.completeAt.AscendLessThan(pivot, func(i btree.Item) bool { nextChange, _ := i.(*pendingChange) changeList[nextChange.ticket] = nextChange diff --git a/pkg/kv/kvserver/asim/state/helpers.go b/pkg/kv/kvserver/asim/state/helpers.go index 0c143d136713..44ee3fe98b89 100644 --- a/pkg/kv/kvserver/asim/state/helpers.go +++ b/pkg/kv/kvserver/asim/state/helpers.go @@ -36,8 +36,10 @@ func NewShuffler(seed int64) func(n int, swap func(i, j int)) { } // TestingStartTime returns a start time that may be used for tests. -func TestingStartTime() time.Time { - return time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC) +func TestingStartTime() hlc.Timestamp { + return hlc.Timestamp{ + WallTime: time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC).UnixNano(), + } } // TestingWorkloadSeed returns a seed to use for constructing a workload @@ -88,14 +90,14 @@ func NewStorePool( // OffsetTick offsets start time by adding tick number of seconds to it. // TODO(kvoli): Use a dedicated tick package, which would contain methods such // as this. Deprecating direct use of time. -func OffsetTick(start time.Time, tick int64) time.Time { - tickTime := start.Add(time.Duration(tick) * time.Second) +func OffsetTick(start hlc.Timestamp, tick int64) hlc.Timestamp { + tickTime := start.AddDuration(time.Duration(tick) * time.Second) return tickTime } // ReverseOffsetTick converts an offset time from the start time, into the // number of ticks (seconds) since the start. 
-func ReverseOffsetTick(start, tickTime time.Time) int64 { +func ReverseOffsetTick(start, tickTime hlc.Timestamp) int64 { offSetTickTime := tickTime.Sub(start) return int64(offSetTickTime.Seconds()) } diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index b33196c5f9ed..9100311b8fc2 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -61,7 +61,7 @@ func newState(settings *config.SimulationSettings) *state { nodes: make(map[NodeID]*node), stores: make(map[StoreID]*store), loadsplits: make(map[StoreID]LoadSplitter), - clock: &ManualSimClock{nanos: settings.StartTime.UnixNano()}, + clock: &ManualSimClock{nanos: settings.StartTime.GoTime().UnixNano()}, ranges: newRMap(), usageInfo: newClusterUsageInfo(), settings: settings, @@ -702,7 +702,7 @@ func (s *state) applyLoad(rng *rng, le workload.LoadEvent) { if !ok { return } - s.loadsplits[store.StoreID()].Record(s.clock.Now(), rng.rangeID, le) + s.loadsplits[store.StoreID()].Record(hlc.Timestamp{WallTime: s.clock.Now().UnixNano()}, rng.rangeID, le) } // ReplicaLoad returns the usage information for the Range with ID @@ -736,8 +736,8 @@ func (s *state) ClusterUsageInfo() *ClusterUsageInfo { // TickClock modifies the state Clock time to Tick. The clock is used as the // system time source for the store pools that are spawned from this state. -func (s *state) TickClock(tick time.Time) { - s.clock.Set(tick.UnixNano()) +func (s *state) TickClock(tick hlc.Timestamp) { + s.clock.Set(tick.GoTime().UnixNano()) } // UpdateStorePool modifies the state of the StorePool for the Store with @@ -766,7 +766,7 @@ func (s *state) NextReplicasFn(storeID StoreID) func() []Replica { // liveness of the Node with ID NodeID. // TODO(kvoli): Find a better home for this method, required by the storepool. func (s *state) NodeLivenessFn() storepool.NodeLivenessFunc { - nodeLivenessFn := func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + nodeLivenessFn := func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { // TODO(kvoli): Implement liveness records for nodes, that signal they // are dead when simulating partitions, crashes etc. return livenesspb.NodeLivenessStatus_LIVE diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go index b30a3848214f..cd4cb02a5880 100644 --- a/pkg/kv/kvserver/asim/state/split_decider.go +++ b/pkg/kv/kvserver/asim/state/split_decider.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/metric" ) @@ -28,10 +29,10 @@ import ( type LoadSplitter interface { // Record records a workload event at the time given, against the range // with ID RangeID. - Record(time.Time, RangeID, workload.LoadEvent) bool + Record(hlc.Timestamp, RangeID, workload.LoadEvent) bool // SplitKey returns whether split key and true if a valid split key exists // given the recorded load, otherwise returning false. - SplitKey(time.Time, RangeID) (Key, bool) + SplitKey(hlc.Timestamp, RangeID) (Key, bool) // ClearSplitKeys returns a suggested list of ranges that should be split // due to load. Calling this function resets the list of suggestions. 
ClearSplitKeys() []RangeID @@ -47,7 +48,7 @@ type loadSplitConfig struct { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. func (lsc loadSplitConfig) NewLoadBasedSplitter( - startTime time.Time, _ split.SplitObjective, + startTime hlc.Timestamp, _ split.SplitObjective, ) split.LoadBasedSplitter { return split.NewUnweightedFinder(startTime, lsc.randSource) } @@ -93,7 +94,7 @@ func (s *SplitDecider) newDecider() *split.Decider { // Record records a workload event at the time given, against the range // with ID RangeID. -func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadEvent) bool { +func (s *SplitDecider) Record(tick hlc.Timestamp, rangeID RangeID, le workload.LoadEvent) bool { decider := s.deciders[rangeID] if decider == nil { @@ -119,7 +120,7 @@ func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadE // SplitKey returns whether split key and true if a valid split key exists // given the recorded load, otherwise returning false. -func (s *SplitDecider) SplitKey(tick time.Time, rangeID RangeID) (Key, bool) { +func (s *SplitDecider) SplitKey(tick hlc.Timestamp, rangeID RangeID) (Key, bool) { decider := s.deciders[rangeID] if decider == nil { return InvalidKey, false diff --git a/pkg/kv/kvserver/asim/state/split_decider_test.go b/pkg/kv/kvserver/asim/state/split_decider_test.go index bcec5a9e8e6d..cfb2eed55a11 100644 --- a/pkg/kv/kvserver/asim/state/split_decider_test.go +++ b/pkg/kv/kvserver/asim/state/split_decider_test.go @@ -59,7 +59,7 @@ func TestSplitDecider(t *testing.T) { // There should now be 1 suggested range for splitting which corresponds to // the midpoint of the testing sequence. require.Equal(t, []RangeID{1}, decider.ClearSplitKeys()) - splitKey, found = decider.SplitKey(startTime.Add(testSettings.SplitStatRetention), 1) + splitKey, found = decider.SplitKey(startTime.AddDuration(testSettings.SplitStatRetention), 1) require.True(t, found) require.Equal(t, Key(6), splitKey) diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index 01c47ca78879..e98d133846b0 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "go.etcd.io/raft/v3" ) @@ -131,7 +132,7 @@ type State interface { // ClusterUsageInfo returns the usage information for the entire cluster. ClusterUsageInfo() *ClusterUsageInfo // TickClock modifies the state Clock time to Tick. - TickClock(time.Time) + TickClock(hlc.Timestamp) // UpdateStorePool modifies the state of the StorePool for the Store with // ID StoreID. 
UpdateStorePool(StoreID, map[roachpb.StoreID]*storepool.StoreDetail) diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go index 1a584e3f7c38..29e1a12d1268 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go @@ -12,7 +12,6 @@ package storerebalancer import ( "context" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" @@ -22,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "go.etcd.io/raft/v3" ) @@ -45,7 +45,7 @@ const ( // StoreRebalancer is a tickable actor which scans the replicas on the store // associated with it and attempts to perform lease, then, range rebalancing. type StoreRebalancer interface { - Tick(context.Context, time.Time, state.State) + Tick(context.Context, hlc.Timestamp, state.State) } // storeRebalancerState mantains the store rebalancer state used in the three @@ -62,7 +62,7 @@ type storeRebalancerState struct { pendingTransferTarget roachpb.ReplicaDescriptor pendingTicket op.DispatchedTicket - lastTick time.Time + lastTick hlc.Timestamp } type storeRebalancerControl struct { @@ -78,7 +78,7 @@ type storeRebalancerControl struct { // NewStoreRebalancer returns a new simulator store rebalancer. func NewStoreRebalancer( - start time.Time, + start hlc.Timestamp, storeID state.StoreID, controller op.Controller, allocator allocatorimpl.Allocator, @@ -90,7 +90,7 @@ func NewStoreRebalancer( } func newStoreRebalancerControl( - start time.Time, + start hlc.Timestamp, storeID state.StoreID, controller op.Controller, allocator allocatorimpl.Allocator, @@ -112,7 +112,7 @@ func newStoreRebalancerControl( sr: sr, settings: settings, rebalancerState: &storeRebalancerState{ - lastTick: start.Add(-settings.LBRebalancingInterval), + lastTick: start.AddDuration(-settings.LBRebalancingInterval), }, storeID: storeID, allocator: allocator, @@ -144,11 +144,11 @@ func (src *storeRebalancerControl) scorerOptions() *allocatorimpl.LoadScorerOpti } } -func (src *storeRebalancerControl) checkPendingTicket() (done bool, next time.Time, _ error) { +func (src *storeRebalancerControl) checkPendingTicket() (done bool, next hlc.Timestamp, _ error) { ticket := src.rebalancerState.pendingTicket op, ok := src.controller.Check(ticket) if !ok { - return true, time.Time{}, nil + return true, hlc.Timestamp{}, nil } done, next = op.Done() if !done { @@ -157,7 +157,9 @@ func (src *storeRebalancerControl) checkPendingTicket() (done bool, next time.Ti return true, next, op.Errors() } -func (src *storeRebalancerControl) Tick(ctx context.Context, tick time.Time, state state.State) { +func (src *storeRebalancerControl) Tick( + ctx context.Context, tick hlc.Timestamp, state state.State, +) { src.sr.AddLogTag("tick", tick) ctx = src.sr.ResetAndAnnotateCtx(ctx) switch src.rebalancerState.phase { @@ -172,8 +174,10 @@ func (src *storeRebalancerControl) Tick(ctx context.Context, tick time.Time, sta // phaseSleep checks whether the store rebalancer should continue sleeping. If // not, it performs a state transfer to prologue. 
-func (src *storeRebalancerControl) phaseSleep(ctx context.Context, tick time.Time, s state.State) { - sleepedTick := src.rebalancerState.lastTick.Add(src.settings.LBRebalancingInterval) +func (src *storeRebalancerControl) phaseSleep( + ctx context.Context, tick hlc.Timestamp, s state.State, +) { + sleepedTick := src.rebalancerState.lastTick.AddDuration(src.settings.LBRebalancingInterval) if tick.After(sleepedTick) { src.rebalancerState.lastTick = sleepedTick src.phasePrologue(ctx, tick, s) @@ -185,7 +189,7 @@ func (src *storeRebalancerControl) phaseSleep(ctx context.Context, tick time.Tim // if it passes the should rebalance store check, otherwise it transfers // directly into the epilogue phase. func (src *storeRebalancerControl) phasePrologue( - ctx context.Context, tick time.Time, s state.State, + ctx context.Context, tick hlc.Timestamp, s state.State, ) { rctx := src.sr.NewRebalanceContext( ctx, src.scorerOptions(), @@ -245,7 +249,7 @@ func (src *storeRebalancerControl) checkPendingLeaseRebalance(ctx context.Contex func (src *storeRebalancerControl) applyLeaseRebalance( ctx context.Context, - tick time.Time, + tick hlc.Timestamp, s state.State, candidateReplica kvserver.CandidateReplica, target roachpb.ReplicaDescriptor, @@ -266,7 +270,7 @@ func (src *storeRebalancerControl) applyLeaseRebalance( } func (src *storeRebalancerControl) phaseLeaseRebalancing( - ctx context.Context, tick time.Time, s state.State, + ctx context.Context, tick hlc.Timestamp, s state.State, ) { for { // Check the pending transfer state, if we can't continue to searching @@ -336,7 +340,7 @@ func (src *storeRebalancerControl) checkPendingRangeRebalance(ctx context.Contex func (src *storeRebalancerControl) applyRangeRebalance( ctx context.Context, - tick time.Time, + tick hlc.Timestamp, s state.State, candidateReplica kvserver.CandidateReplica, voterTargets, nonVoterTargets []roachpb.ReplicationTarget, @@ -359,7 +363,7 @@ func (src *storeRebalancerControl) applyRangeRebalance( } func (src *storeRebalancerControl) phaseRangeRebalancing( - ctx context.Context, tick time.Time, s state.State, + ctx context.Context, tick hlc.Timestamp, s state.State, ) { for { // Check the pending range rebalance state, if we can't continue to @@ -386,7 +390,7 @@ func (src *storeRebalancerControl) phaseRangeRebalancing( // phaseEpilogue clears the rebalancing context and updates the last tick // interval. This transfers into a sleeping phase. 
-func (src *storeRebalancerControl) phaseEpilogue(ctx context.Context, tick time.Time) { +func (src *storeRebalancerControl) phaseEpilogue(ctx context.Context, tick hlc.Timestamp) { src.rebalancerState.phase = rebalancerSleeping src.rebalancerState.rctx = nil src.rebalancerState.lastTick = tick diff --git a/pkg/kv/kvserver/asim/workload/BUILD.bazel b/pkg/kv/kvserver/asim/workload/BUILD.bazel index 027bac06bbc3..e9c3094ec869 100644 --- a/pkg/kv/kvserver/asim/workload/BUILD.bazel +++ b/pkg/kv/kvserver/asim/workload/BUILD.bazel @@ -6,6 +6,7 @@ go_library( srcs = ["workload.go"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload", visibility = ["//visibility:public"], + deps = ["//pkg/util/hlc"], ) go_test( @@ -13,7 +14,10 @@ go_test( srcs = ["workload_test.go"], args = ["-test.timeout=295s"], embed = [":workload"], - deps = ["@com_github_stretchr_testify//require"], + deps = [ + "//pkg/util/hlc", + "@com_github_stretchr_testify//require", + ], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/asim/workload/workload.go b/pkg/kv/kvserver/asim/workload/workload.go index 831878ef83ec..93e65da78f49 100644 --- a/pkg/kv/kvserver/asim/workload/workload.go +++ b/pkg/kv/kvserver/asim/workload/workload.go @@ -14,7 +14,8 @@ import ( "fmt" "math/rand" "sort" - "time" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // LoadEvent represent a key access that generates load against the database. @@ -51,7 +52,7 @@ func (lb LoadBatch) Len() int { type Generator interface { // Tick returns the load events up till time tick, from the last time the // workload generator was called. - Tick(tick time.Time) LoadBatch + Tick(tick hlc.Timestamp) LoadBatch } // RandomGenerator generates random operations within some limits. @@ -59,7 +60,7 @@ type RandomGenerator struct { seed int64 keyGenerator KeyGenerator rand *rand.Rand - lastRun time.Time + lastRun hlc.Timestamp rollsPerSecond float64 readRatio float64 maxSize int @@ -69,7 +70,7 @@ type RandomGenerator struct { // NewRandomGenerator returns a generator that generates random operations // within some limits. func NewRandomGenerator( - start time.Time, + start hlc.Timestamp, seed int64, keyGenerator KeyGenerator, rate float64, @@ -83,7 +84,7 @@ func NewRandomGenerator( // newRandomGenerator returns a generator that generates random operations // within some limits. func newRandomGenerator( - start time.Time, + start hlc.Timestamp, seed int64, keyGenerator KeyGenerator, rate float64, @@ -105,7 +106,7 @@ func newRandomGenerator( // Tick returns the load events up till time tick, from the last time the // workload generator was called. -func (rwg *RandomGenerator) Tick(maxTime time.Time) LoadBatch { +func (rwg *RandomGenerator) Tick(maxTime hlc.Timestamp) LoadBatch { elapsed := maxTime.Sub(rwg.lastRun).Seconds() count := int(elapsed * rwg.rollsPerSecond) // Do not attempt to generate additional load events if the elapsed @@ -244,7 +245,9 @@ func (g *zipfianGenerator) rand() *rand.Rand { // TestCreateWorkloadGenerator creates a simple uniform workload generator that // will generate load events at the rate given. The read ratio is fixed to // 0.95. 
-func TestCreateWorkloadGenerator(seed int64, start time.Time, rate int, keySpan int64) Generator { +func TestCreateWorkloadGenerator( + seed int64, start hlc.Timestamp, rate int, keySpan int64, +) Generator { readRatio := 0.95 minWriteSize := 128 maxWriteSize := 256 diff --git a/pkg/kv/kvserver/asim/workload/workload_test.go b/pkg/kv/kvserver/asim/workload/workload_test.go index de6f635abb5a..7fbc605770d1 100644 --- a/pkg/kv/kvserver/asim/workload/workload_test.go +++ b/pkg/kv/kvserver/asim/workload/workload_test.go @@ -17,6 +17,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/stretchr/testify/require" ) @@ -144,11 +145,11 @@ func TestRandWorkloadGenerator(t *testing.T) { }, } - start := time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC) + start := hlc.Timestamp{WallTime: time.Date(2022, 03, 21, 11, 0, 0, 0, time.UTC).UnixNano()} for _, tc := range testCases { workLoadGenerator := newRandomGenerator(start, testingSeed, tc.keyGenerator, tc.rate, tc.readRatio, tc.maxSize, tc.minSize) workLoadGenerator.lastRun = start - end := start.Add(tc.duration) + end := start.AddDuration(tc.duration) // Generate the workload events. ops := workLoadGenerator.Tick(end) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 0fe173995273..a3ceb3134937 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4422,8 +4422,8 @@ func TestMergeQueue(t *testing.T) { // not leak over between subtests. Then, bump the manual clock so that // both range's load-based splitters consider their measurements to be // reliable. - lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) - rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) + lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().Now()) + rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().Now()) manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) } for _, splitObjective := range []kvserver.LBRebalancingObjective{ @@ -4434,7 +4434,7 @@ func TestMergeQueue(t *testing.T) { t.Run(fmt.Sprintf("unreliable-lhs-%s", splitObjective.ToDimension().String()), func(t *testing.T) { resetForLoadBasedSubtest(t) - lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) + lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().Now()) clearRange(t, lhsStartKey, rhsEndKey) verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) @@ -4443,7 +4443,7 @@ func TestMergeQueue(t *testing.T) { t.Run(fmt.Sprintf("unreliable-rhs-%s", splitObjective.ToDimension().String()), func(t *testing.T) { resetForLoadBasedSubtest(t) - rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) + rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().Now()) clearRange(t, lhsStartKey, rhsEndKey) verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) @@ -4453,8 +4453,8 @@ func TestMergeQueue(t *testing.T) { resetForLoadBasedSubtest(t) moreThanHalfStat := mergeByLoadStat/2 + 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().Now(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().Now(), moreThanHalfStat) clearRange(t, lhsStartKey, rhsEndKey) verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) @@ -4465,8 +4465,8 @@ func TestMergeQueue(t *testing.T) { 
manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) lessThanHalfStat := mergeByLoadStat/2 - 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().Now(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().Now(), lessThanHalfStat) clearRange(t, lhsStartKey, rhsEndKey) verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) @@ -4495,8 +4495,8 @@ func TestMergeQueue(t *testing.T) { resetForLoadBasedSubtest(t) moreThanHalfStat := mergeByLoadStat/2 + 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().Now(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().Now(), moreThanHalfStat) clearRange(t, lhsStartKey, rhsEndKey) // Switch the dimension, so that any recorded load should @@ -4515,8 +4515,8 @@ func TestMergeQueue(t *testing.T) { manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) lessThanHalfStat := mergeByLoadStat/2 - 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().Now(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().Now(), lessThanHalfStat) clearRange(t, lhsStartKey, rhsEndKey) setSplitObjective(secondSplitObjective) diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index d3540c62f219..2e4894419eab 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -884,7 +884,7 @@ func (cbt *circuitBreakerTest) ExpireAllLeasesAndN1LivenessRecord( testutils.SucceedsSoon(t, func() error { self, ok := lv.Self() require.True(t, ok) - if self.IsLive(cbt.Server(n2).Clock().Now().GoTime()) { + if self.IsLive(cbt.Server(n2).Clock().Now()) { // Someone else must have incremented epoch. return nil } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 234e5cf72d72..42e6f8aea0d8 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -225,7 +225,7 @@ func NewTestStorePool(cfg StoreConfig) *storepool.StorePool { func() int { return 1 }, - func(roachpb.NodeID, time.Time, time.Duration) livenesspb.NodeLivenessStatus { + func(roachpb.NodeID, hlc.Timestamp, time.Duration) livenesspb.NodeLivenessStatus { return livenesspb.NodeLivenessStatus_LIVE }, /* deterministic */ false, diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index b25aabf136e1..a09d90721986 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -699,7 +699,7 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to // consider clock signals from other nodes. 
- return liveness.IsLive(nl.clock.Now().GoTime()), nil + return liveness.IsLive(nl.clock.Now()), nil } // IsAvailable returns whether or not the specified node is available to serve @@ -708,7 +708,7 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { // Returns false if the node is not in the local liveness table. func (nl *NodeLiveness) IsAvailable(nodeID roachpb.NodeID) bool { liveness, ok := nl.GetLiveness(nodeID) - return ok && liveness.IsLive(nl.clock.Now().GoTime()) && !liveness.Membership.Decommissioned() + return ok && liveness.IsLive(nl.clock.Now()) && !liveness.Membership.Decommissioned() } // IsAvailableNotDraining returns whether or not the specified node is available @@ -720,7 +720,7 @@ func (nl *NodeLiveness) IsAvailable(nodeID roachpb.NodeID) bool { func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool { liveness, ok := nl.GetLiveness(nodeID) return ok && - liveness.IsLive(nl.clock.Now().GoTime()) && + liveness.IsLive(nl.clock.Now()) && !liveness.Membership.Decommissioning() && !liveness.Membership.Decommissioned() && !liveness.Draining @@ -1007,7 +1007,7 @@ func (nl *NodeLiveness) heartbeatInternal( // return a success even if the expiration is only 5 seconds in the // future. The next heartbeat will then start with only 0.5 seconds // before expiration. - if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch { + if actual.IsLive(nl.clock.Now()) && !incrementEpoch { return errNodeAlreadyLive } // Otherwise, return error. @@ -1055,7 +1055,7 @@ func (nl *NodeLiveness) GetIsLiveMap() livenesspb.IsLiveMap { lMap := livenesspb.IsLiveMap{} nl.mu.RLock() defer nl.mu.RUnlock() - now := nl.clock.Now().GoTime() + now := nl.clock.Now() for nID, l := range nl.mu.nodes { isLive := l.IsLive(now) if !isLive && !l.Membership.Active() { @@ -1215,7 +1215,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness livenesspb. <-sem }() - if liveness.IsLive(nl.clock.Now().GoTime()) { + if liveness.IsLive(nl.clock.Now()) { return errors.Errorf("cannot increment epoch on live node: %+v", liveness) } @@ -1445,7 +1445,7 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record) return } - now := nl.clock.Now().GoTime() + now := nl.clock.Now() if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { for _, fn := range onIsLive { fn(newLivenessRec.Liveness) @@ -1524,7 +1524,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 { if !ok { return 0 } - now := nl.clock.Now().GoTime() + now := nl.clock.Now() // If this node isn't live, we don't want to report its view of node liveness // because it's more likely to be inaccurate than the view of a live node. 
if !self.IsLive(now) { diff --git a/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel b/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel index edaaaa99f792..562c74e854d3 100644 --- a/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel +++ b/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel @@ -11,7 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/roachpb", - "//pkg/util/timeutil", + "//pkg/util/hlc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", ], diff --git a/pkg/kv/kvserver/liveness/livenesspb/liveness.go b/pkg/kv/kvserver/liveness/livenesspb/liveness.go index d6184c1d56de..dfea30af2a9f 100644 --- a/pkg/kv/kvserver/liveness/livenesspb/liveness.go +++ b/pkg/kv/kvserver/liveness/livenesspb/liveness.go @@ -15,7 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -30,18 +30,16 @@ import ( // considered expired. For that purpose, it's better to pass in // clock.Now().GoTime() rather than clock.PhysicalNow() - the former takes into // consideration clock signals from other nodes, the latter doesn't. -func (l *Liveness) IsLive(now time.Time) bool { - expiration := timeutil.Unix(0, l.Expiration.WallTime) - return now.Before(expiration) +func (l *Liveness) IsLive(now hlc.Timestamp) bool { + return now.Less(l.Expiration.ToTimestamp()) } // IsDead returns true if the liveness expired more than threshold ago. // // Note that, because of threshold, IsDead() is not the inverse of IsLive(). -func (l *Liveness) IsDead(now time.Time, threshold time.Duration) bool { - expiration := timeutil.Unix(0, l.Expiration.WallTime) - deadAsOf := expiration.Add(threshold) - return !now.Before(deadAsOf) +func (l *Liveness) IsDead(now hlc.Timestamp, threshold time.Duration) bool { + expiration := l.Expiration.ToTimestamp().AddDuration(threshold) + return !now.Less(expiration) } // Compare returns an integer comparing two pieces of liveness information, diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index b986b5fa7cdd..8b8dedefc14b 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -276,7 +276,7 @@ func (mq *mergeQueue) process( mergedStats := lhsStats mergedStats.Add(rhsStats) - lhsLoadSplitSnap := lhsRepl.loadBasedSplitter.Snapshot(ctx, mq.store.Clock().PhysicalTime()) + lhsLoadSplitSnap := lhsRepl.loadBasedSplitter.Snapshot(ctx, mq.store.Clock().Now()) var loadMergeReason string if lhsRepl.SplitByLoadEnabled() { var canMergeLoad bool @@ -428,7 +428,7 @@ func (mq *mergeQueue) process( // could just Reset the splitter, but then we'd need to wait out a full // measurement period (default of 5m) before merging this range again. if mergedLoadSplitStat := lhsLoadSplitSnap.Max + rhsLoadSplitSnap.Max; mergedLoadSplitStat != 0 { - lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat) + lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().Now(), mergedLoadSplitStat) } return true, nil } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 6e4235c926bd..62efa7e26fda 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1323,7 +1323,7 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) { // // Use LoadStats.QueriesPerSecond for all other purposes. 
func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { - snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime()) + snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().Now()) if snap.SplitObjective != split.SplitQPS { return 0, false @@ -1341,7 +1341,7 @@ func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { // Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current // CPU stats for all other purposes. func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) { - snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().PhysicalTime()) + snap := r.loadBasedSplitter.Snapshot(ctx, r.Clock().Now()) if snap.SplitObjective != split.SplitCPU { return 0, false diff --git a/pkg/kv/kvserver/replica_consistency.go b/pkg/kv/kvserver/replica_consistency.go index 99786884b2d4..ded59eacbca6 100644 --- a/pkg/kv/kvserver/replica_consistency.go +++ b/pkg/kv/kvserver/replica_consistency.go @@ -165,7 +165,7 @@ func (r *Replica) checkConsistencyImpl( // match. It helps to further check that the recomputed MVCC stats match the // stored stats. // - // Both Persisted and Delta stats were computed deterministically from the + // Both Persisted and Sub stats were computed deterministically from the // data fed into the checksum, so if all checksums match, we can take the // stats from an arbitrary replica that succeeded. // diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index b0707d4514aa..9fac1b47e308 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -330,7 +330,7 @@ func (r *Replica) leasePostApplyLocked( if r.loadStats != nil { r.loadStats.Reset() } - r.loadBasedSplitter.Reset(r.Clock().PhysicalTime()) + r.loadBasedSplitter.Reset(r.Clock().Now()) } // Inform the concurrency manager that the lease holder has been updated. diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 11d96ed5df9a..6b4926f0ed21 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -2155,7 +2155,7 @@ func shouldCampaignOnLeaseRequestRedirect( // Store.updateLivenessMap). We only care whether the leader is currently live // according to node liveness because this determines whether it will be able // to acquire an epoch-based lease. - return !livenessEntry.Liveness.IsLive(now.GoTime()) + return !livenessEntry.Liveness.IsLive(now) } func (r *Replica) campaignLocked(ctx context.Context) { diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index 24adbc238cd2..a5dd311fe500 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -101,7 +102,7 @@ func newReplicaSplitConfig(st *cluster.Settings) *replicaSplitConfig { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. 
func (c *replicaSplitConfig) NewLoadBasedSplitter( - startTime time.Time, obj split.SplitObjective, + startTime hlc.Timestamp, obj split.SplitObjective, ) split.LoadBasedSplitter { switch obj { case split.SplitQPS: @@ -246,7 +247,7 @@ func (r *Replica) recordBatchForLoadBasedSplitting( return getResponseBoundarySpan(ba, br) } - shouldInitSplit := r.loadBasedSplitter.Record(ctx, r.Clock().PhysicalTime(), loadFn, spanFn) + shouldInitSplit := r.loadBasedSplitter.Record(ctx, r.Clock().Now(), loadFn, spanFn) if shouldInitSplit { r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } @@ -255,8 +256,8 @@ func (r *Replica) recordBatchForLoadBasedSplitting( // loadSplitKey returns a suggested load split key for the range if it exists, // otherwise it returns nil. If there were any errors encountered when // validating the split key, the error is returned as well. -func (r *Replica) loadSplitKey(ctx context.Context, now time.Time) roachpb.Key { - splitKey := r.loadBasedSplitter.MaybeSplitKey(ctx, now) +func (r *Replica) loadSplitKey(ctx context.Context, timestamp hlc.Timestamp) roachpb.Key { + splitKey := r.loadBasedSplitter.MaybeSplitKey(ctx, timestamp) if splitKey == nil { return nil } diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 3ae162fedfc7..7f5cf5dc8b9e 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -1054,7 +1054,7 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { }, NodeLiveness: kvserver.NodeLivenessTestingKnobs{ StorePoolNodeLivenessFn: func( - id roachpb.NodeID, now time.Time, duration time.Duration, + id roachpb.NodeID, now hlc.Timestamp, duration time.Duration, ) livenesspb.NodeLivenessStatus { val := livenessTrap.Load() if val == nil { diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 26de0dd675e3..a3f2c85c7961 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb", + "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/metric", @@ -36,6 +37,7 @@ go_test( "//pkg/keys", "//pkg/roachpb", "//pkg/util/encoding", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/metric", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index c321623684d1..32510e3b9de9 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -40,7 +41,7 @@ type LoadBasedSplitter interface { // Ready checks if the LoadBasedSplitter has been initialized with a // sufficient sample duration. - Ready(nowTime time.Time) bool + Ready(nowTime hlc.Timestamp) bool // NoSplitKeyCauseLogMsg returns a log message containing information on the // number of samples that don't pass each split key requirement if not all @@ -56,7 +57,7 @@ type LoadBasedSplitter interface { type LoadSplitConfig interface { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. 
- NewLoadBasedSplitter(time.Time, SplitObjective) LoadBasedSplitter + NewLoadBasedSplitter(hlc.Timestamp, SplitObjective) LoadBasedSplitter // StatRetention returns the duration that recorded load is to be retained. StatRetention() time.Duration // StatThreshold returns the threshold for load above which the range @@ -144,19 +145,19 @@ type Decider struct { objective SplitObjective // supplied to Init // Fields tracking the current qps sample. - lastStatRollover time.Time // most recent time recorded by requests. - lastStatVal float64 // last reqs/s rate as of lastStatRollover - count int64 // number of requests recorded since last rollover + lastStatRollover hlc.Timestamp // most recent time recorded by requests. + lastStatVal float64 // last reqs/s rate as of lastStatRollover + count int64 // number of requests recorded since last rollover // Fields tracking historical qps samples. maxStat maxStatTracker // Fields tracking split key suggestions. splitFinder LoadBasedSplitter // populated when engaged or decided - lastSplitSuggestion time.Time // last stipulation to client to carry out split + lastSplitSuggestion hlc.Timestamp // last stipulation to client to carry out split // Fields tracking logging / metrics around load-based splitter split key. - lastNoSplitKeyLoggingMetrics time.Time + lastNoSplitKeyLoggingMetrics hlc.Timestamp } } @@ -184,7 +185,7 @@ func Init( // disappear as more keys are sampled) and should be initiated by the caller, // which can call MaybeSplitKey to retrieve the suggested key. func (d *Decider) Record( - ctx context.Context, now time.Time, load func(SplitObjective) int, span func() roachpb.Span, + ctx context.Context, now hlc.Timestamp, load func(SplitObjective) int, span func() roachpb.Span, ) bool { d.mu.Lock() defer d.mu.Unlock() @@ -193,12 +194,12 @@ func (d *Decider) Record( } func (d *Decider) recordLocked( - ctx context.Context, now time.Time, n int, span func() roachpb.Span, + ctx context.Context, now hlc.Timestamp, n int, span func() roachpb.Span, ) bool { d.mu.count += int64(n) // First compute requests per second since the last check. - if d.mu.lastStatRollover.IsZero() { + if d.mu.lastStatRollover.IsEmpty() { d.mu.lastStatRollover = now } elapsedSinceLastSample := now.Sub(d.mu.lastStatRollover) @@ -260,7 +261,7 @@ func (d *Decider) recordLocked( // RecordMax adds a stat measurement directly into the Decider's historical // stat value tracker. The stat sample is considered to have been captured at // the provided time. -func (d *Decider) RecordMax(now time.Time, qps float64) { +func (d *Decider) RecordMax(now hlc.Timestamp, qps float64) { d.mu.Lock() defer d.mu.Unlock() @@ -268,7 +269,7 @@ func (d *Decider) RecordMax(now time.Time, qps float64) { } // lastStatLocked returns the most recent stat measurement. -func (d *Decider) lastStatLocked(ctx context.Context, now time.Time) float64 { +func (d *Decider) lastStatLocked(ctx context.Context, now hlc.Timestamp) float64 { d.recordLocked(ctx, now, 0, nil) // force stat computation return d.mu.lastStatVal } @@ -276,7 +277,7 @@ func (d *Decider) lastStatLocked(ctx context.Context, now time.Time) float64 { // maxStatLocked returns the maximum stat measurement recorded over the retention // period. If the Decider has not been recording for a full retention period, // the method returns false. 
-func (d *Decider) maxStatLocked(ctx context.Context, now time.Time) (float64, bool) { +func (d *Decider) maxStatLocked(ctx context.Context, now hlc.Timestamp) (float64, bool) { d.recordLocked(ctx, now, 0, nil) // force stat computation return d.mu.maxStat.max(now, d.config.StatRetention()) } @@ -286,7 +287,7 @@ func (d *Decider) maxStatLocked(ctx context.Context, now time.Time) (float64, bo // or if it wasn't able to determine a suitable split key. // // It is legal to call MaybeSplitKey at any time. -func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key { +func (d *Decider) MaybeSplitKey(ctx context.Context, now hlc.Timestamp) roachpb.Key { var key roachpb.Key d.mu.Lock() @@ -336,26 +337,26 @@ func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key // Reset deactivates any current attempt at determining a split key. The method // also discards any historical stat tracking information. -func (d *Decider) Reset(now time.Time) { +func (d *Decider) Reset(now hlc.Timestamp) { d.mu.Lock() defer d.mu.Unlock() d.resetLocked(now) } -func (d *Decider) resetLocked(now time.Time) { - d.mu.lastStatRollover = time.Time{} +func (d *Decider) resetLocked(now hlc.Timestamp) { + d.mu.lastStatRollover = hlc.Timestamp{} d.mu.lastStatVal = 0 d.mu.count = 0 d.mu.maxStat.reset(now, d.config.StatRetention()) d.mu.splitFinder = nil - d.mu.lastSplitSuggestion = time.Time{} - d.mu.lastNoSplitKeyLoggingMetrics = time.Time{} + d.mu.lastSplitSuggestion = hlc.Timestamp{} + d.mu.lastNoSplitKeyLoggingMetrics = hlc.Timestamp{} } // SetSplitObjective sets the decider split objective to the given value and // discards any existing state. -func (d *Decider) SetSplitObjective(now time.Time, obj SplitObjective) { +func (d *Decider) SetSplitObjective(now hlc.Timestamp, obj SplitObjective) { d.mu.Lock() defer d.mu.Unlock() @@ -381,7 +382,7 @@ type LoadSplitSnapshot struct { } // Snapshot returns a consistent snapshot of the decider state. -func (d *Decider) Snapshot(ctx context.Context, now time.Time) LoadSplitSnapshot { +func (d *Decider) Snapshot(ctx context.Context, now hlc.Timestamp) LoadSplitSnapshot { d.mu.Lock() defer d.mu.Unlock() @@ -415,13 +416,13 @@ func (d *Decider) Snapshot(ctx context.Context, now time.Time) LoadSplitSnapshot type maxStatTracker struct { windows [6]float64 curIdx int - curStart time.Time - lastReset time.Time + curStart hlc.Timestamp + lastReset hlc.Timestamp minRetention time.Duration } // record adds the qps sample to the tracker. -func (t *maxStatTracker) record(now time.Time, minRetention time.Duration, qps float64) { +func (t *maxStatTracker) record(now hlc.Timestamp, minRetention time.Duration, qps float64) { t.maybeReset(now, minRetention) t.maybeRotate(now) t.windows[t.curIdx] = max(t.windows[t.curIdx], qps) @@ -429,7 +430,7 @@ func (t *maxStatTracker) record(now time.Time, minRetention time.Duration, qps f // reset clears the tracker. maxStatTracker will begin returning false until a full // minRetention period has elapsed. 
-func (t *maxStatTracker) reset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) reset(now hlc.Timestamp, minRetention time.Duration) { if minRetention <= 0 { panic("minRetention must be positive") } @@ -440,7 +441,7 @@ func (t *maxStatTracker) reset(now time.Time, minRetention time.Duration) { t.minRetention = minRetention } -func (t *maxStatTracker) maybeReset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) maybeReset(now hlc.Timestamp, minRetention time.Duration) { // If the retention period changes, simply reset the entire tracker. Merging // or splitting windows would be a difficult task and could lead to samples // either not being retained for long-enough, or being retained for too long. @@ -451,7 +452,7 @@ func (t *maxStatTracker) maybeReset(now time.Time, minRetention time.Duration) { } } -func (t *maxStatTracker) maybeRotate(now time.Time) { +func (t *maxStatTracker) maybeRotate(now hlc.Timestamp) { sinceLastRotate := now.Sub(t.curStart) windowWidth := t.windowWidth() if sinceLastRotate < windowWidth { @@ -469,7 +470,7 @@ func (t *maxStatTracker) maybeRotate(now time.Time) { } for i := 0; i < shift; i++ { t.curIdx = (t.curIdx + 1) % len(t.windows) - t.curStart = t.curStart.Add(windowWidth) + t.curStart = t.curStart.AddDuration(windowWidth) t.windows[t.curIdx] = 0 } } @@ -477,7 +478,7 @@ func (t *maxStatTracker) maybeRotate(now time.Time) { // max returns the maximum queries-per-second samples recorded over the last // retention period. If the tracker has not been recording for a full retention // period, then the method returns false. -func (t *maxStatTracker) max(now time.Time, minRetention time.Duration) (float64, bool) { +func (t *maxStatTracker) max(now hlc.Timestamp, minRetention time.Duration) (float64, bool) { t.record(now, minRetention, 0) // expire samples, if necessary if now.Sub(t.lastReset) < t.minRetention { diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go index 43a75ac34b3b..8c7602441a1f 100644 --- a/pkg/kv/kvserver/split/decider_test.go +++ b/pkg/kv/kvserver/split/decider_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/stretchr/testify/assert" @@ -38,7 +39,7 @@ type testLoadSplitConfig struct { // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to // find the midpoint based on recorded load. 
func (t *testLoadSplitConfig) NewLoadBasedSplitter( - startTime time.Time, _ SplitObjective, + startTime hlc.Timestamp, _ SplitObjective, ) LoadBasedSplitter { if t.useWeighted { return NewWeightedFinder(startTime, t.randSource) @@ -63,12 +64,12 @@ func ld(n int) func(SplitObjective) int { } } -func ms(i int) time.Time { +func ms(i int) hlc.Timestamp { ts, err := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") if err != nil { panic(err) } - return ts.Add(time.Duration(i) * time.Millisecond) + return hlc.Timestamp{WallTime: ts.Add(time.Duration(i) * time.Millisecond).UnixNano()} } func TestDecider(t *testing.T) { @@ -318,11 +319,11 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { require.NoError(t, err) var k roachpb.Key - var now time.Time + var now hlc.Timestamp for i := 0; i < 2*int(minSplitSuggestionInterval/time.Second); i++ { - now = now.Add(500 * time.Millisecond) + now = now.AddDuration(500 * time.Millisecond) d.Record(context.Background(), now, ld(1), c0) - now = now.Add(500 * time.Millisecond) + now = now.AddDuration(500 * time.Millisecond) d.Record(context.Background(), now, ld(1), c1) k = d.MaybeSplitKey(context.Background(), now) if len(k) != 0 { @@ -365,11 +366,11 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { require.Error(t, err) var k roachpb.Key - var now time.Time + var now hlc.Timestamp for i := 0; i < 2*int(minSplitSuggestionInterval/time.Second); i++ { - now = now.Add(500 * time.Millisecond) + now = now.AddDuration(500 * time.Millisecond) d.Record(context.Background(), now, ld(1), c0) - now = now.Add(500 * time.Millisecond) + now = now.AddDuration(500 * time.Millisecond) d.Record(context.Background(), now, ld(1), c1) k = d.MaybeSplitKey(context.Background(), now) if len(k) != 0 { diff --git a/pkg/kv/kvserver/split/load_based_splitter_test.go b/pkg/kv/kvserver/split/load_based_splitter_test.go index e043715a9735..e2d20246942b 100644 --- a/pkg/kv/kvserver/split/load_based_splitter_test.go +++ b/pkg/kv/kvserver/split/load_based_splitter_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/ycsb" "golang.org/x/exp/rand" @@ -320,7 +321,7 @@ func ExampleUnweightedFinder() { rangeRequestPercent: 0.95, numRequests: 13000, lbs: func(randSource *rand.Rand) LoadBasedSplitter { - return NewUnweightedFinder(timeutil.Now(), randSource) + return NewUnweightedFinder(hlc.Timestamp{}, randSource) }, seed: 2022, }, @@ -330,7 +331,7 @@ func ExampleUnweightedFinder() { func ExampleWeightedFinder() { seed := uint64(2022) lbs := func(randSource *rand.Rand) LoadBasedSplitter { - return NewWeightedFinder(timeutil.Now(), randSource) + return NewWeightedFinder(hlc.Timestamp{}, randSource) } runTestMultipleSettings([]lbsTestSettings{ { diff --git a/pkg/kv/kvserver/split/unweighted_finder.go b/pkg/kv/kvserver/split/unweighted_finder.go index cbab193abf91..8228118b3846 100644 --- a/pkg/kv/kvserver/split/unweighted_finder.go +++ b/pkg/kv/kvserver/split/unweighted_finder.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // Load-based splitting. @@ -62,14 +63,14 @@ type sample struct { // UnweightedFinder is a structure that is used to determine the split point // using the Reservoir Sampling method. 
type UnweightedFinder struct { - startTime time.Time + startTime hlc.Timestamp randSource RandSource samples [splitKeySampleSize]sample count int } // NewUnweightedFinder initiates a UnweightedFinder with the given time. -func NewUnweightedFinder(startTime time.Time, randSource RandSource) *UnweightedFinder { +func NewUnweightedFinder(startTime hlc.Timestamp, randSource RandSource) *UnweightedFinder { return &UnweightedFinder{ startTime: startTime, randSource: randSource, @@ -77,7 +78,7 @@ func NewUnweightedFinder(startTime time.Time, randSource RandSource) *Unweighted } // Ready implements the LoadBasedSplitter interface. -func (f *UnweightedFinder) Ready(nowTime time.Time) bool { +func (f *UnweightedFinder) Ready(nowTime hlc.Timestamp) bool { return nowTime.Sub(f.startTime) > RecordDurationThreshold } diff --git a/pkg/kv/kvserver/split/unweighted_finder_test.go b/pkg/kv/kvserver/split/unweighted_finder_test.go index e25fa844bfcf..10f54db66a08 100644 --- a/pkg/kv/kvserver/split/unweighted_finder_test.go +++ b/pkg/kv/kvserver/split/unweighted_finder_test.go @@ -19,9 +19,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" ) @@ -168,7 +168,7 @@ func TestSplitFinderKey(t *testing.T) { randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewUnweightedFinder(timeutil.Now(), randSource) + finder := NewUnweightedFinder(hlc.Timestamp{}, randSource) finder.samples = test.reservoir if splitByLoadKey := finder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { t.Errorf( @@ -266,7 +266,7 @@ func TestSplitFinderRecorder(t *testing.T) { } for i, test := range testCases { - finder := NewUnweightedFinder(timeutil.Now(), test.randSource) + finder := NewUnweightedFinder(hlc.Timestamp{}, test.randSource) finder.samples = test.currReservoir finder.count = test.currCount finder.Record(test.recordSpan, 1) @@ -323,7 +323,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { } randSource := rand.New(rand.NewSource(2022)) - finder := NewUnweightedFinder(timeutil.Now(), randSource) + finder := NewUnweightedFinder(hlc.Timestamp{}, randSource) finder.samples = samples insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.noSplitKeyCause() assert.Equal(t, 5, insufficientCounters, "unexpected insufficient counters") @@ -400,7 +400,7 @@ func TestFinderPopularKeyFrequency(t *testing.T) { randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewUnweightedFinder(timeutil.Now(), randSource) + finder := NewUnweightedFinder(hlc.Timestamp{}, randSource) finder.samples = test.samples popularKeyFrequency := finder.PopularKeyFrequency() assert.Equal(t, test.expectedPopularKeyFrequency, popularKeyFrequency, "unexpected popular key frequency in test %d", i) diff --git a/pkg/kv/kvserver/split/weighted_finder.go b/pkg/kv/kvserver/split/weighted_finder.go index 773c548e613f..008e11d322a3 100644 --- a/pkg/kv/kvserver/split/weighted_finder.go +++ b/pkg/kv/kvserver/split/weighted_finder.go @@ -14,9 +14,9 @@ import ( "fmt" "math" "sort" - "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // Load-based splitting. 
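Both finders gate their Ready checks on how long they have been recording, and with hlc.Timestamp that reduces to a wall-clock delta between two readings. A small sketch of that check, assuming the timestamp subtraction used by Ready amounts to a wall-time difference, and using an illustrative constant rather than the package's real RecordDurationThreshold:

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// sampleThreshold stands in for split.RecordDurationThreshold; the value is
// illustrative only.
const sampleThreshold = 10 * time.Second

// ready sketches the Ready(nowTime hlc.Timestamp) checks above: a finder is
// ready once it has been recording for longer than the threshold. Only the
// wall-time component is consulted.
func ready(now, start hlc.Timestamp) bool {
	return time.Duration(now.WallTime-start.WallTime) > sampleThreshold
}

func main() {
	start := hlc.Timestamp{WallTime: time.Now().UnixNano()}
	fmt.Println(ready(start.AddDuration(5*time.Second), start))  // false
	fmt.Println(ready(start.AddDuration(11*time.Second), start)) // true
}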
@@ -60,12 +60,12 @@ type WeightedFinder struct { samples [splitKeySampleSize]weightedSample count int totalWeight float64 - startTime time.Time + startTime hlc.Timestamp randSource RandSource } // NewWeightedFinder initiates a WeightedFinder with the given time. -func NewWeightedFinder(startTime time.Time, randSource RandSource) *WeightedFinder { +func NewWeightedFinder(startTime hlc.Timestamp, randSource RandSource) *WeightedFinder { return &WeightedFinder{ startTime: startTime, randSource: randSource, @@ -73,7 +73,7 @@ func NewWeightedFinder(startTime time.Time, randSource RandSource) *WeightedFind } // Ready implements the LoadBasedSplitter interface. -func (f *WeightedFinder) Ready(nowTime time.Time) bool { +func (f *WeightedFinder) Ready(nowTime hlc.Timestamp) bool { return nowTime.Sub(f.startTime) > RecordDurationThreshold } diff --git a/pkg/kv/kvserver/split/weighted_finder_test.go b/pkg/kv/kvserver/split/weighted_finder_test.go index 6db32a6bae3f..7f4545bffbdc 100644 --- a/pkg/kv/kvserver/split/weighted_finder_test.go +++ b/pkg/kv/kvserver/split/weighted_finder_test.go @@ -19,9 +19,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" "golang.org/x/exp/rand" ) @@ -174,7 +174,7 @@ func TestSplitWeightedFinderKey(t *testing.T) { randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder := NewWeightedFinder(hlc.Timestamp{}, randSource) weightedFinder.samples = test.reservoir if splitByLoadKey := weightedFinder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { t.Errorf( @@ -293,7 +293,7 @@ func TestSplitWeightedFinderRecorder(t *testing.T) { } for i, test := range testCases { - weightedFinder := NewWeightedFinder(timeutil.Now(), test.randSource) + weightedFinder := NewWeightedFinder(hlc.Timestamp{}, test.randSource) weightedFinder.samples = test.currReservoir weightedFinder.count = test.currCount weightedFinder.totalWeight = 100 @@ -333,7 +333,7 @@ func TestWeightedFinderNoSplitKeyCause(t *testing.T) { } randSource := rand.New(rand.NewSource(2022)) - weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder := NewWeightedFinder(hlc.Timestamp{}, randSource) weightedFinder.samples = samples insufficientCounters, imbalance := weightedFinder.noSplitKeyCause() assert.Equal(t, 7, insufficientCounters, "unexpected insufficient counters") @@ -403,7 +403,7 @@ func TestWeightedFinderPopularKeyFrequency(t *testing.T) { randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder := NewWeightedFinder(hlc.Timestamp{}, randSource) weightedFinder.samples = test.samples popularKeyFrequency := weightedFinder.PopularKeyFrequency() assert.True(t, math.Abs(test.expectedPopularKeyFrequency-popularKeyFrequency) < eps, diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index ebadd75e2950..85d68e716bf5 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -196,7 +196,7 @@ func (sq *splitQueue) shouldQueue( repl.GetMaxBytes(), repl.shouldBackpressureWrites(), confReader) if !shouldQ && repl.SplitByLoadEnabled() { - if splitKey := 
repl.loadSplitKey(ctx, repl.Clock().PhysicalTime()); splitKey != nil { + if splitKey := repl.loadSplitKey(ctx, repl.Clock().Now()); splitKey != nil { shouldQ, priority = true, 1.0 // default priority } } @@ -284,7 +284,7 @@ func (sq *splitQueue) processAttempt( return true, nil } - now := r.Clock().PhysicalTime() + now := r.Clock().Now() if splitByLoadKey := r.loadSplitKey(ctx, now); splitByLoadKey != nil { loadStats := r.loadStats.Stats() batchHandledQPS := loadStats.QueriesPerSecond @@ -332,7 +332,7 @@ func (sq *splitQueue) processAttempt( sq.metrics.LoadBasedSplitCount.Inc(1) // Reset the splitter now that the bounds of the range changed. - r.loadBasedSplitter.Reset(sq.store.Clock().PhysicalTime()) + r.loadBasedSplitter.Reset(sq.store.Clock().Now()) return true, nil } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 2556a60e1542..72a900ae8cf8 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1239,7 +1239,7 @@ func NewStore( func(ctx context.Context, obj LBRebalancingObjective) { s.VisitReplicas(func(r *Replica) (wantMore bool) { r.loadBasedSplitter.SetSplitObjective( - s.Clock().PhysicalTime(), + s.Clock().Now(), obj.ToSplitObjective(), ) return true diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 4815d38cf287..8a169bae8238 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -2183,7 +2183,7 @@ func (s *systemAdminServer) checkReadinessForHealthCheck(ctx context.Context) er if !ok { return grpcstatus.Error(codes.Unavailable, "liveness record not found") } - if !l.IsLive(s.clock.Now().GoTime()) { + if !l.IsLive(s.clock.Now()) { return grpcstatus.Errorf(codes.Unavailable, "node is not healthy") } if l.Draining { @@ -2212,7 +2212,7 @@ func (s *systemAdminServer) checkReadinessForHealthCheck(ctx context.Context) er // // getLivenessStatusMap() includes removed nodes (dead + decommissioned). func getLivenessStatusMap( - ctx context.Context, nl *liveness.NodeLiveness, now time.Time, st *cluster.Settings, + ctx context.Context, nl *liveness.NodeLiveness, now hlc.Timestamp, st *cluster.Settings, ) (map[roachpb.NodeID]livenesspb.NodeLivenessStatus, error) { livenesses, err := nl.GetLivenessesFromKV(ctx) if err != nil { @@ -2232,7 +2232,7 @@ func getLivenessStatusMap( // a slice containing the liveness record of all nodes that have ever been a part of the // cluster. func getLivenessResponse( - ctx context.Context, nl optionalnodeliveness.Interface, now time.Time, st *cluster.Settings, + ctx context.Context, nl optionalnodeliveness.Interface, now hlc.Timestamp, st *cluster.Settings, ) (*serverpb.LivenessResponse, error) { livenesses, err := nl.GetLivenessesFromKV(ctx) if err != nil { @@ -2274,7 +2274,7 @@ func (s *systemAdminServer) Liveness( ) (*serverpb.LivenessResponse, error) { clock := s.clock - return getLivenessResponse(ctx, s.nodeLiveness, clock.Now().GoTime(), s.st) + return getLivenessResponse(ctx, s.nodeLiveness, clock.Now(), s.st) } func (s *adminServer) Jobs( @@ -2718,7 +2718,7 @@ func (s *systemAdminServer) DecommissionPreCheck( // Initially evaluate node liveness status, so we filter the nodes to check. 
var nodesToCheck []roachpb.NodeID - livenessStatusByNodeID, err := getLivenessStatusMap(ctx, s.nodeLiveness, s.clock.Now().GoTime(), s.st) + livenessStatusByNodeID, err := getLivenessStatusMap(ctx, s.nodeLiveness, s.clock.Now(), s.st) if err != nil { return nil, serverError(ctx, err) } @@ -2927,7 +2927,7 @@ func (s *systemAdminServer) decommissionStatusHelper( Draining: l.Draining, ReportedReplicas: replicasToReport[l.NodeID], } - if l.IsLive(s.clock.Now().GoTime()) { + if l.IsLive(s.clock.Now()) { nodeResp.IsLive = true } res.Status = append(res.Status, nodeResp) diff --git a/pkg/server/auto_upgrade.go b/pkg/server/auto_upgrade.go index c758d12f54b2..ac8c292d1e5a 100644 --- a/pkg/server/auto_upgrade.go +++ b/pkg/server/auto_upgrade.go @@ -128,7 +128,7 @@ func (s *Server) upgradeStatus( return upgradeBlockedDueToError, err } clock := s.admin.server.clock - statusMap, err := getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now().GoTime(), s.st) + statusMap, err := getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now(), s.st) if err != nil { return upgradeBlockedDueToError, err } diff --git a/pkg/server/clock_monotonicity.go b/pkg/server/clock_monotonicity.go index c7aebd60efc6..42928eac60be 100644 --- a/pkg/server/clock_monotonicity.go +++ b/pkg/server/clock_monotonicity.go @@ -139,7 +139,7 @@ func ensureClockMonotonicity( if delta > 0 { log.Ops.Infof( ctx, - "Sleeping till wall time %v to catches up to %v to ensure monotonicity. Delta: %v", + "Sleeping until wall time %v catches up to %v to ensure monotonicity. Delta: %v", currentWallTime, sleepUntil, delta, diff --git a/pkg/server/fanout_clients.go b/pkg/server/fanout_clients.go index 7d1c5dfc1307..7cb7a1372ab0 100644 --- a/pkg/server/fanout_clients.go +++ b/pkg/server/fanout_clients.go @@ -240,7 +240,7 @@ func (k kvFanoutClient) listNodes(ctx context.Context) (*serverpb.NodesResponse, } clock := k.clock - resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, k.nodeLiveness, clock.Now().GoTime(), k.st) + resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, k.nodeLiveness, clock.Now(), k.st) if err != nil { return nil, err } diff --git a/pkg/server/status.go b/pkg/server/status.go index 611421cc078a..ed1edc76535c 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1815,7 +1815,7 @@ func (s *systemStatusServer) nodesHelper( } clock := s.clock - resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now().GoTime(), s.st) + resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now(), s.st) if err != nil { return nil, 0, err } diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index ed514b10db1c..dbc040da8ccd 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -54,6 +54,12 @@ func (t Timestamp) LessEq(s Timestamp) bool { return t.WallTime < s.WallTime || (t.WallTime == s.WallTime && t.Logical <= s.Logical) } +// After returns whether this timestamp occurs after the provided timestamp. +// This matches the semantics of time.Time's After method. +func (t Timestamp) After(s Timestamp) bool { + return !t.LessEq(s) +} + // Compare returns -1 if this timestamp is lesser than the given timestamp, 1 if // it is greater, and 0 if they are equal. func (t Timestamp) Compare(s Timestamp) int { @@ -205,6 +211,23 @@ func (t Timestamp) IsSet() bool { return !t.IsEmpty() } +// AddDuration adds a given duration to this Timestamp. The resulting timestamp +// is Synthetic.
Normally, to bump a clock to the higher of two +// timestamps, use Forward; this method instead exists to create an +// hlc.Timestamp in the future (or past). +func (t Timestamp) AddDuration(duration time.Duration) Timestamp { + return Timestamp{ + WallTime: t.WallTime + duration.Nanoseconds(), + Logical: t.Logical, + Synthetic: true, + } +} + +// Sub returns the wall-time delta of this timestamp from a previous timestamp. +func (t Timestamp) Sub(previous Timestamp) time.Duration { + return time.Duration(t.WallTime - previous.WallTime) +} + // Add returns a timestamp with the WallTime and Logical components increased. // wallTime is expressed in nanos. //
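
Not part of the patch itself: a minimal usage sketch of the three hlc.Timestamp helpers introduced above (After, AddDuration, Sub). It assumes only the method signatures added in this diff and the existing github.com/cockroachdb/cockroach/pkg/util/hlc import path; package main and the sample values are illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	// A wall-clock timestamp; WallTime is expressed in nanoseconds, mirroring
	// the ms() helper in the decider tests above.
	t0 := hlc.Timestamp{WallTime: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()}

	// AddDuration offsets the timestamp into the future (or past) and marks
	// the result as synthetic.
	t1 := t0.AddDuration(500 * time.Millisecond)

	// Sub returns the wall-time delta between the two timestamps.
	fmt.Println(t1.Sub(t0)) // 500ms

	// After mirrors time.Time.After: it reports whether the receiver is
	// strictly greater than its argument.
	fmt.Println(t1.After(t0), t0.After(t1)) // true false
}

The 500ms step matches the decider tests above, which now advance a zero hlc.Timestamp with AddDuration where they previously used time.Time.Add.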
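
Along the same lines, a hedged sketch of the finders' Ready gate after the signature change. It assumes the exported split package at github.com/cockroachdb/cockroach/pkg/kv/kvserver/split as modified by this diff, the x/exp/rand source used in its tests, and that RecordDurationThreshold (a package constant not shown in this hunk) is far less than an hour.

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"golang.org/x/exp/rand"
)

func main() {
	start := hlc.Timestamp{WallTime: time.Now().UnixNano()}
	finder := split.NewWeightedFinder(start, rand.New(rand.NewSource(2022)))

	// Ready is now a pure hlc.Timestamp comparison:
	// nowTime.Sub(startTime) > RecordDurationThreshold.
	fmt.Println(finder.Ready(start))                        // false: no time has elapsed
	fmt.Println(finder.Ready(start.AddDuration(time.Hour))) // true: well past the threshold
}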
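
Finally, a sketch of the liveness call sites in admin.go above: after this change, IsLive takes the node clock's hlc.Timestamp directly rather than a GoTime conversion, as in checkReadinessForHealthCheck and decommissionStatusHelper. The Liveness literal, its expiration, and the 9-second window are illustrative assumptions; only the IsLive(hlc.Timestamp) shape is taken from the diff.

package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	now := hlc.Timestamp{WallTime: time.Now().UnixNano()}

	// A hypothetical liveness record whose lease expires 9 seconds from now.
	l := livenesspb.Liveness{
		NodeID:     1,
		Epoch:      1,
		Expiration: now.AddDuration(9 * time.Second).ToLegacyTimestamp(),
	}

	// The readiness and decommission-status checks above pass clock.Now()
	// (an hlc.Timestamp) straight through.
	fmt.Println(l.IsLive(now)) // true: the record has not expired yet
}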