From 7af839fcefe6cca46effa7a9426cc2bd383745b2 Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Mon, 15 May 2023 16:32:46 -0400 Subject: [PATCH] liveness: Convert to using hlc.Timestamp Previously there were some conversions back and forth between hlc times and go times. We always use hlc time when looking at the validity of liveness, and this change makes that more clear. Epic: none Release note: None --- .../allocator/allocatorimpl/allocator_test.go | 16 +- .../kvserver/allocator/storepool/BUILD.bazel | 1 - .../storepool/override_store_pool.go | 2 +- .../storepool/override_store_pool_test.go | 9 +- .../allocator/storepool/store_pool.go | 56 +++--- .../allocator/storepool/store_pool_test.go | 178 +++++++----------- .../allocator/storepool/test_helpers.go | 5 +- pkg/kv/kvserver/allocator_impl_test.go | 2 +- pkg/kv/kvserver/asim/gossip/BUILD.bazel | 1 + pkg/kv/kvserver/asim/gossip/exchange.go | 3 +- pkg/kv/kvserver/asim/state/BUILD.bazel | 1 + pkg/kv/kvserver/asim/state/impl.go | 2 +- pkg/kv/kvserver/asim/state/state_test.go | 8 +- .../client_replica_circuit_breaker_test.go | 2 +- pkg/kv/kvserver/helpers_test.go | 2 +- pkg/kv/kvserver/liveness/liveness.go | 16 +- .../kvserver/liveness/livenesspb/BUILD.bazel | 2 +- .../kvserver/liveness/livenesspb/liveness.go | 14 +- pkg/kv/kvserver/replica_raft.go | 2 +- pkg/kv/kvserver/replicate_queue_test.go | 2 +- pkg/server/admin.go | 12 +- pkg/server/auto_upgrade.go | 2 +- pkg/server/clock_monotonicity.go | 2 +- pkg/server/fanout_clients.go | 2 +- pkg/server/status.go | 2 +- pkg/util/hlc/timestamp.go | 14 ++ 26 files changed, 171 insertions(+), 187 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 803cfdae9a60..6cf33fa4b197 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -532,7 +532,7 @@ func mockStorePool( for _, storeID := range suspectedStoreIDs { liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE detail := storePool.GetStoreDetailLocked(storeID) - detail.LastUnavailable = storePool.Clock().Now().GoTime() + detail.LastUnavailable = storePool.Clock().Now() detail.Desc = &roachpb.StoreDescriptor{ StoreID: storeID, Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, @@ -542,7 +542,7 @@ func mockStorePool( // Set the node liveness function using the set we constructed. // TODO(sarkesian): This override needs to be fixed to stop exporting this field. storePool.NodeLivenessFn = - func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus { + func(nodeID roachpb.NodeID, now hlc.Timestamp, threshold time.Duration) livenesspb.NodeLivenessStatus { if status, ok := liveNodeSet[nodeID]; ok { return status } @@ -842,7 +842,7 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(storeDescriptors, t) // Override liveness of n3 to decommissioning so the only available target is s4. - oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if nid == roachpb.NodeID(3) { return livenesspb.NodeLivenessStatus_DECOMMISSIONING } @@ -900,7 +900,7 @@ func TestAllocatorReplaceFailsOnConstrainedDecommissioningReplica(t *testing.T) gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) // Override liveness of n3 to decommissioning so the only available target is s4. - oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if nid == roachpb.NodeID(3) { return livenesspb.NodeLivenessStatus_DECOMMISSIONING } @@ -6829,7 +6829,7 @@ func TestAllocatorComputeActionWithStorePoolRemoveDead(t *testing.T) { // Mark all dead nodes as alive, so we can override later. all := append(tcase.live, tcase.dead...) mockStorePool(sp, all, nil, nil, nil, nil, nil) - oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { for _, deadStoreID := range tcase.dead { if nid == roachpb.NodeID(deadStoreID) { return livenesspb.NodeLivenessStatus_DEAD @@ -8503,7 +8503,7 @@ func TestAllocatorFullDisks(t *testing.T) { for i := 0; i < generations; i++ { // First loop through test stores and randomly add data. for j := 0; j < len(testStores); j++ { - if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) == livenesspb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), hlc.Timestamp{}, 0) == livenesspb.NodeLivenessStatus_DEAD { continue } ts := &testStores[j] @@ -8530,7 +8530,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Loop through each store a number of times and maybe rebalance. for j := 0; j < 10; j++ { for k := 0; k < len(testStores); k++ { - if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(k), time.Time{}, 0) == livenesspb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(k), hlc.Timestamp{}, 0) == livenesspb.NodeLivenessStatus_DEAD { continue } ts := &testStores[k] @@ -8567,7 +8567,7 @@ func TestAllocatorFullDisks(t *testing.T) { // Simulate rocksdb compactions freeing up disk space. for j := 0; j < len(testStores); j++ { - if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) != livenesspb.NodeLivenessStatus_DEAD { + if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), hlc.Timestamp{}, 0) != livenesspb.NodeLivenessStatus_DEAD { ts := &testStores[j] if ts.Capacity.Available <= 0 { t.Errorf("testStore %d ran out of space during generation %d: %+v", j, i, ts.Capacity) diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 73c8adc436fc..cac55e29e712 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -48,7 +48,6 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_kr_pretty//:pretty", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index cc6338fc5700..339ba7657cd5 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -50,7 +50,7 @@ var _ AllocatorStorePool = &OverrideStorePool{} func OverrideNodeLivenessFunc( overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, realNodeLivenessFunc NodeLivenessFunc, ) NodeLivenessFunc { - return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + return func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if override, ok := overrides[nid]; ok { return override } diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go index 310043395c1a..3876afa3149a 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" @@ -42,7 +43,7 @@ func TestOverrideStorePoolStatusString(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) - sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if overriddenLiveness, ok := livenessOverrides[nid]; ok { return overriddenLiveness } @@ -123,7 +124,7 @@ func TestOverrideStorePoolDecommissioningReplicas(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) - sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if overriddenLiveness, ok := livenessOverrides[nid]; ok { return overriddenLiveness } @@ -240,7 +241,7 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) - sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { if overriddenLiveness, ok := livenessOverrides[nid]; ok { return overriddenLiveness } @@ -335,7 +336,7 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) { // Set suspectedStore as suspected. testStorePool.DetailsMu.Lock() - testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now().GoTime() + testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now() testStorePool.DetailsMu.Unlock() // No filter or limited set of store IDs. diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index de3b33194af1..6098dd8eeda3 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -113,7 +113,7 @@ type NodeCountFunc func() int // not the node is live. A node is considered dead if its liveness record has // expired by more than TimeUntilStoreDead. type NodeLivenessFunc func( - nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration, + nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration, ) livenesspb.NodeLivenessStatus // MakeStorePoolNodeLivenessFunc returns a function which determines @@ -121,7 +121,7 @@ type NodeLivenessFunc func( // NodeLiveness. func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLivenessFunc { return func( - nodeID roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration, + nodeID roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration, ) livenesspb.NodeLivenessStatus { liveness, ok := nodeLiveness.GetLiveness(nodeID) if !ok { @@ -160,7 +160,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLive // ideally we should remove usage of NodeLivenessStatus altogether. See #50707 // for more details. func LivenessStatus( - l livenesspb.Liveness, now time.Time, deadThreshold time.Duration, + l livenesspb.Liveness, now hlc.Timestamp, deadThreshold time.Duration, ) livenesspb.NodeLivenessStatus { // If we don't have a liveness expiration time, treat the status as unknown. // This is different than unavailable as it doesn't transition through being @@ -195,16 +195,16 @@ type StoreDetail struct { Desc *roachpb.StoreDescriptor // ThrottledUntil is when a throttled store can be considered available again // due to a failed or declined snapshot. - ThrottledUntil time.Time + ThrottledUntil hlc.Timestamp // throttledBecause is set to the most recent reason for which a store was // marked as throttled. throttledBecause string // LastUpdatedTime is set when a store is first consulted and every time // gossip arrives for a store. - LastUpdatedTime time.Time + LastUpdatedTime hlc.Timestamp // LastUnavailable is set when it's detected that a store was unavailable, // i.e. failed liveness. - LastUnavailable time.Time + LastUnavailable hlc.Timestamp } // storeStatus is the current status of a store. @@ -238,7 +238,10 @@ const ( ) func (sd *StoreDetail) status( - now time.Time, deadThreshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration, + now hlc.Timestamp, + deadThreshold time.Duration, + nl NodeLivenessFunc, + suspectDuration time.Duration, ) storeStatus { // During normal operation, we expect the state transitions for stores to look like the following: // @@ -265,7 +268,7 @@ func (sd *StoreDetail) status( // within the liveness threshold. Note that LastUpdatedTime is set // when the store detail is created and will have a non-zero value // even before the first gossip arrives for a store. - deadAsOf := sd.LastUpdatedTime.Add(deadThreshold) + deadAsOf := sd.LastUpdatedTime.AddDuration(deadThreshold) if now.After(deadAsOf) { sd.LastUnavailable = now return storeStatusDead @@ -305,7 +308,7 @@ func (sd *StoreDetail) status( // Check whether the store is currently suspect. We measure that by // looking at the time it was last unavailable making sure we have not seen any // failures for a period of time defined by StoreSuspectDuration. - if sd.LastUnavailable.Add(suspectDuration).After(now) { + if sd.LastUnavailable.AddDuration(suspectDuration).After(now) { return storeStatusSuspect } @@ -443,7 +446,7 @@ type StorePool struct { gossip *gossip.Gossip nodeCountFn NodeCountFunc NodeLivenessFn NodeLivenessFunc - startTime time.Time + startTime hlc.Timestamp deterministic bool // We use separate mutexes for storeDetails and nodeLocalities because the @@ -492,7 +495,7 @@ func NewStorePool( gossip: g, nodeCountFn: nodeCountFn, NodeLivenessFn: nodeLivenessFn, - startTime: clock.PhysicalTime(), + startTime: clock.Now(), deterministic: deterministic, } sp.DetailsMu.StoreDetails = make(map[roachpb.StoreID]*StoreDetail) @@ -523,7 +526,7 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { sort.Sort(ids) var buf bytes.Buffer - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -538,9 +541,8 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { fmt.Fprintf(&buf, ": range-count=%d fraction-used=%.2f", detail.Desc.Capacity.RangeCount, detail.Desc.Capacity.FractionUsed()) } - throttled := detail.ThrottledUntil.Sub(now) - if throttled > 0 { - fmt.Fprintf(&buf, " [throttled=%.1fs]", throttled.Seconds()) + if detail.ThrottledUntil.After(now) { + fmt.Fprintf(&buf, " [throttled=%.1fs]", detail.ThrottledUntil.GoTime().Sub(now.GoTime()).Seconds()) } _, _ = buf.WriteString("\n") } @@ -569,7 +571,7 @@ func (sp *StorePool) storeDescriptorUpdate(storeDesc roachpb.StoreDescriptor) { storeID := storeDesc.StoreID curCapacity := storeDesc.Capacity - now := sp.clock.PhysicalTime() + now := sp.clock.Now() sp.DetailsMu.Lock() detail := sp.GetStoreDetailLocked(storeID) @@ -815,9 +817,9 @@ func (sp *StorePool) decommissioningReplicasWithLiveness( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // NB: We use clock.Now() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -859,12 +861,12 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error if !ok { return false, 0, errors.Errorf("store %d was not found", storeID) } - // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // NB: We use clock.Now() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) - deadAsOf := sd.LastUpdatedTime.Add(timeUntilStoreDead) + deadAsOf := sd.LastUpdatedTime.AddDuration(timeUntilStoreDead) if now.After(deadAsOf) { return true, 0, nil } @@ -873,7 +875,7 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error if sd.Desc == nil { return false, 0, errors.Errorf("store %d status unknown, cant tell if it's dead or alive", storeID) } - return false, deadAsOf.Sub(now), nil + return false, deadAsOf.GoTime().Sub(now.GoTime()), nil } // IsUnknown returns true if the given store's status is `storeStatusUnknown` @@ -935,9 +937,9 @@ func (sp *StorePool) storeStatus( if !ok { return storeStatusUnknown, errors.Errorf("store %d was not found", storeID) } - // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to + // NB: We use clock.Now() instead of clock.PhysicalTime() is order to // take clock signals from remote nodes into consideration. - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) return sd.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect), nil @@ -971,7 +973,7 @@ func (sp *StorePool) liveAndDeadReplicasWithLiveness( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -1247,7 +1249,7 @@ func (sp *StorePool) getStoreListFromIDsLocked( var throttled ThrottledStoreReasons var storeDescriptors []roachpb.StoreDescriptor - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -1311,7 +1313,7 @@ func (sp *StorePool) Throttle(reason ThrottleReason, why string, storeID roachpb switch reason { case ThrottleFailed: timeout := FailedReservationsTimeout.Get(&sp.st.SV) - detail.ThrottledUntil = sp.clock.PhysicalTime().Add(timeout) + detail.ThrottledUntil = sp.clock.Now().AddDuration(timeout) if log.V(2) { ctx := sp.AnnotateCtx(context.TODO()) log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s", diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index 9c1910be9f84..b5ab8cd5bfc7 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/kr/pretty" "github.com/stretchr/testify/require" @@ -206,9 +205,9 @@ func TestStorePoolGetStoreList(t *testing.T) { mnl.SetNodeStatus(deadStore.Node.NodeID, livenesspb.NodeLivenessStatus_DEAD) sp.DetailsMu.Lock() // Set declinedStore as throttled. - sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.clock.Now().GoTime().Add(time.Hour) + sp.DetailsMu.StoreDetails[declinedStore.StoreID].ThrottledUntil = sp.clock.Now().AddDuration(time.Hour) // Set suspectedStore as suspected. - sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.clock.Now().GoTime() + sp.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = sp.clock.Now() sp.DetailsMu.Unlock() // No filter or limited set of store IDs. @@ -589,13 +588,13 @@ func TestStorePoolThrottle(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(uniqueStore, t) - expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV)) + expected := sp.clock.Now().AddDuration(FailedReservationsTimeout.Get(&sp.st.SV)) sp.Throttle(ThrottleFailed, "", 1) sp.DetailsMu.Lock() detail := sp.GetStoreDetailLocked(1) sp.DetailsMu.Unlock() - if !detail.ThrottledUntil.Equal(expected) { + if detail.ThrottledUntil.WallTime != expected.WallTime { t.Errorf("expected store to have been throttled to %v, found %v", expected, detail.ThrottledUntil) } @@ -614,7 +613,7 @@ func TestStorePoolSuspected(t *testing.T) { livenesspb.NodeLivenessStatus_DEAD) defer stopper.Stop(ctx) - now := sp.clock.Now().GoTime() + now := sp.clock.Now() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) @@ -622,7 +621,7 @@ func TestStorePoolSuspected(t *testing.T) { detail := sp.GetStoreDetailLocked(0) s := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusUnknown) - require.True(t, detail.LastUnavailable.IsZero()) + require.Equal(t, hlc.Timestamp{}, detail.LastUnavailable) // Now start gossiping the stores statuses. sg := gossiputil.NewStoreGossiper(g) @@ -637,13 +636,13 @@ func TestStorePoolSuspected(t *testing.T) { s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) - require.True(t, detail.LastUnavailable.IsZero()) + require.Equal(t, hlc.Timestamp{}, detail.LastUnavailable) // When the store transitions to unavailable, its status changes to temporarily unknown. mnl.SetNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_UNAVAILABLE) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusUnknown) - require.False(t, detail.LastUnavailable.IsZero()) + require.NotEqual(t, hlc.Timestamp{}, detail.LastUnavailable) // When the store transitions back to live, it passes through suspect for a period. mnl.SetNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) @@ -651,7 +650,7 @@ func TestStorePoolSuspected(t *testing.T) { require.Equal(t, s, storeStatusSuspect) // Once the window has passed, it will return to available. - now = now.Add(timeAfterStoreSuspect).Add(time.Millisecond) + now = now.AddDuration(timeAfterStoreSuspect).AddDuration(time.Millisecond) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) @@ -666,7 +665,7 @@ func TestStorePoolSuspected(t *testing.T) { require.Equal(t, s, storeStatusSuspect) // Verify it also returns correctly to available after suspect time. - now = now.Add(timeAfterStoreSuspect).Add(time.Millisecond) + now = now.AddDuration(timeAfterStoreSuspect).AddDuration(time.Millisecond) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) @@ -680,7 +679,7 @@ func TestStorePoolSuspected(t *testing.T) { s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusSuspect) - now = now.Add(timeAfterStoreSuspect).Add(time.Millisecond) + now = now.AddDuration(timeAfterStoreSuspect).AddDuration(time.Millisecond) mnl.SetNodeStatus(store.Node.NodeID, livenesspb.NodeLivenessStatus_LIVE) s = detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) require.Equal(t, s, storeStatusAvailable) @@ -866,120 +865,95 @@ func TestStorePoolDecommissioningReplicas(t *testing.T) { func TestNodeLivenessLivenessStatus(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - now := timeutil.Now() + now := hlc.Timestamp{ + WallTime: time.Date(2020, 0, 0, 0, 0, 0, 0, time.UTC).UnixNano(), + } threshold := 5 * time.Minute for _, tc := range []struct { + name string liveness livenesspb.Liveness expected livenesspb.NodeLivenessStatus }{ // Valid status. { + name: "Valid 5 min", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(5 * time.Minute).UnixNano(), - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(5 * time.Minute).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_LIVE, }, { + name: "Expires just slightly in the future", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - // Expires just slightly in the future. - WallTime: now.UnixNano() + 1, - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(time.Nanosecond).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_LIVE, }, - // Expired status. - { - liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - // Just expired. - WallTime: now.UnixNano(), - }, - Draining: false, - }, - expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, - }, - // Expired status. { + name: "Expired status", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.UnixNano(), - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, - // Max bound of expired. { + name: "Max bound of expired", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano() + 1, - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).AddDuration(time.Nanosecond).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, - // Dead status. { + name: "Dead", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano(), - }, - Draining: false, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).ToLegacyTimestamp(), + Draining: false, }, expected: livenesspb.NodeLivenessStatus_DEAD, }, - // Decommissioning. { + name: "Decommissioning", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(time.Second).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(time.Second).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONING, Draining: false, }, expected: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, - // Decommissioning + expired. { + name: "Decommissioning + expired", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONING, Draining: false, }, expected: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, - // Decommissioned + live. { + name: "Decommissioning + live", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(time.Second).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(time.Second).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONED, Draining: false, }, @@ -988,60 +962,52 @@ func TestNodeLivenessLivenessStatus(t *testing.T) { // "Decommissioning". See #50707 for more details. expected: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, - // Decommissioned + expired. { + name: "Decommissioning + expired", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(-threshold).UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(-threshold).ToLegacyTimestamp(), Membership: livenesspb.MembershipStatus_DECOMMISSIONED, Draining: false, }, expected: livenesspb.NodeLivenessStatus_DECOMMISSIONED, }, - // Draining { + name: "Draining", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.Add(5 * time.Minute).UnixNano(), - }, - Draining: true, + NodeID: 1, + Epoch: 1, + Expiration: now.AddDuration(5 * time.Minute).ToLegacyTimestamp(), + Draining: true, }, expected: livenesspb.NodeLivenessStatus_DRAINING, }, - // Decommissioning that is unavailable. { + name: "Decommissioning that is unavailable", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.UnixNano(), - }, + NodeID: 1, + Epoch: 1, + Expiration: now.ToLegacyTimestamp(), Draining: false, Membership: livenesspb.MembershipStatus_DECOMMISSIONING, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, - // Draining that is unavailable. { + name: "Draining that is unavailable", liveness: livenesspb.Liveness{ - NodeID: 1, - Epoch: 1, - Expiration: hlc.LegacyTimestamp{ - WallTime: now.UnixNano(), - }, - Draining: true, + NodeID: 1, + Epoch: 1, + Expiration: now.ToLegacyTimestamp(), + Draining: true, }, expected: livenesspb.NodeLivenessStatus_UNAVAILABLE, }, } { - t.Run("", func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { if a, e := LivenessStatus(tc.liveness, now, threshold), tc.expected; a != e { - t.Errorf("liveness status was %s, wanted %s", a.String(), e.String()) + t.Errorf("liveness status was %s, wanted %s for %+v, %v", a.String(), e.String(), tc.liveness, now) } }) } diff --git a/pkg/kv/kvserver/allocator/storepool/test_helpers.go b/pkg/kv/kvserver/allocator/storepool/test_helpers.go index 85df6efef8a4..ede0acb0f52b 100644 --- a/pkg/kv/kvserver/allocator/storepool/test_helpers.go +++ b/pkg/kv/kvserver/allocator/storepool/test_helpers.go @@ -55,7 +55,7 @@ func (m *MockNodeLiveness) SetNodeStatus( // NodeLivenessFunc is the method that can be injected as part of store pool // construction to mock out node liveness, in tests. func (m *MockNodeLiveness) NodeLivenessFunc( - nodeID roachpb.NodeID, now time.Time, threshold time.Duration, + nodeID roachpb.NodeID, now hlc.Timestamp, threshold time.Duration, ) livenesspb.NodeLivenessStatus { m.Lock() defer m.Unlock() @@ -76,7 +76,8 @@ func CreateTestStorePool( defaultNodeStatus livenesspb.NodeLivenessStatus, ) (*stop.Stopper, *gossip.Gossip, *timeutil.ManualTime, *StorePool, *MockNodeLiveness) { stopper := stop.NewStopper() - mc := timeutil.NewManualTime(timeutil.Unix(0, 123)) + // Pick a random date that is "realistic" and far enough away from 0. + mc := timeutil.NewManualTime(time.Date(2020, 0, 0, 0, 0, 0, 0, time.UTC)) clock := hlc.NewClockForTesting(mc) ambientCtx := log.MakeTestingAmbientContext(stopper.Tracer()) g := gossip.NewTest(1, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef()) diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index d3b31dc73d2a..15a1c318bb9e 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -419,7 +419,7 @@ func TestAllocatorThrottled(t *testing.T) { if !ok { t.Fatalf("store:%d was not found in the store pool", singleStore[0].StoreID) } - storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) + storeDetail.ThrottledUntil = hlc.Timestamp{WallTime: timeutil.Now().Add(24 * time.Hour).UnixNano()} sp.DetailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { diff --git a/pkg/kv/kvserver/asim/gossip/BUILD.bazel b/pkg/kv/kvserver/asim/gossip/BUILD.bazel index f007187eb6e6..54408e7eadcd 100644 --- a/pkg/kv/kvserver/asim/gossip/BUILD.bazel +++ b/pkg/kv/kvserver/asim/gossip/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/state", "//pkg/roachpb", + "//pkg/util/hlc", "//pkg/util/protoutil", ], ) diff --git a/pkg/kv/kvserver/asim/gossip/exchange.go b/pkg/kv/kvserver/asim/gossip/exchange.go index 314e739ae6fd..e5dd9f84deab 100644 --- a/pkg/kv/kvserver/asim/gossip/exchange.go +++ b/pkg/kv/kvserver/asim/gossip/exchange.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // exchangeInfo contains the information of a gossiped store descriptor. @@ -58,6 +59,6 @@ func (u *fixedDelayExchange) updates(tick time.Time) []*storepool.StoreDetail { func makeStoreDetail(desc *roachpb.StoreDescriptor, tick time.Time) *storepool.StoreDetail { return &storepool.StoreDetail{ Desc: desc, - LastUpdatedTime: tick, + LastUpdatedTime: hlc.Timestamp{WallTime: tick.UnixNano()}, } } diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 9ca689d2e1be..ba5b88255cf4 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -59,6 +59,7 @@ go_test( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/load", "//pkg/roachpb", + "//pkg/util/hlc", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index 41d999dba0ba..e2023be4c8dc 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -968,7 +968,7 @@ func (s *state) SetNodeLiveness(nodeID NodeID, status livenesspb.NodeLivenessSta // liveness of the Node with ID NodeID. // TODO(kvoli): Find a better home for this method, required by the storepool. func (s *state) NodeLivenessFn() storepool.NodeLivenessFunc { - return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + return func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { return s.quickLivenessMap[NodeID(nid)] } } diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index a30dffa78478..bd1473f950c7 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -14,13 +14,13 @@ import ( "math" "math/rand" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/stretchr/testify/require" ) @@ -605,9 +605,9 @@ func TestSetNodeLiveness(t *testing.T) { // Liveness status returend should ignore time till store dead or the // timestamp given. - require.Equal(t, livenesspb.NodeLivenessStatus_LIVE, liveFn(1, time.Time{}, math.MaxInt64)) - require.Equal(t, livenesspb.NodeLivenessStatus_DEAD, liveFn(2, time.Time{}, math.MaxInt64)) - require.Equal(t, livenesspb.NodeLivenessStatus_DECOMMISSIONED, liveFn(3, time.Time{}, math.MaxInt64)) + require.Equal(t, livenesspb.NodeLivenessStatus_LIVE, liveFn(1, hlc.Timestamp{}, math.MaxInt64)) + require.Equal(t, livenesspb.NodeLivenessStatus_DEAD, liveFn(2, hlc.Timestamp{}, math.MaxInt64)) + require.Equal(t, livenesspb.NodeLivenessStatus_DECOMMISSIONED, liveFn(3, hlc.Timestamp{}, math.MaxInt64)) }) t.Run("node count fn", func(t *testing.T) { diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index d3540c62f219..2e4894419eab 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -884,7 +884,7 @@ func (cbt *circuitBreakerTest) ExpireAllLeasesAndN1LivenessRecord( testutils.SucceedsSoon(t, func() error { self, ok := lv.Self() require.True(t, ok) - if self.IsLive(cbt.Server(n2).Clock().Now().GoTime()) { + if self.IsLive(cbt.Server(n2).Clock().Now()) { // Someone else must have incremented epoch. return nil } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index f3837c445492..2ac26f013cc3 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -225,7 +225,7 @@ func NewTestStorePool(cfg StoreConfig) *storepool.StorePool { func() int { return 1 }, - func(roachpb.NodeID, time.Time, time.Duration) livenesspb.NodeLivenessStatus { + func(roachpb.NodeID, hlc.Timestamp, time.Duration) livenesspb.NodeLivenessStatus { return livenesspb.NodeLivenessStatus_LIVE }, /* deterministic */ false, diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index fb56f92450cb..cb9128aa3433 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -699,7 +699,7 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { } // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to // consider clock signals from other nodes. - return liveness.IsLive(nl.clock.Now().GoTime()), nil + return liveness.IsLive(nl.clock.Now()), nil } // IsAvailable returns whether or not the specified node is available to serve @@ -708,7 +708,7 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { // Returns false if the node is not in the local liveness table. func (nl *NodeLiveness) IsAvailable(nodeID roachpb.NodeID) bool { liveness, ok := nl.GetLiveness(nodeID) - return ok && liveness.IsLive(nl.clock.Now().GoTime()) && !liveness.Membership.Decommissioned() + return ok && liveness.IsLive(nl.clock.Now()) && !liveness.Membership.Decommissioned() } // IsAvailableNotDraining returns whether or not the specified node is available @@ -720,7 +720,7 @@ func (nl *NodeLiveness) IsAvailable(nodeID roachpb.NodeID) bool { func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool { liveness, ok := nl.GetLiveness(nodeID) return ok && - liveness.IsLive(nl.clock.Now().GoTime()) && + liveness.IsLive(nl.clock.Now()) && !liveness.Membership.Decommissioning() && !liveness.Membership.Decommissioned() && !liveness.Draining @@ -1007,7 +1007,7 @@ func (nl *NodeLiveness) heartbeatInternal( // return a success even if the expiration is only 5 seconds in the // future. The next heartbeat will then start with only 0.5 seconds // before expiration. - if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch { + if actual.IsLive(nl.clock.Now()) && !incrementEpoch { return errNodeAlreadyLive } // Otherwise, return error. @@ -1055,7 +1055,7 @@ func (nl *NodeLiveness) GetIsLiveMap() livenesspb.IsLiveMap { lMap := livenesspb.IsLiveMap{} nl.mu.RLock() defer nl.mu.RUnlock() - now := nl.clock.Now().GoTime() + now := nl.clock.Now() for nID, l := range nl.mu.nodes { isLive := l.IsLive(now) if !isLive && !l.Membership.Active() { @@ -1215,7 +1215,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness livenesspb. <-sem }() - if liveness.IsLive(nl.clock.Now().GoTime()) { + if liveness.IsLive(nl.clock.Now()) { return errors.Errorf("cannot increment epoch on live node: %+v", liveness) } @@ -1445,7 +1445,7 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record) return } - now := nl.clock.Now().GoTime() + now := nl.clock.Now() if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { for _, fn := range onIsLive { fn(newLivenessRec.Liveness) @@ -1524,7 +1524,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 { if !ok { return 0 } - now := nl.clock.Now().GoTime() + now := nl.clock.Now() // If this node isn't live, we don't want to report its view of node liveness // because it's more likely to be inaccurate than the view of a live node. if !self.IsLive(now) { diff --git a/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel b/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel index edaaaa99f792..562c74e854d3 100644 --- a/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel +++ b/pkg/kv/kvserver/liveness/livenesspb/BUILD.bazel @@ -11,7 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/roachpb", - "//pkg/util/timeutil", + "//pkg/util/hlc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", ], diff --git a/pkg/kv/kvserver/liveness/livenesspb/liveness.go b/pkg/kv/kvserver/liveness/livenesspb/liveness.go index d6184c1d56de..dfea30af2a9f 100644 --- a/pkg/kv/kvserver/liveness/livenesspb/liveness.go +++ b/pkg/kv/kvserver/liveness/livenesspb/liveness.go @@ -15,7 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -30,18 +30,16 @@ import ( // considered expired. For that purpose, it's better to pass in // clock.Now().GoTime() rather than clock.PhysicalNow() - the former takes into // consideration clock signals from other nodes, the latter doesn't. -func (l *Liveness) IsLive(now time.Time) bool { - expiration := timeutil.Unix(0, l.Expiration.WallTime) - return now.Before(expiration) +func (l *Liveness) IsLive(now hlc.Timestamp) bool { + return now.Less(l.Expiration.ToTimestamp()) } // IsDead returns true if the liveness expired more than threshold ago. // // Note that, because of threshold, IsDead() is not the inverse of IsLive(). -func (l *Liveness) IsDead(now time.Time, threshold time.Duration) bool { - expiration := timeutil.Unix(0, l.Expiration.WallTime) - deadAsOf := expiration.Add(threshold) - return !now.Before(deadAsOf) +func (l *Liveness) IsDead(now hlc.Timestamp, threshold time.Duration) bool { + expiration := l.Expiration.ToTimestamp().AddDuration(threshold) + return !now.Less(expiration) } // Compare returns an integer comparing two pieces of liveness information, diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 0c8ef9ac2f29..80d3b162410b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -2166,7 +2166,7 @@ func shouldCampaignOnLeaseRequestRedirect( // Store.updateLivenessMap). We only care whether the leader is currently live // according to node liveness because this determines whether it will be able // to acquire an epoch-based lease. - return !livenessEntry.Liveness.IsLive(now.GoTime()) + return !livenessEntry.Liveness.IsLive(now) } func (r *Replica) campaignLocked(ctx context.Context) { diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 288bb2cac671..b38ceacf745e 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -1059,7 +1059,7 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { }, NodeLiveness: kvserver.NodeLivenessTestingKnobs{ StorePoolNodeLivenessFn: func( - id roachpb.NodeID, now time.Time, duration time.Duration, + id roachpb.NodeID, now hlc.Timestamp, duration time.Duration, ) livenesspb.NodeLivenessStatus { val := livenessTrap.Load() if val == nil { diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 4815d38cf287..8a169bae8238 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -2183,7 +2183,7 @@ func (s *systemAdminServer) checkReadinessForHealthCheck(ctx context.Context) er if !ok { return grpcstatus.Error(codes.Unavailable, "liveness record not found") } - if !l.IsLive(s.clock.Now().GoTime()) { + if !l.IsLive(s.clock.Now()) { return grpcstatus.Errorf(codes.Unavailable, "node is not healthy") } if l.Draining { @@ -2212,7 +2212,7 @@ func (s *systemAdminServer) checkReadinessForHealthCheck(ctx context.Context) er // // getLivenessStatusMap() includes removed nodes (dead + decommissioned). func getLivenessStatusMap( - ctx context.Context, nl *liveness.NodeLiveness, now time.Time, st *cluster.Settings, + ctx context.Context, nl *liveness.NodeLiveness, now hlc.Timestamp, st *cluster.Settings, ) (map[roachpb.NodeID]livenesspb.NodeLivenessStatus, error) { livenesses, err := nl.GetLivenessesFromKV(ctx) if err != nil { @@ -2232,7 +2232,7 @@ func getLivenessStatusMap( // a slice containing the liveness record of all nodes that have ever been a part of the // cluster. func getLivenessResponse( - ctx context.Context, nl optionalnodeliveness.Interface, now time.Time, st *cluster.Settings, + ctx context.Context, nl optionalnodeliveness.Interface, now hlc.Timestamp, st *cluster.Settings, ) (*serverpb.LivenessResponse, error) { livenesses, err := nl.GetLivenessesFromKV(ctx) if err != nil { @@ -2274,7 +2274,7 @@ func (s *systemAdminServer) Liveness( ) (*serverpb.LivenessResponse, error) { clock := s.clock - return getLivenessResponse(ctx, s.nodeLiveness, clock.Now().GoTime(), s.st) + return getLivenessResponse(ctx, s.nodeLiveness, clock.Now(), s.st) } func (s *adminServer) Jobs( @@ -2718,7 +2718,7 @@ func (s *systemAdminServer) DecommissionPreCheck( // Initially evaluate node liveness status, so we filter the nodes to check. var nodesToCheck []roachpb.NodeID - livenessStatusByNodeID, err := getLivenessStatusMap(ctx, s.nodeLiveness, s.clock.Now().GoTime(), s.st) + livenessStatusByNodeID, err := getLivenessStatusMap(ctx, s.nodeLiveness, s.clock.Now(), s.st) if err != nil { return nil, serverError(ctx, err) } @@ -2927,7 +2927,7 @@ func (s *systemAdminServer) decommissionStatusHelper( Draining: l.Draining, ReportedReplicas: replicasToReport[l.NodeID], } - if l.IsLive(s.clock.Now().GoTime()) { + if l.IsLive(s.clock.Now()) { nodeResp.IsLive = true } res.Status = append(res.Status, nodeResp) diff --git a/pkg/server/auto_upgrade.go b/pkg/server/auto_upgrade.go index c758d12f54b2..ac8c292d1e5a 100644 --- a/pkg/server/auto_upgrade.go +++ b/pkg/server/auto_upgrade.go @@ -128,7 +128,7 @@ func (s *Server) upgradeStatus( return upgradeBlockedDueToError, err } clock := s.admin.server.clock - statusMap, err := getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now().GoTime(), s.st) + statusMap, err := getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now(), s.st) if err != nil { return upgradeBlockedDueToError, err } diff --git a/pkg/server/clock_monotonicity.go b/pkg/server/clock_monotonicity.go index c7aebd60efc6..42928eac60be 100644 --- a/pkg/server/clock_monotonicity.go +++ b/pkg/server/clock_monotonicity.go @@ -139,7 +139,7 @@ func ensureClockMonotonicity( if delta > 0 { log.Ops.Infof( ctx, - "Sleeping till wall time %v to catches up to %v to ensure monotonicity. Delta: %v", + "Sleeping till wall time %v to catches up to %v to ensure monotonicity. Sub: %v", currentWallTime, sleepUntil, delta, diff --git a/pkg/server/fanout_clients.go b/pkg/server/fanout_clients.go index 7d1c5dfc1307..7cb7a1372ab0 100644 --- a/pkg/server/fanout_clients.go +++ b/pkg/server/fanout_clients.go @@ -240,7 +240,7 @@ func (k kvFanoutClient) listNodes(ctx context.Context) (*serverpb.NodesResponse, } clock := k.clock - resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, k.nodeLiveness, clock.Now().GoTime(), k.st) + resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, k.nodeLiveness, clock.Now(), k.st) if err != nil { return nil, err } diff --git a/pkg/server/status.go b/pkg/server/status.go index 27fc454e5264..fb068ec533d3 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -1816,7 +1816,7 @@ func (s *systemStatusServer) nodesHelper( } clock := s.clock - resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now().GoTime(), s.st) + resp.LivenessByNodeID, err = getLivenessStatusMap(ctx, s.nodeLiveness, clock.Now(), s.st) if err != nil { return nil, 0, err } diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go index ed514b10db1c..11633150a8fa 100644 --- a/pkg/util/hlc/timestamp.go +++ b/pkg/util/hlc/timestamp.go @@ -54,6 +54,12 @@ func (t Timestamp) LessEq(s Timestamp) bool { return t.WallTime < s.WallTime || (t.WallTime == s.WallTime && t.Logical <= s.Logical) } +// After returns whether the provided timestamp is after our timestamp. +// This matches the behavior of time.After. +func (t Timestamp) After(s Timestamp) bool { + return !t.LessEq(s) +} + // Compare returns -1 if this timestamp is lesser than the given timestamp, 1 if // it is greater, and 0 if they are equal. func (t Timestamp) Compare(s Timestamp) int { @@ -205,6 +211,14 @@ func (t Timestamp) IsSet() bool { return !t.IsEmpty() } +// AddDuration adds a given duration to this Timestamp. The resulting timestamp +// is Synthetic. Normally if you want to bump your clock to the higher of two +// timestamps, use Forward, however this method is here to create a +// hlc.Timestamp in the future (or past). +func (t Timestamp) AddDuration(duration time.Duration) Timestamp { + return t.Add(duration.Nanoseconds(), t.Logical) +} + // Add returns a timestamp with the WallTime and Logical components increased. // wallTime is expressed in nanos. //