liveness: Convert to using hlc.Timestamp
Previously the code converted back and forth between HLC timestamps and Go
times. Liveness validity is always evaluated in HLC time, and this change
makes that explicit by using hlc.Timestamp throughout.

Epic: none

Release note: None
andrewbaptist committed May 22, 2023
1 parent e00172c commit 7af839f
Showing 26 changed files with 171 additions and 187 deletions.
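The hunks below mechanically swap time.Time for hlc.Timestamp in the store pool's liveness bookkeeping, so deadlines are computed and compared in HLC space instead of round-tripping through GoTime(). As a rough sketch of the resulting pattern (the helper and package names are illustrative, not code from the commit, and assume the cockroach module is importable):

package example

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// isDead sketches the post-commit style: the dead-store deadline is derived
// and compared entirely as hlc.Timestamps. Before this change the equivalent
// code lowered both sides to time.Time via GoTime() first.
func isDead(lastUpdated, now hlc.Timestamp, timeUntilStoreDead time.Duration) bool {
	deadAsOf := lastUpdated.AddDuration(timeUntilStoreDead)
	return now.After(deadAsOf)
}

Staying in HLC space also keeps the logical component in play when timestamps are compared; a comparison of GoTime() values would drop it.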
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -532,7 +532,7 @@ func mockStorePool(
for _, storeID := range suspectedStoreIDs {
liveNodeSet[roachpb.NodeID(storeID)] = livenesspb.NodeLivenessStatus_LIVE
detail := storePool.GetStoreDetailLocked(storeID)
detail.LastUnavailable = storePool.Clock().Now().GoTime()
detail.LastUnavailable = storePool.Clock().Now()
detail.Desc = &roachpb.StoreDescriptor{
StoreID: storeID,
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)},
@@ -542,7 +542,7 @@
// Set the node liveness function using the set we constructed.
// TODO(sarkesian): This override needs to be fixed to stop exporting this field.
storePool.NodeLivenessFn =
func(nodeID roachpb.NodeID, now time.Time, threshold time.Duration) livenesspb.NodeLivenessStatus {
func(nodeID roachpb.NodeID, now hlc.Timestamp, threshold time.Duration) livenesspb.NodeLivenessStatus {
if status, ok := liveNodeSet[nodeID]; ok {
return status
}
@@ -842,7 +842,7 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) {
gossiputil.NewStoreGossiper(g).GossipStores(storeDescriptors, t)

// Override liveness of n3 to decommissioning so the only available target is s4.
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if nid == roachpb.NodeID(3) {
return livenesspb.NodeLivenessStatus_DECOMMISSIONING
}
@@ -900,7 +900,7 @@ func TestAllocatorReplaceFailsOnConstrainedDecommissioningReplica(t *testing.T)
gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t)

// Override liveness of n3 to decommissioning so the only available target is s4.
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if nid == roachpb.NodeID(3) {
return livenesspb.NodeLivenessStatus_DECOMMISSIONING
}
@@ -6829,7 +6829,7 @@ func TestAllocatorComputeActionWithStorePoolRemoveDead(t *testing.T) {
// Mark all dead nodes as alive, so we can override later.
all := append(tcase.live, tcase.dead...)
mockStorePool(sp, all, nil, nil, nil, nil, nil)
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
for _, deadStoreID := range tcase.dead {
if nid == roachpb.NodeID(deadStoreID) {
return livenesspb.NodeLivenessStatus_DEAD
@@ -8503,7 +8503,7 @@ func TestAllocatorFullDisks(t *testing.T) {
for i := 0; i < generations; i++ {
// First loop through test stores and randomly add data.
for j := 0; j < len(testStores); j++ {
if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) == livenesspb.NodeLivenessStatus_DEAD {
if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), hlc.Timestamp{}, 0) == livenesspb.NodeLivenessStatus_DEAD {
continue
}
ts := &testStores[j]
@@ -8530,7 +8530,7 @@
// Loop through each store a number of times and maybe rebalance.
for j := 0; j < 10; j++ {
for k := 0; k < len(testStores); k++ {
if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(k), time.Time{}, 0) == livenesspb.NodeLivenessStatus_DEAD {
if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(k), hlc.Timestamp{}, 0) == livenesspb.NodeLivenessStatus_DEAD {
continue
}
ts := &testStores[k]
@@ -8567,7 +8567,7 @@ func TestAllocatorFullDisks(t *testing.T) {

// Simulate rocksdb compactions freeing up disk space.
for j := 0; j < len(testStores); j++ {
if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), time.Time{}, 0) != livenesspb.NodeLivenessStatus_DEAD {
if mockNodeLiveness.NodeLivenessFunc(roachpb.NodeID(j), hlc.Timestamp{}, 0) != livenesspb.NodeLivenessStatus_DEAD {
ts := &testStores[j]
if ts.Capacity.Available <= 0 {
t.Errorf("testStore %d ran out of space during generation %d: %+v", j, i, ts.Capacity)
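The allocator test changes above all follow from the new NodeLivenessFunc signature, which now takes an hlc.Timestamp instead of a time.Time. A self-contained stub of that shape (illustrative only, not code from the commit) looks like:

package example

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// alwaysLive matches the post-commit storepool.NodeLivenessFunc signature and
// reports every node as live, ignoring the HLC time and the dead threshold.
var alwaysLive storepool.NodeLivenessFunc = func(
	nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration,
) livenesspb.NodeLivenessStatus {
	return livenesspb.NodeLivenessStatus_LIVE
}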
1 change: 0 additions & 1 deletion pkg/kv/kvserver/allocator/storepool/BUILD.bazel
@@ -48,7 +48,6 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_kr_pretty//:pretty",
"@com_github_stretchr_testify//require",
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator/storepool/override_store_pool.go
@@ -50,7 +50,7 @@ var _ AllocatorStorePool = &OverrideStorePool{}
func OverrideNodeLivenessFunc(
overrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus, realNodeLivenessFunc NodeLivenessFunc,
) NodeLivenessFunc {
return func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
return func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if override, ok := overrides[nid]; ok {
return override
}
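OverrideNodeLivenessFunc, shown above, layers per-node status overrides on top of a real NodeLivenessFunc; the allocator tests use it to force nodes into states such as decommissioning. A hedged usage sketch against the post-commit signature (the wrapper name is made up for illustration):

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// withNode3Decommissioning wraps a base liveness function so that node 3
// always reports as decommissioning while every other node keeps whatever
// status the base function returns.
func withNode3Decommissioning(base storepool.NodeLivenessFunc) storepool.NodeLivenessFunc {
	return storepool.OverrideNodeLivenessFunc(
		map[roachpb.NodeID]livenesspb.NodeLivenessStatus{
			3: livenesspb.NodeLivenessStatus_DECOMMISSIONING,
		},
		base,
	)
}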
pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
@@ -42,7 +43,7 @@ func TestOverrideStorePoolStatusString(t *testing.T) {
sg := gossiputil.NewStoreGossiper(g)

livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if overriddenLiveness, ok := livenessOverrides[nid]; ok {
return overriddenLiveness
}
@@ -123,7 +124,7 @@ func TestOverrideStorePoolDecommissioningReplicas(t *testing.T) {
sg := gossiputil.NewStoreGossiper(g)

livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if overriddenLiveness, ok := livenessOverrides[nid]; ok {
return overriddenLiveness
}
@@ -240,7 +241,7 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) {
sg := gossiputil.NewStoreGossiper(g)

livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus {
if overriddenLiveness, ok := livenessOverrides[nid]; ok {
return overriddenLiveness
}
@@ -335,7 +336,7 @@ func TestOverrideStorePoolGetStoreList(t *testing.T) {

// Set suspectedStore as suspected.
testStorePool.DetailsMu.Lock()
testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now().GoTime()
testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now()
testStorePool.DetailsMu.Unlock()

// No filter or limited set of store IDs.
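The tests above now record LastUnavailable as the raw clock.Now() reading rather than its GoTime() projection. One property this preserves is ordering on the logical component when wall times tie, which a time.Time comparison cannot see. A small stand-alone illustration (not from the commit):

package example

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// demoOrdering shows that two HLC timestamps with identical wall times are
// still totally ordered by their logical counters, whereas the corresponding
// Go times collide.
func demoOrdering() {
	a := hlc.Timestamp{WallTime: 100, Logical: 1}
	b := hlc.Timestamp{WallTime: 100, Logical: 2}
	fmt.Println(b.After(a))                   // true
	fmt.Println(b.GoTime().Equal(a.GoTime())) // true: the wall clocks tie
}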
56 changes: 29 additions & 27 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -113,15 +113,15 @@ type NodeCountFunc func() int
// not the node is live. A node is considered dead if its liveness record has
// expired by more than TimeUntilStoreDead.
type NodeLivenessFunc func(
nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
nid roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration,
) livenesspb.NodeLivenessStatus

// MakeStorePoolNodeLivenessFunc returns a function which determines
// the status of a node based on information provided by the specified
// NodeLiveness.
func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLivenessFunc {
return func(
nodeID roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
nodeID roachpb.NodeID, now hlc.Timestamp, timeUntilStoreDead time.Duration,
) livenesspb.NodeLivenessStatus {
liveness, ok := nodeLiveness.GetLiveness(nodeID)
if !ok {
@@ -160,7 +160,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *liveness.NodeLiveness) NodeLive
// ideally we should remove usage of NodeLivenessStatus altogether. See #50707
// for more details.
func LivenessStatus(
l livenesspb.Liveness, now time.Time, deadThreshold time.Duration,
l livenesspb.Liveness, now hlc.Timestamp, deadThreshold time.Duration,
) livenesspb.NodeLivenessStatus {
// If we don't have a liveness expiration time, treat the status as unknown.
// This is different than unavailable as it doesn't transition through being
@@ -195,16 +195,16 @@ type StoreDetail struct {
Desc *roachpb.StoreDescriptor
// ThrottledUntil is when a throttled store can be considered available again
// due to a failed or declined snapshot.
ThrottledUntil time.Time
ThrottledUntil hlc.Timestamp
// throttledBecause is set to the most recent reason for which a store was
// marked as throttled.
throttledBecause string
// LastUpdatedTime is set when a store is first consulted and every time
// gossip arrives for a store.
LastUpdatedTime time.Time
LastUpdatedTime hlc.Timestamp
// LastUnavailable is set when it's detected that a store was unavailable,
// i.e. failed liveness.
LastUnavailable time.Time
LastUnavailable hlc.Timestamp
}

// storeStatus is the current status of a store.
@@ -238,7 +238,10 @@ const (
)

func (sd *StoreDetail) status(
now time.Time, deadThreshold time.Duration, nl NodeLivenessFunc, suspectDuration time.Duration,
now hlc.Timestamp,
deadThreshold time.Duration,
nl NodeLivenessFunc,
suspectDuration time.Duration,
) storeStatus {
// During normal operation, we expect the state transitions for stores to look like the following:
//
@@ -265,7 +268,7 @@ func (sd *StoreDetail) status(
// within the liveness threshold. Note that LastUpdatedTime is set
// when the store detail is created and will have a non-zero value
// even before the first gossip arrives for a store.
deadAsOf := sd.LastUpdatedTime.Add(deadThreshold)
deadAsOf := sd.LastUpdatedTime.AddDuration(deadThreshold)
if now.After(deadAsOf) {
sd.LastUnavailable = now
return storeStatusDead
@@ -305,7 +308,7 @@ func (sd *StoreDetail) status(
// Check whether the store is currently suspect. We measure that by
// looking at the time it was last unavailable making sure we have not seen any
// failures for a period of time defined by StoreSuspectDuration.
if sd.LastUnavailable.Add(suspectDuration).After(now) {
if sd.LastUnavailable.AddDuration(suspectDuration).After(now) {
return storeStatusSuspect
}

@@ -443,7 +446,7 @@ type StorePool struct {
gossip *gossip.Gossip
nodeCountFn NodeCountFunc
NodeLivenessFn NodeLivenessFunc
startTime time.Time
startTime hlc.Timestamp
deterministic bool

// We use separate mutexes for storeDetails and nodeLocalities because the
@@ -492,7 +495,7 @@ func NewStorePool(
gossip: g,
nodeCountFn: nodeCountFn,
NodeLivenessFn: nodeLivenessFn,
startTime: clock.PhysicalTime(),
startTime: clock.Now(),
deterministic: deterministic,
}
sp.DetailsMu.StoreDetails = make(map[roachpb.StoreID]*StoreDetail)
@@ -523,7 +526,7 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string {
sort.Sort(ids)

var buf bytes.Buffer
now := sp.clock.Now().GoTime()
now := sp.clock.Now()
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

@@ -538,9 +541,8 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string {
fmt.Fprintf(&buf, ": range-count=%d fraction-used=%.2f",
detail.Desc.Capacity.RangeCount, detail.Desc.Capacity.FractionUsed())
}
throttled := detail.ThrottledUntil.Sub(now)
if throttled > 0 {
fmt.Fprintf(&buf, " [throttled=%.1fs]", throttled.Seconds())
if detail.ThrottledUntil.After(now) {
fmt.Fprintf(&buf, " [throttled=%.1fs]", detail.ThrottledUntil.GoTime().Sub(now.GoTime()).Seconds())
}
_, _ = buf.WriteString("\n")
}
@@ -569,7 +571,7 @@ func (sp *StorePool) storeDescriptorUpdate(storeDesc roachpb.StoreDescriptor) {
storeID := storeDesc.StoreID
curCapacity := storeDesc.Capacity

now := sp.clock.PhysicalTime()
now := sp.clock.Now()

sp.DetailsMu.Lock()
detail := sp.GetStoreDetailLocked(storeID)
@@ -815,9 +817,9 @@ func (sp *StorePool) decommissioningReplicasWithLiveness(
sp.DetailsMu.Lock()
defer sp.DetailsMu.Unlock()

// NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to
// NB: We use clock.Now() instead of clock.PhysicalTime() is order to
// take clock signals from remote nodes into consideration.
now := sp.clock.Now().GoTime()
now := sp.clock.Now()
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

@@ -859,12 +861,12 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error
if !ok {
return false, 0, errors.Errorf("store %d was not found", storeID)
}
// NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to
// NB: We use clock.Now() instead of clock.PhysicalTime() is order to
// take clock signals from remote nodes into consideration.
now := sp.clock.Now().GoTime()
now := sp.clock.Now()
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

deadAsOf := sd.LastUpdatedTime.Add(timeUntilStoreDead)
deadAsOf := sd.LastUpdatedTime.AddDuration(timeUntilStoreDead)
if now.After(deadAsOf) {
return true, 0, nil
}
@@ -873,7 +875,7 @@
if sd.Desc == nil {
return false, 0, errors.Errorf("store %d status unknown, cant tell if it's dead or alive", storeID)
}
return false, deadAsOf.Sub(now), nil
return false, deadAsOf.GoTime().Sub(now.GoTime()), nil
}

// IsUnknown returns true if the given store's status is `storeStatusUnknown`
@@ -935,9 +937,9 @@ func (sp *StorePool) storeStatus(
if !ok {
return storeStatusUnknown, errors.Errorf("store %d was not found", storeID)
}
// NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() is order to
// NB: We use clock.Now() instead of clock.PhysicalTime() is order to
// take clock signals from remote nodes into consideration.
now := sp.clock.Now().GoTime()
now := sp.clock.Now()
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)
return sd.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect), nil
@@ -971,7 +973,7 @@ func (sp *StorePool) liveAndDeadReplicasWithLiveness(
sp.DetailsMu.Lock()
defer sp.DetailsMu.Unlock()

now := sp.clock.Now().GoTime()
now := sp.clock.Now()
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

@@ -1247,7 +1249,7 @@ func (sp *StorePool) getStoreListFromIDsLocked(
var throttled ThrottledStoreReasons
var storeDescriptors []roachpb.StoreDescriptor

now := sp.clock.Now().GoTime()
now := sp.clock.Now()
timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV)

@@ -1311,7 +1313,7 @@ func (sp *StorePool) Throttle(reason ThrottleReason, why string, storeID roachpb
switch reason {
case ThrottleFailed:
timeout := FailedReservationsTimeout.Get(&sp.st.SV)
detail.ThrottledUntil = sp.clock.PhysicalTime().Add(timeout)
detail.ThrottledUntil = sp.clock.Now().AddDuration(timeout)
if log.V(2) {
ctx := sp.AnnotateCtx(context.TODO())
log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s",
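With this commit, ThrottledUntil is set via clock.Now().AddDuration(timeout) and compared as an hlc.Timestamp; only the status string lowers it back to Go time to print a remaining duration, as in the statusString hunk above. A rough sketch of that display-side computation (function name is illustrative):

package example

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// remainingThrottle compares the two HLC timestamps directly and converts to
// wall-clock time only to render a human-readable remaining duration.
func remainingThrottle(throttledUntil, now hlc.Timestamp) string {
	if !throttledUntil.After(now) {
		return "not throttled"
	}
	return fmt.Sprintf("throttled=%.1fs", throttledUntil.GoTime().Sub(now.GoTime()).Seconds())
}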