storage: change low-level liveness methods to regular time
liveness.IsLive/IsDead were taking an HLC timestamp, although liveness
expiration does not have a logical part. Remove the use of hlc here to
simplify things.
Also standardize the time source used by the different functions that check
liveness expiration: some were using clock.Now(), others clock.PhysicalTime().
This patch picks the former, which also matches the source used when setting
liveness.Expiration.

Release note: None
andreimatei committed Jan 23, 2020
1 parent 5a57cf6 commit 1d76206
Showing 11 changed files with 70 additions and 43 deletions.
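To see the shape of the change before reading the diffs, here is a standalone sketch (hypothetical hlcTimestamp type and helper names, not CockroachDB code): because a liveness expiration never carries a logical component, an HLC comparison and a plain wall-clock comparison give the same answer, which is what lets the signatures below switch to time.Time.

```go
// Sketch only: why dropping the hlc.Timestamp parameter loses nothing when the
// expiration's logical part is always zero.
package main

import (
	"fmt"
	"time"
)

// hlcTimestamp mimics the wall/logical pair of an HLC timestamp.
type hlcTimestamp struct {
	WallTime int64 // nanoseconds since the Unix epoch
	Logical  int32
}

// isLiveHLC is the old-style check: strict HLC comparison (hypothetical helper).
func isLiveHLC(expiration, now hlcTimestamp) bool {
	if now.WallTime != expiration.WallTime {
		return now.WallTime < expiration.WallTime
	}
	return now.Logical < expiration.Logical
}

// isLiveTime is the new-style check: plain wall-clock comparison.
func isLiveTime(expiration hlcTimestamp, now time.Time) bool {
	return now.Before(time.Unix(0, expiration.WallTime))
}

func main() {
	// Expiration with a zero logical part, as liveness records always have.
	exp := hlcTimestamp{WallTime: time.Now().Add(3 * time.Second).UnixNano()}
	now := time.Now()
	nowHLC := hlcTimestamp{WallTime: now.UnixNano()}
	// Both formulations agree.
	fmt.Println(isLiveHLC(exp, nowHLC), isLiveTime(exp, now))
}
```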
6 changes: 3 additions & 3 deletions pkg/jobs/registry.go
@@ -172,7 +172,7 @@ func (r *Registry) MetricsStruct() *Metrics {
 // lenientNow returns the timestamp after which we should attempt
 // to steal a job from a node whose liveness is failing. This allows
 // jobs coordinated by a node which is temporarily saturated to continue.
-func (r *Registry) lenientNow() hlc.Timestamp {
+func (r *Registry) lenientNow() time.Time {
   // We see this in tests.
   var offset time.Duration
   if r.settings == cluster.NoSettings {
@@ -181,7 +181,7 @@ func (r *Registry) lenientNow() hlc.Timestamp {
     offset = LeniencySetting.Get(&r.settings.SV)
   }

-  return r.clock.Now().Add(-offset.Nanoseconds(), 0)
+  return r.clock.Now().GoTime().Add(-offset)
 }

 // makeCtx returns a new context from r's ambient context and an associated
@@ -741,7 +741,7 @@ func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
     // Don't try to start any more jobs unless we're really live,
     // otherwise we'd just immediately cancel them.
     if liveness.NodeID == r.nodeID.Get() {
-      if !liveness.IsLive(r.clock.Now()) {
+      if !liveness.IsLive(r.clock.Now().GoTime()) {
         return errors.Errorf(
           "trying to adopt jobs on node %d which is not live", liveness.NodeID)
       }
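A hedged standalone sketch of why the two return expressions in lenientNow above are equivalent (plain Go, no CockroachDB types): subtracting the leniency offset in HLC wall-time nanoseconds or on a time.Time lands on the same instant.

```go
// Sketch: shifting an HLC wall time by -offset.Nanoseconds() vs. shifting the
// corresponding time.Time by -offset yields the same nanosecond value.
package main

import (
	"fmt"
	"time"
)

func main() {
	offset := 30 * time.Second
	nowWall := time.Now().UnixNano()

	// Old shape: arithmetic on the HLC wall time.
	oldStyle := nowWall - offset.Nanoseconds()

	// New shape: convert to time.Time first, then subtract the duration.
	newStyle := time.Unix(0, nowWall).Add(-offset)

	fmt.Println(oldStyle == newStyle.UnixNano()) // true
}
```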
4 changes: 2 additions & 2 deletions pkg/server/admin.go
@@ -1268,7 +1268,7 @@ func (s *adminServer) Liveness(
 ) (*serverpb.LivenessResponse, error) {
   clock := s.server.clock
   statusMap := getLivenessStatusMap(
-    s.server.nodeLiveness, clock.PhysicalTime(), s.server.st)
+    s.server.nodeLiveness, clock.Now().GoTime(), s.server.st)
   livenesses := s.server.nodeLiveness.GetLivenesses()
   return &serverpb.LivenessResponse{
     Livenesses: livenesses,
@@ -1575,7 +1575,7 @@ func (s *adminServer) DecommissionStatus(
     Decommissioning: l.Decommissioning,
     Draining: l.Draining,
   }
-  if l.IsLive(s.server.clock.Now()) {
+  if l.IsLive(s.server.clock.Now().GoTime()) {
     nodeResp.IsLive = true
   }

8 changes: 3 additions & 5 deletions pkg/server/status.go
@@ -50,7 +50,6 @@ import (
   "github.com/cockroachdb/cockroach/pkg/storage"
   "github.com/cockroachdb/cockroach/pkg/storage/storagepb"
   "github.com/cockroachdb/cockroach/pkg/util/contextutil"
-  "github.com/cockroachdb/cockroach/pkg/util/hlc"
   "github.com/cockroachdb/cockroach/pkg/util/httputil"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/cockroach/pkg/util/stop"
@@ -671,8 +670,7 @@ func (s *statusServer) Details(
   if err != nil {
     return nil, grpcstatus.Error(codes.Internal, err.Error())
   }
-  nowHlc := hlc.Timestamp{WallTime: s.admin.server.clock.PhysicalNow()}
-  isHealthy := l.IsLive(nowHlc) && !l.Draining
+  isHealthy := l.IsLive(s.admin.server.clock.Now().GoTime()) && !l.Draining
   if !isHealthy {
     return nil, grpcstatus.Error(codes.Unavailable, "node is not ready")
   }
@@ -1041,7 +1039,7 @@ func (s *statusServer) Nodes(
   }

   clock := s.admin.server.clock
-  resp.LivenessByNodeID = getLivenessStatusMap(s.nodeLiveness, clock.PhysicalTime(), s.st)
+  resp.LivenessByNodeID = getLivenessStatusMap(s.nodeLiveness, clock.Now().GoTime(), s.st)

   return &resp, nil
 }
@@ -1056,7 +1054,7 @@ func (s *statusServer) nodesStatusWithLiveness(
     return nil, err
   }
   clock := s.admin.server.clock
-  statusMap := getLivenessStatusMap(s.nodeLiveness, clock.PhysicalTime(), s.st)
+  statusMap := getLivenessStatusMap(s.nodeLiveness, clock.Now().GoTime(), s.st)
   ret := make(map[roachpb.NodeID]nodeStatusWithLiveness)
   for _, node := range nodes.Nodes {
     nodeID := node.Desc.NodeID
1 change: 1 addition & 0 deletions pkg/storage/allocator.go
@@ -853,6 +853,7 @@ func (a *Allocator) TransferLeaseTarget(

   // Short-circuit if there are no valid targets out there.
   if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
+    log.VEventf(ctx, 2, "no lease transfer target found")
     return roachpb.ReplicaDescriptor{}
   }

12 changes: 9 additions & 3 deletions pkg/storage/client_replica_test.go
@@ -507,7 +507,8 @@ func TestRangeLookupUseReverse(t *testing.T) {
 }

 type leaseTransferTest struct {
-  mtc                        *multiTestContext
+  mtc *multiTestContext
+  // replicas of range covering key "a" on the first and the second stores.
   replica0, replica1         *storage.Replica
   replica0Desc, replica1Desc roachpb.ReplicaDescriptor
   leftKey                    roachpb.Key
@@ -643,15 +644,20 @@ func (l *leaseTransferTest) setFilter(setTo bool, extensionSem chan struct{}) {
       l.filter = nil
       l.filterMu.Unlock()
       extensionSem <- struct{}{}
+      log.Infof(filterArgs.Ctx, "filter blocking request: %s", llReq)
       <-extensionSem
+      log.Infof(filterArgs.Ctx, "filter unblocking lease request")
     }
     return nil
   }
 }

+// forceLeaseExtension moves the clock forward close to the lease's expiration,
+// and then performs a read on the range, which will force the lease to be
+// renewed. This assumes the lease is not epoch-based.
 func (l *leaseTransferTest) forceLeaseExtension(storeIdx int, lease roachpb.Lease) error {
-  shouldRenewTS := lease.Expiration.Add(-1, 0)
-  l.mtc.manualClock.Set(shouldRenewTS.WallTime + 1)
+  // Set the clock close to the lease's expiration.
+  l.mtc.manualClock.Set(lease.Expiration.WallTime - 10)
   err := l.sendRead(storeIdx).GoError()
   // We can sometimes receive an error from our renewal attempt because the
   // lease transfer ends up causing the renewal to re-propose and second
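For illustration only, a rough sketch of the test's new clock manipulation with a hypothetical manualClock stand-in (the real test uses multiTestContext's manual clock): park the clock just below the lease expiration so the next read falls inside the renewal window and forces an extension.

```go
// Sketch: setting a manual clock 10ns before the lease expiration, mirroring
// the updated forceLeaseExtension, instead of deriving a renewal timestamp via
// HLC arithmetic.
package main

import (
	"fmt"
	"time"
)

// manualClock is a trivial stand-in for the test's manual clock.
type manualClock struct{ nanos int64 }

func (m *manualClock) Set(nanos int64) { m.nanos = nanos }
func (m *manualClock) Now() time.Time  { return time.Unix(0, m.nanos) }

func main() {
	leaseExpiration := time.Now().Add(5 * time.Second).UnixNano()

	var clock manualClock
	clock.Set(leaseExpiration - 10)

	// The lease has not yet expired at the manual clock's time, so a read now
	// would trigger a renewal rather than a new lease acquisition.
	fmt.Println(clock.Now().Before(time.Unix(0, leaseExpiration))) // true
}
```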
17 changes: 9 additions & 8 deletions pkg/storage/node_liveness.go
@@ -410,7 +410,9 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
   if err != nil {
     return false, err
   }
-  return liveness.IsLive(nl.clock.Now()), nil
+  // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to
+  // consider clock signals from other nodes.
+  return liveness.IsLive(nl.clock.Now().GoTime()), nil
 }

 // StartHeartbeat starts a periodic heartbeat to refresh this node's
@@ -599,7 +601,7 @@ func (nl *NodeLiveness) heartbeatInternal(
     // expired while in flight, so maybe we don't have to care about
     // that and only need to distinguish between same and different
     // epochs in our return value.
-    if actual.IsLive(nl.clock.Now()) && !incrementEpoch {
+    if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch {
       return errNodeAlreadyLive
     }
     // Otherwise, return error.
@@ -646,7 +648,7 @@ func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
   lMap := IsLiveMap{}
   nl.mu.RLock()
   defer nl.mu.RUnlock()
-  now := nl.clock.Now()
+  now := nl.clock.Now().GoTime()
   for nID, l := range nl.mu.nodes {
     isLive := l.IsLive(now)
     if !isLive && l.Decommissioning {
@@ -726,7 +728,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness storagepb.L
     <-sem
   }()

-  if liveness.IsLive(nl.clock.Now()) {
+  if liveness.IsLive(nl.clock.Now().GoTime()) {
     return errors.Errorf("cannot increment epoch on live node: %+v", liveness)
   }
   update := livenessUpdate{Liveness: liveness}
@@ -896,7 +898,7 @@ func (nl *NodeLiveness) maybeUpdate(new storagepb.Liveness) {
     return
   }

-  now := nl.clock.Now()
+  now := nl.clock.Now().GoTime()
   if !old.IsLive(now) && new.IsLive(now) {
     for _, fn := range callbacks {
       fn(new.NodeID)
@@ -956,8 +958,6 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
     return 0
   }

-  now := nl.clock.Now()
-
   nl.mu.RLock()
   defer nl.mu.RUnlock()

@@ -969,6 +969,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
     log.Warningf(ctx, "looking up own liveness: %+v", err)
     return 0
   }
+  now := nl.clock.Now().GoTime()
   // If this node isn't live, we don't want to report its view of node liveness
   // because it's more likely to be inaccurate than the view of a live node.
   if !self.IsLive(now) {
@@ -993,7 +994,7 @@ func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
     if err != nil {
       return hlc.Timestamp{}, 0, err
     }
-    if !liveness.IsLive(now) {
+    if !liveness.IsLive(now.GoTime()) {
       return hlc.Timestamp{}, 0, errLiveClockNotLive
     }
     return now, ctpb.Epoch(liveness.Epoch), nil
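A small standalone sketch of the GetIsLiveMap pattern above (simplified liveness type, hypothetical names): capture a single time.Time once and evaluate every node's liveness against it, so the whole map reflects one consistent instant.

```go
// Sketch: one `now` snapshot, many IsLive checks.
package main

import (
	"fmt"
	"time"
)

type liveness struct{ expirationWall int64 }

func (l liveness) IsLive(now time.Time) bool {
	return now.Before(time.Unix(0, l.expirationWall))
}

func main() {
	nodes := map[int]liveness{
		1: {expirationWall: time.Now().Add(3 * time.Second).UnixNano()},
		2: {expirationWall: time.Now().Add(-3 * time.Second).UnixNano()},
	}

	isLiveMap := map[int]bool{}
	now := time.Now() // captured once, as in GetIsLiveMap
	for id, l := range nodes {
		isLiveMap[id] = l.IsLive(now)
	}
	fmt.Println(isLiveMap) // map[1:true 2:false]
}
```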
29 changes: 20 additions & 9 deletions pkg/storage/storagepb/liveness.go
@@ -13,18 +13,29 @@ package storagepb
 import (
   "time"

-  "github.com/cockroachdb/cockroach/pkg/util/hlc"
+  "github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )

-// IsLive returns whether the node is considered live at the given time with the
-// given clock offset.
-func (l *Liveness) IsLive(now hlc.Timestamp) bool {
-  expiration := hlc.Timestamp(l.Expiration)
-  return now.Less(expiration)
+// IsLive returns whether the node is considered live at the given time.
+//
+// NOTE: If one is interested whether the Liveness is valid currently, then the
+// timestamp passed in should be the known high-water mark of all the clocks of
+// the nodes in the cluster. For example, if the liveness expires at ts 100, our
+// physical clock is at 90, but we know that another node's clock is at 110,
+// then it's preferable (more consistent across nodes) for the liveness to be
+// considered expired. For that purpose, it's better to pass in
+// clock.Now().GoTime() rather than clock.PhysicalNow() - the former takes into
+// consideration clock signals from other nodes, the latter doesn't.
+func (l *Liveness) IsLive(now time.Time) bool {
+  expiration := timeutil.Unix(0, l.Expiration.WallTime)
+  return now.Before(expiration)
 }

 // IsDead returns true if the liveness expired more than threshold ago.
-func (l *Liveness) IsDead(now hlc.Timestamp, threshold time.Duration) bool {
-  deadAsOf := hlc.Timestamp(l.Expiration).GoTime().Add(threshold)
-  return !now.GoTime().Before(deadAsOf)
+//
+// Note that, because of threshold, IsDead() is not the inverse of IsLive().
+func (l *Liveness) IsDead(now time.Time, threshold time.Duration) bool {
+  expiration := timeutil.Unix(0, l.Expiration.WallTime)
+  deadAsOf := expiration.Add(threshold)
+  return !now.Before(deadAsOf)
 }
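A standalone sketch mirroring the new checks (simplified Liveness type, plain time instead of timeutil), including the window where a node is neither live nor dead:

```go
// Sketch of the time.Time-based IsLive/IsDead shape and the "expired but not
// yet dead" window that makes IsDead not the inverse of IsLive.
package main

import (
	"fmt"
	"time"
)

type liveness struct {
	// ExpirationWallTime is the wall-clock part of the stored expiration; the
	// logical part is always zero, which is what makes time.Time sufficient.
	ExpirationWallTime int64
}

func (l liveness) IsLive(now time.Time) bool {
	return now.Before(time.Unix(0, l.ExpirationWallTime))
}

func (l liveness) IsDead(now time.Time, threshold time.Duration) bool {
	deadAsOf := time.Unix(0, l.ExpirationWallTime).Add(threshold)
	return !now.Before(deadAsOf)
}

func main() {
	exp := time.Now()
	l := liveness{ExpirationWallTime: exp.UnixNano()}
	threshold := 5 * time.Minute

	for _, now := range []time.Time{
		exp.Add(-time.Second),     // before expiration: live
		exp.Add(time.Minute),      // expired but within threshold: neither live nor dead
		exp.Add(10 * time.Minute), // past expiration+threshold: dead
	} {
		fmt.Printf("live=%v dead=%v\n", l.IsLive(now), l.IsDead(now, threshold))
	}
}
```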
11 changes: 6 additions & 5 deletions pkg/storage/storagepb/liveness.pb.go

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion pkg/storage/storagepb/liveness.proto
@@ -28,7 +28,8 @@ message Liveness {
   // may be incremented if the liveness record expires (current time
   // is later than the expiration timestamp).
   int64 epoch = 2;
-  // The timestamp at which this liveness record expires.
+  // The timestamp at which this liveness record expires. The logical part of
+  // this timestamp is zero.
   //
   // Note that the clock max offset is not accounted for in any way when this
   // expiration is set. If a checker wants to be extra-optimistic about another
18 changes: 11 additions & 7 deletions pkg/storage/store_pool.go
@@ -114,6 +114,9 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc
 // LivenessStatus returns a NodeLivenessStatus enumeration value for the
 // provided Liveness based on the provided timestamp and threshold.
 //
+// See the note on IsLive() for considerations on what should be passed in as
+// `now`.
+//
 // The timeline of the states that a liveness goes through as time passes after
 // the respective liveness record is written is the following:
 //
@@ -131,8 +134,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc
 func LivenessStatus(
   l storagepb.Liveness, now time.Time, deadThreshold time.Duration,
 ) storagepb.NodeLivenessStatus {
-  nowHlc := hlc.Timestamp{WallTime: now.UnixNano()}
-  if l.IsDead(nowHlc, deadThreshold) {
+  if l.IsDead(now, deadThreshold) {
     if l.Decommissioning {
       return storagepb.NodeLivenessStatus_DECOMMISSIONED
     }
@@ -144,7 +146,7 @@
   if l.Draining {
     return storagepb.NodeLivenessStatus_UNAVAILABLE
   }
-  if l.IsLive(nowHlc) {
+  if l.IsLive(now) {
     return storagepb.NodeLivenessStatus_LIVE
   }
   return storagepb.NodeLivenessStatus_UNAVAILABLE
@@ -310,7 +312,7 @@ func (sp *StorePool) String() string {
   sort.Sort(ids)

   var buf bytes.Buffer
-  now := sp.clock.PhysicalTime()
+  now := sp.clock.Now().GoTime()
   timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

   for _, id := range ids {
@@ -475,7 +477,9 @@ func (sp *StorePool) decommissioningReplicas(
   sp.detailsMu.Lock()
   defer sp.detailsMu.Unlock()

-  now := sp.clock.PhysicalTime()
+  // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to
+  // take clock signals from remote nodes into consideration.
+  now := sp.clock.Now().GoTime()
   timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

   for _, repl := range repls {
@@ -506,7 +510,7 @@ func (sp *StorePool) liveAndDeadReplicas(
   sp.detailsMu.Lock()
   defer sp.detailsMu.Unlock()

-  now := sp.clock.PhysicalTime()
+  now := sp.clock.Now().GoTime()
   timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

   for _, repl := range repls {
@@ -682,7 +686,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
   var throttled throttledStoreReasons
   var storeDescriptors []roachpb.StoreDescriptor

-  now := sp.clock.PhysicalTime()
+  now := sp.clock.Now().GoTime()
   timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)

   for _, storeID := range storeIDs {
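A simplified, standalone sketch of the decision order in LivenessStatus after this change. The branches hidden in the collapsed part of the hunk (returning DEAD and DECOMMISSIONING) are assumptions here, not quoted from the diff:

```go
// Sketch: dead-ness is checked first against expiration+threshold, then
// decommissioning/draining, then plain liveness; everything else maps to
// UNAVAILABLE.
package main

import (
	"fmt"
	"time"
)

type status string

const (
	statusLive            status = "LIVE"
	statusUnavailable     status = "UNAVAILABLE"
	statusDead            status = "DEAD"
	statusDecommissioned  status = "DECOMMISSIONED"
	statusDecommissioning status = "DECOMMISSIONING"
)

func livenessStatus(
	expiration, now time.Time, deadThreshold time.Duration, draining, decommissioning bool,
) status {
	if !now.Before(expiration.Add(deadThreshold)) { // IsDead
		if decommissioning {
			return statusDecommissioned
		}
		return statusDead
	}
	if decommissioning {
		return statusDecommissioning
	}
	if draining {
		return statusUnavailable
	}
	if now.Before(expiration) { // IsLive
		return statusLive
	}
	return statusUnavailable
}

func main() {
	exp := time.Now()
	fmt.Println(livenessStatus(exp, exp.Add(-time.Second), 5*time.Minute, false, false))    // LIVE
	fmt.Println(livenessStatus(exp, exp.Add(time.Minute), 5*time.Minute, false, false))     // UNAVAILABLE
	fmt.Println(livenessStatus(exp, exp.Add(10*time.Minute), 5*time.Minute, false, false))  // DEAD
}
```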
4 changes: 4 additions & 0 deletions pkg/util/hlc/hlc.go
@@ -288,6 +288,10 @@ func (c *Clock) enforceWallTimeWithinBoundLocked() {
 }

 // PhysicalNow returns the local wall time.
+//
+// Note that, contrary to Now(), PhysicalNow does not take into consideration
+// higher clock signals received through Update(). If you want to take them into
+// consideration, use c.Now().GoTime().
 func (c *Clock) PhysicalNow() int64 {
   return c.physicalClock()
 }
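A minimal sketch (not the real hlc.Clock) of the distinction this comment documents: Now() ratchets forward on signals received via Update(), while PhysicalNow() only reads the local wall clock, which is why Now().GoTime() is the preferred input for liveness checks.

```go
// Sketch: a toy clock whose Now() is bumped by remote clock signals.
package main

import (
	"fmt"
	"time"
)

type clock struct {
	highWater int64 // highest wall time observed, local or remote
}

// PhysicalNow only reads the local wall clock.
func (c *clock) PhysicalNow() int64 {
	return time.Now().UnixNano()
}

// Update absorbs a clock signal from another node.
func (c *clock) Update(remoteWall int64) {
	if remoteWall > c.highWater {
		c.highWater = remoteWall
	}
}

// Now returns the local physical time, bumped to at least the highest remote
// signal seen so far.
func (c *clock) Now() time.Time {
	wall := c.PhysicalNow()
	if c.highWater > wall {
		wall = c.highWater
	}
	return time.Unix(0, wall)
}

func main() {
	var c clock
	// A remote node's clock is 2s ahead of ours.
	c.Update(time.Now().Add(2 * time.Second).UnixNano())

	// Now() reflects the remote signal; PhysicalNow does not, so checking a
	// liveness expiration against Now() is more consistent across nodes.
	fmt.Println(c.Now().UnixNano() > c.PhysicalNow()) // true
}
```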
