From 1d76206f6b2cf18dfd7a7722cf223dfdf2d7e6ac Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Thu, 9 Jan 2020 12:45:16 -0500
Subject: [PATCH] storage: change low-level liveness methods to regular time

liveness.IsLive/IsDead were taking an HLC timestamp, although liveness
expiration does not have a logical part. Remove usage of hlc to simplify
things.

Also standardize the time source used by the different functions testing
liveness expiration. Some of them were using clock.Now(), others were
using clock.PhysicalTime(). This patch picks the former, which also
matches the source used when setting the liveness.Expiration.

Release note: None
---
 pkg/jobs/registry.go                 |  6 +++---
 pkg/server/admin.go                  |  4 ++--
 pkg/server/status.go                 |  8 +++-----
 pkg/storage/allocator.go             |  1 +
 pkg/storage/client_replica_test.go   | 12 +++++++++---
 pkg/storage/node_liveness.go         | 17 ++++++++--------
 pkg/storage/storagepb/liveness.go    | 29 +++++++++++++++++++---------
 pkg/storage/storagepb/liveness.pb.go | 11 ++++++-----
 pkg/storage/storagepb/liveness.proto |  3 ++-
 pkg/storage/store_pool.go            | 18 ++++++++++-------
 pkg/util/hlc/hlc.go                  |  4 ++++
 11 files changed, 70 insertions(+), 43 deletions(-)

diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go
index 513f77b29dfd..a60eedfd9c2d 100644
--- a/pkg/jobs/registry.go
+++ b/pkg/jobs/registry.go
@@ -172,7 +172,7 @@ func (r *Registry) MetricsStruct() *Metrics {
 // lenientNow returns the timestamp after which we should attempt
 // to steal a job from a node whose liveness is failing. This allows
 // jobs coordinated by a node which is temporarily saturated to continue.
-func (r *Registry) lenientNow() hlc.Timestamp {
+func (r *Registry) lenientNow() time.Time {
 	// We see this in tests.
 	var offset time.Duration
 	if r.settings == cluster.NoSettings {
@@ -181,7 +181,7 @@ func (r *Registry) lenientNow() hlc.Timestamp {
 		offset = LeniencySetting.Get(&r.settings.SV)
 	}
 
-	return r.clock.Now().Add(-offset.Nanoseconds(), 0)
+	return r.clock.Now().GoTime().Add(-offset)
 }
 
 // makeCtx returns a new context from r's ambient context and an associated
@@ -741,7 +741,7 @@ func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
 			// Don't try to start any more jobs unless we're really live,
 			// otherwise we'd just immediately cancel them.
if liveness.NodeID == r.nodeID.Get() { - if !liveness.IsLive(r.clock.Now()) { + if !liveness.IsLive(r.clock.Now().GoTime()) { return errors.Errorf( "trying to adopt jobs on node %d which is not live", liveness.NodeID) } diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 2d6d60dd4924..19e38f3428e6 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -1268,7 +1268,7 @@ func (s *adminServer) Liveness( ) (*serverpb.LivenessResponse, error) { clock := s.server.clock statusMap := getLivenessStatusMap( - s.server.nodeLiveness, clock.PhysicalTime(), s.server.st) + s.server.nodeLiveness, clock.Now().GoTime(), s.server.st) livenesses := s.server.nodeLiveness.GetLivenesses() return &serverpb.LivenessResponse{ Livenesses: livenesses, @@ -1575,7 +1575,7 @@ func (s *adminServer) DecommissionStatus( Decommissioning: l.Decommissioning, Draining: l.Draining, } - if l.IsLive(s.server.clock.Now()) { + if l.IsLive(s.server.clock.Now().GoTime()) { nodeResp.IsLive = true } diff --git a/pkg/server/status.go b/pkg/server/status.go index 31a3e9065efe..a55043e97372 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -50,7 +50,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -671,8 +670,7 @@ func (s *statusServer) Details( if err != nil { return nil, grpcstatus.Error(codes.Internal, err.Error()) } - nowHlc := hlc.Timestamp{WallTime: s.admin.server.clock.PhysicalNow()} - isHealthy := l.IsLive(nowHlc) && !l.Draining + isHealthy := l.IsLive(s.admin.server.clock.Now().GoTime()) && !l.Draining if !isHealthy { return nil, grpcstatus.Error(codes.Unavailable, "node is not ready") } @@ -1041,7 +1039,7 @@ func (s *statusServer) Nodes( } clock := s.admin.server.clock - resp.LivenessByNodeID = getLivenessStatusMap(s.nodeLiveness, clock.PhysicalTime(), s.st) + resp.LivenessByNodeID = getLivenessStatusMap(s.nodeLiveness, clock.Now().GoTime(), s.st) return &resp, nil } @@ -1056,7 +1054,7 @@ func (s *statusServer) nodesStatusWithLiveness( return nil, err } clock := s.admin.server.clock - statusMap := getLivenessStatusMap(s.nodeLiveness, clock.PhysicalTime(), s.st) + statusMap := getLivenessStatusMap(s.nodeLiveness, clock.Now().GoTime(), s.st) ret := make(map[roachpb.NodeID]nodeStatusWithLiveness) for _, node := range nodes.Nodes { nodeID := node.Desc.NodeID diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 9d353487814f..bd173e194f3a 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -853,6 +853,7 @@ func (a *Allocator) TransferLeaseTarget( // Short-circuit if there are no valid targets out there. if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) { + log.VEventf(ctx, 2, "no lease transfer target found") return roachpb.ReplicaDescriptor{} } diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go index 4d7689272b73..41d16481bfe4 100644 --- a/pkg/storage/client_replica_test.go +++ b/pkg/storage/client_replica_test.go @@ -507,7 +507,8 @@ func TestRangeLookupUseReverse(t *testing.T) { } type leaseTransferTest struct { - mtc *multiTestContext + mtc *multiTestContext + // replicas of range covering key "a" on the first and the second stores. 
replica0, replica1 *storage.Replica replica0Desc, replica1Desc roachpb.ReplicaDescriptor leftKey roachpb.Key @@ -643,15 +644,20 @@ func (l *leaseTransferTest) setFilter(setTo bool, extensionSem chan struct{}) { l.filter = nil l.filterMu.Unlock() extensionSem <- struct{}{} + log.Infof(filterArgs.Ctx, "filter blocking request: %s", llReq) <-extensionSem + log.Infof(filterArgs.Ctx, "filter unblocking lease request") } return nil } } +// forceLeaseExtension moves the clock forward close to the lease's expiration, +// and then performs a read on the range, which will force the lease to be +// renewed. This assumes the lease is not epoch-based. func (l *leaseTransferTest) forceLeaseExtension(storeIdx int, lease roachpb.Lease) error { - shouldRenewTS := lease.Expiration.Add(-1, 0) - l.mtc.manualClock.Set(shouldRenewTS.WallTime + 1) + // Set the clock close to the lease's expiration. + l.mtc.manualClock.Set(lease.Expiration.WallTime - 10) err := l.sendRead(storeIdx).GoError() // We can sometimes receive an error from our renewal attempt because the // lease transfer ends up causing the renewal to re-propose and second diff --git a/pkg/storage/node_liveness.go b/pkg/storage/node_liveness.go index 9dc91d1a0b8b..00259aa0001c 100644 --- a/pkg/storage/node_liveness.go +++ b/pkg/storage/node_liveness.go @@ -410,7 +410,9 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) { if err != nil { return false, err } - return liveness.IsLive(nl.clock.Now()), nil + // NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to + // consider clock signals from other nodes. + return liveness.IsLive(nl.clock.Now().GoTime()), nil } // StartHeartbeat starts a periodic heartbeat to refresh this node's @@ -599,7 +601,7 @@ func (nl *NodeLiveness) heartbeatInternal( // expired while in flight, so maybe we don't have to care about // that and only need to distinguish between same and different // epochs in our return value. - if actual.IsLive(nl.clock.Now()) && !incrementEpoch { + if actual.IsLive(nl.clock.Now().GoTime()) && !incrementEpoch { return errNodeAlreadyLive } // Otherwise, return error. @@ -646,7 +648,7 @@ func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap { lMap := IsLiveMap{} nl.mu.RLock() defer nl.mu.RUnlock() - now := nl.clock.Now() + now := nl.clock.Now().GoTime() for nID, l := range nl.mu.nodes { isLive := l.IsLive(now) if !isLive && l.Decommissioning { @@ -726,7 +728,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness storagepb.L <-sem }() - if liveness.IsLive(nl.clock.Now()) { + if liveness.IsLive(nl.clock.Now().GoTime()) { return errors.Errorf("cannot increment epoch on live node: %+v", liveness) } update := livenessUpdate{Liveness: liveness} @@ -896,7 +898,7 @@ func (nl *NodeLiveness) maybeUpdate(new storagepb.Liveness) { return } - now := nl.clock.Now() + now := nl.clock.Now().GoTime() if !old.IsLive(now) && new.IsLive(now) { for _, fn := range callbacks { fn(new.NodeID) @@ -956,8 +958,6 @@ func (nl *NodeLiveness) numLiveNodes() int64 { return 0 } - now := nl.clock.Now() - nl.mu.RLock() defer nl.mu.RUnlock() @@ -969,6 +969,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 { log.Warningf(ctx, "looking up own liveness: %+v", err) return 0 } + now := nl.clock.Now().GoTime() // If this node isn't live, we don't want to report its view of node liveness // because it's more likely to be inaccurate than the view of a live node. 
 		if !self.IsLive(now) {
@@ -993,7 +994,7 @@ func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
 		if err != nil {
 			return hlc.Timestamp{}, 0, err
 		}
-		if !liveness.IsLive(now) {
+		if !liveness.IsLive(now.GoTime()) {
 			return hlc.Timestamp{}, 0, errLiveClockNotLive
 		}
 		return now, ctpb.Epoch(liveness.Epoch), nil
diff --git a/pkg/storage/storagepb/liveness.go b/pkg/storage/storagepb/liveness.go
index 0276804cbbc7..c73f37ef6e9c 100644
--- a/pkg/storage/storagepb/liveness.go
+++ b/pkg/storage/storagepb/liveness.go
@@ -13,18 +13,29 @@ package storagepb
 import (
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 )
 
-// IsLive returns whether the node is considered live at the given time with the
-// given clock offset.
-func (l *Liveness) IsLive(now hlc.Timestamp) bool {
-	expiration := hlc.Timestamp(l.Expiration)
-	return now.Less(expiration)
+// IsLive returns whether the node is considered live at the given time.
+//
+// NOTE: If one is interested in whether the Liveness is valid currently, then
+// the timestamp passed in should be the known high-water mark of all the
+// clocks of the nodes in the cluster. For example, if the liveness expires at
+// ts 100, our physical clock is at 90, but we know that another node's clock
+// is at 110, then it's preferable (more consistent across nodes) for the
+// liveness to be considered expired. For that purpose, it's better to pass in
+// clock.Now().GoTime() rather than clock.PhysicalNow() - the former takes into
+// consideration clock signals from other nodes; the latter doesn't.
+func (l *Liveness) IsLive(now time.Time) bool {
+	expiration := timeutil.Unix(0, l.Expiration.WallTime)
+	return now.Before(expiration)
 }
 
 // IsDead returns true if the liveness expired more than threshold ago.
-func (l *Liveness) IsDead(now hlc.Timestamp, threshold time.Duration) bool {
-	deadAsOf := hlc.Timestamp(l.Expiration).GoTime().Add(threshold)
-	return !now.GoTime().Before(deadAsOf)
+//
+// Note that, because of the threshold, IsDead() is not the inverse of IsLive().
+func (l *Liveness) IsDead(now time.Time, threshold time.Duration) bool {
+	expiration := timeutil.Unix(0, l.Expiration.WallTime)
+	deadAsOf := expiration.Add(threshold)
+	return !now.Before(deadAsOf)
 }
diff --git a/pkg/storage/storagepb/liveness.pb.go b/pkg/storage/storagepb/liveness.pb.go
index f9e34af58d12..eb9fdd0c8234 100644
--- a/pkg/storage/storagepb/liveness.pb.go
+++ b/pkg/storage/storagepb/liveness.pb.go
@@ -67,7 +67,7 @@ func (x NodeLivenessStatus) String() string {
 	return proto.EnumName(NodeLivenessStatus_name, int32(x))
 }
 func (NodeLivenessStatus) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptor_liveness_8b452156800a71b7, []int{0}
+	return fileDescriptor_liveness_61dc59fe6795f558, []int{0}
 }
 
 // Liveness holds information about a node's latest heartbeat and epoch.
@@ -80,7 +80,8 @@ type Liveness struct {
 	// may be incremented if the liveness record expires (current time
 	// is later than the expiration timestamp).
 	Epoch int64 `protobuf:"varint,2,opt,name=epoch,proto3" json:"epoch,omitempty"`
-	// The timestamp at which this liveness record expires.
+	// The timestamp at which this liveness record expires. The logical part of
+	// this timestamp is zero.
 	//
 	// Note that the clock max offset is not accounted for in any way when this
 	// expiration is set.
If a checker wants to be extra-optimistic about another @@ -98,7 +99,7 @@ func (m *Liveness) Reset() { *m = Liveness{} } func (m *Liveness) String() string { return proto.CompactTextString(m) } func (*Liveness) ProtoMessage() {} func (*Liveness) Descriptor() ([]byte, []int) { - return fileDescriptor_liveness_8b452156800a71b7, []int{0} + return fileDescriptor_liveness_61dc59fe6795f558, []int{0} } func (m *Liveness) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -583,10 +584,10 @@ var ( ) func init() { - proto.RegisterFile("storage/storagepb/liveness.proto", fileDescriptor_liveness_8b452156800a71b7) + proto.RegisterFile("storage/storagepb/liveness.proto", fileDescriptor_liveness_61dc59fe6795f558) } -var fileDescriptor_liveness_8b452156800a71b7 = []byte{ +var fileDescriptor_liveness_61dc59fe6795f558 = []byte{ // 426 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x52, 0xc1, 0x6e, 0x9b, 0x30, 0x18, 0xc6, 0x09, 0x49, 0x33, 0x47, 0x5a, 0x98, 0xd7, 0x43, 0x94, 0x03, 0xa0, 0xed, 0x82, 0x36, diff --git a/pkg/storage/storagepb/liveness.proto b/pkg/storage/storagepb/liveness.proto index 8cf62c5c10f8..e40774636ca6 100644 --- a/pkg/storage/storagepb/liveness.proto +++ b/pkg/storage/storagepb/liveness.proto @@ -28,7 +28,8 @@ message Liveness { // may be incremented if the liveness record expires (current time // is later than the expiration timestamp). int64 epoch = 2; - // The timestamp at which this liveness record expires. + // The timestamp at which this liveness record expires. The logical part of + // this timestamp is zero. // // Note that the clock max offset is not accounted for in any way when this // expiration is set. If a checker wants to be extra-optimistic about another diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go index 9763532ef257..2465f4a19401 100644 --- a/pkg/storage/store_pool.go +++ b/pkg/storage/store_pool.go @@ -114,6 +114,9 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc // LivenessStatus returns a NodeLivenessStatus enumeration value for the // provided Liveness based on the provided timestamp and threshold. // +// See the note on IsLive() for considerations on what should be passed in as +// `now`. 
+//
 // The timeline of the states that a liveness goes through as time passes after
 // the respective liveness record is written is the following:
 //
@@ -131,8 +134,7 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc
 func LivenessStatus(
 	l storagepb.Liveness, now time.Time, deadThreshold time.Duration,
 ) storagepb.NodeLivenessStatus {
-	nowHlc := hlc.Timestamp{WallTime: now.UnixNano()}
-	if l.IsDead(nowHlc, deadThreshold) {
+	if l.IsDead(now, deadThreshold) {
 		if l.Decommissioning {
 			return storagepb.NodeLivenessStatus_DECOMMISSIONED
 		}
@@ -144,7 +146,7 @@ func LivenessStatus(
 	if l.Draining {
 		return storagepb.NodeLivenessStatus_UNAVAILABLE
 	}
-	if l.IsLive(nowHlc) {
+	if l.IsLive(now) {
 		return storagepb.NodeLivenessStatus_LIVE
 	}
 	return storagepb.NodeLivenessStatus_UNAVAILABLE
@@ -310,7 +312,7 @@ func (sp *StorePool) String() string {
 	sort.Sort(ids)
 
 	var buf bytes.Buffer
-	now := sp.clock.PhysicalTime()
+	now := sp.clock.Now().GoTime()
 	timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
 
 	for _, id := range ids {
@@ -475,7 +477,9 @@ func (sp *StorePool) decommissioningReplicas(
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
 
-	now := sp.clock.PhysicalTime()
+	// NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to
+	// take clock signals from remote nodes into consideration.
+	now := sp.clock.Now().GoTime()
 	timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
 
 	for _, repl := range repls {
@@ -506,7 +510,7 @@ func (sp *StorePool) liveAndDeadReplicas(
 	sp.detailsMu.Lock()
 	defer sp.detailsMu.Unlock()
 
-	now := sp.clock.PhysicalTime()
+	now := sp.clock.Now().GoTime()
 	timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
 
 	for _, repl := range repls {
@@ -682,7 +686,7 @@ func (sp *StorePool) getStoreListFromIDsRLocked(
 	var throttled throttledStoreReasons
 	var storeDescriptors []roachpb.StoreDescriptor
 
-	now := sp.clock.PhysicalTime()
+	now := sp.clock.Now().GoTime()
 	timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV)
 
 	for _, storeID := range storeIDs {
diff --git a/pkg/util/hlc/hlc.go b/pkg/util/hlc/hlc.go
index 4121522b227b..f4ba11237834 100644
--- a/pkg/util/hlc/hlc.go
+++ b/pkg/util/hlc/hlc.go
@@ -288,6 +288,10 @@ func (c *Clock) enforceWallTimeWithinBoundLocked() {
 }
 
 // PhysicalNow returns the local wall time.
+//
+// Note that, unlike Now(), PhysicalNow does not take into consideration
+// higher clock signals received through Update(). If you want to take them into
+// consideration, use c.Now().GoTime().
 func (c *Clock) PhysicalNow() int64 {
 	return c.physicalClock()
 }
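
The following standalone Go sketch is not part of the diff above; it only illustrates the new semantics under stated assumptions. livenessRecord is a hypothetical stand-in for storagepb.Liveness, and the expiration and threshold values are made up. In the real code, `now` is expected to come from clock.Now().GoTime(), which folds in clock signals received from other nodes, rather than from PhysicalNow()/PhysicalTime(). The sketch also shows why IsDead is not simply the negation of IsLive: a record can be expired (no longer live) without yet being past the dead threshold.

package main

import (
	"fmt"
	"time"
)

// livenessRecord is a simplified stand-in for storagepb.Liveness: it only
// carries the wall-clock expiration, which has no logical component.
type livenessRecord struct {
	expiration time.Time
}

// isLive mirrors the new Liveness.IsLive(now time.Time): the record is live
// strictly before its expiration.
func (l livenessRecord) isLive(now time.Time) bool {
	return now.Before(l.expiration)
}

// isDead mirrors Liveness.IsDead(now, threshold): the record is dead only
// once the threshold has elapsed past the expiration.
func (l livenessRecord) isDead(now time.Time, threshold time.Duration) bool {
	return !now.Before(l.expiration.Add(threshold))
}

func main() {
	now := time.Now() // in CockroachDB this would be clock.Now().GoTime()
	rec := livenessRecord{expiration: now.Add(-1 * time.Second)} // expired 1s ago

	fmt.Println(rec.isLive(now))                // false: past expiration
	fmt.Println(rec.isDead(now, 5*time.Minute)) // false: expired, but not past the dead threshold
	fmt.Println(rec.isDead(now, time.Second))   // true: the 1s threshold has elapsed
}

With an expiration one second in the past and a five-minute dead threshold, the record reports neither live nor dead; per the LivenessStatus logic in the diff, that window maps to NodeLivenessStatus_UNAVAILABLE (absent draining or decommissioning).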