kvserver: improve look-aside cache get API
Specifically, apply the following diff (and propagate through callers):

```diff
- getLivenessLocked(roachpb.NodeID) (LivenessRecord, error)
+ getLivenessLocked(roachpb.NodeID) (_ LivenessRecord, ok bool)
```

Previously only one error (the cache-miss sentinel) could ever be returned,
so this is just a drive-by simplification.

Release note: None
irfansharif committed Sep 30, 2020

1 parent 84d9c12 commit 785aea7
Showing 11 changed files with 123 additions and 149 deletions.
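
The new signature adopts Go's comma-ok idiom, the same shape a map lookup already has: a miss in a look-aside cache is an expected outcome, not an error. Below is a minimal, hypothetical sketch of the pattern using stand-in `cache`/`record` types, not the actual `NodeLiveness` code:

```go
package main

import "fmt"

// record stands in for the cached value (kvserverpb.Liveness in this commit).
type record struct {
	Epoch int64
}

// cache is a hypothetical look-aside cache keyed by node ID.
type cache struct {
	nodes map[int32]record
}

// get mirrors the new getLivenessLocked shape: a missing entry is reported
// with ok=false rather than a sentinel error like errLivenessRecordCacheMiss.
func (c *cache) get(id int32) (_ record, ok bool) {
	r, ok := c.nodes[id]
	return r, ok
}

func main() {
	c := &cache{nodes: map[int32]record{1: {Epoch: 2}}}
	if r, ok := c.get(1); ok {
		fmt.Println("cache hit, epoch", r.Epoch)
	}
	if _, ok := c.get(7); !ok {
		fmt.Println("cache miss: the caller decides whether to fall back to KV")
	}
}
```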
4 changes: 2 additions & 2 deletions pkg/jobs/helpers.go
@@ -65,14 +65,14 @@ func (*FakeNodeLiveness) ModuleTestingKnobs() {}
// Self implements the implicit storage.NodeLiveness interface. It uses NodeID
// as the node ID. On every call, a nonblocking send is performed over nl.ch to
// allow tests to execute a callback.
- func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, error) {
+ func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, bool) {
select {
case nl.SelfCalledCh <- struct{}{}:
default:
}
nl.mu.Lock()
defer nl.mu.Unlock()
- return *nl.mu.livenessMap[FakeNodeID.Get()], nil
+ return *nl.mu.livenessMap[FakeNodeID.Get()], true
}

// GetLivenesses implements the implicit storage.NodeLiveness interface.
6 changes: 3 additions & 3 deletions pkg/jobs/registry.go
@@ -784,10 +784,10 @@ func (r *Registry) maybeCancelJobsDeprecated(
// https://github.com/cockroachdb/cockroach/issues/47892
return
}
- liveness, err := nl.Self()
- if err != nil {
+ liveness, ok := nl.Self()
+ if !ok {
if nodeLivenessLogLimiter.ShouldLog() {
- log.Warningf(ctx, "unable to get node liveness: %s", err)
+ log.Warning(ctx, "own liveness record not found")
}
// Conservatively assume our lease has expired. Abort all jobs.
r.deprecatedCancelAll(ctx)
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/client_test.go
@@ -1435,11 +1435,12 @@ func (m *multiTestContext) heartbeatLiveness(ctx context.Context, store int) err
m.mu.RLock()
nl := m.nodeLivenesses[store]
m.mu.RUnlock()
- l, err := nl.Self()
- if err != nil {
- return err
+ l, ok := nl.Self()
+ if !ok {
+ return errors.New("liveness not found")
}

+ var err error
for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); {
if err = nl.Heartbeat(ctx, l); !errors.Is(err, kvserver.ErrEpochIncremented) {
break
99 changes: 47 additions & 52 deletions pkg/kv/kvserver/node_liveness.go
@@ -297,13 +297,8 @@ func (nl *NodeLiveness) SetDraining(
) error {
ctx = nl.ambientCtx.AnnotateCtx(ctx)
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
- oldLivenessRec, err := nl.SelfEx()
- if err != nil {
- if !errors.Is(err, errLivenessRecordCacheMiss) {
- // TODO(irfansharif): Remove this when we change the signature
- // of SelfEx to return a boolean instead of an error.
- log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
- }
+ oldLivenessRec, ok := nl.SelfEx()
+ if !ok {
// There was a cache miss, let's now fetch the record from KV
// directly.
nodeID := nl.gossip.NodeID.Get()
@@ -610,9 +605,14 @@ func (nl *NodeLiveness) GetLivenessThreshold() time.Duration {
// whether or not its liveness has expired regardless of the liveness status. It
// is an error if the specified node is not in the local liveness table.
func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
- liveness, err := nl.GetLiveness(nodeID)
- if err != nil {
- return false, err
+ liveness, ok := nl.GetLiveness(nodeID)
+ if !ok {
+ // TODO(irfansharif): We only expect callers to supply us with node IDs
+ // they learnt through existing liveness records, which implies we
+ // should never find ourselves here. We should clean up this conditional
+ // once we re-visit the caching structure used within NodeLiveness;
+ // we should be able to return ErrMissingLivenessRecord instead.
+ return false, errLivenessRecordCacheMiss
}
// NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to
// consider clock signals from other nodes.
@@ -664,17 +664,12 @@ func (nl *NodeLiveness) StartHeartbeat(
func(ctx context.Context) error {
// Retry heartbeat in the event the conditional put fails.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
- oldLiveness, err := nl.Self()
- if err != nil {
- if !errors.Is(err, errLivenessRecordCacheMiss) {
- // TODO(irfansharif): Remove this when we change the signature
- // of SelfEx to return a boolean instead of an error.
- log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
- }
+ oldLiveness, ok := nl.Self()
+ if !ok {
nodeID := nl.gossip.NodeID.Get()
liveness, err := nl.getLivenessFromKV(ctx, nodeID)
if err != nil {
- log.Infof(ctx, "unable to get liveness record: %s", err)
+ log.Infof(ctx, "unable to get liveness record from KV: %s", err)
continue
}
oldLiveness = liveness
@@ -831,8 +826,8 @@ func (nl *NodeLiveness) heartbeatInternal(
// would hit a ConditionFailedError and return a errNodeAlreadyLive down
// below.
if !incrementEpoch {
- curLiveness, err := nl.Self()
- if err == nil && minExpiration.Less(curLiveness.Expiration) {
+ curLiveness, ok := nl.Self()
+ if ok && minExpiration.Less(curLiveness.Expiration) {
return nil
}
}
@@ -918,17 +913,17 @@ func (nl *NodeLiveness) heartbeatInternal(
// is returned in the event that the node has neither heartbeat its
// liveness record successfully, nor received a gossip message containing
// a former liveness update on restart.
- func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) {
- rec, err := nl.SelfEx()
- if err != nil {
- return kvserverpb.Liveness{}, err
+ func (nl *NodeLiveness) Self() (_ kvserverpb.Liveness, ok bool) {
+ rec, ok := nl.SelfEx()
+ if !ok {
+ return kvserverpb.Liveness{}, false
}
- return rec.Liveness, nil
+ return rec.Liveness, true
}

// SelfEx is like Self, but returns the raw, encoded value that the database has
// for this liveness record in addition to the decoded liveness proto.
- func (nl *NodeLiveness) SelfEx() (LivenessRecord, error) {
+ func (nl *NodeLiveness) SelfEx() (_ LivenessRecord, ok bool) {
nl.mu.RLock()
defer nl.mu.RUnlock()
return nl.getLivenessLocked(nl.gossip.NodeID.Get())
@@ -979,24 +974,26 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness {
return livenesses
}

- // GetLiveness returns the liveness record for the specified nodeID.
- // ErrNoLivenessRecord is returned in the event that nothing is yet known about
- // nodeID via liveness gossip. The record returned also includes the raw,
- // encoded value that the database has for this liveness record in addition to
- // the decoded liveness proto.
- func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, error) {
+ // GetLiveness returns the liveness record for the specified nodeID. If the
+ // liveness record is not found (due to gossip propagation delays or due to the
+ // node not existing), we surface that to the caller. The record returned also
+ // includes the raw, encoded value that the database has for this liveness
+ // record in addition to the decoded liveness proto.
+ func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) {
nl.mu.RLock()
defer nl.mu.RUnlock()
return nl.getLivenessLocked(nodeID)
}

- // TODO(irfansharif): This only returns one possible error, so should be made to
- // return a boolean instead.
- func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) {
+ // getLivenessLocked returns the liveness record for the specified nodeID,
+ // consulting the in-memory cache. If nothing is found (could happen due to
+ // gossip propagation delays or the node not existing), we surface that to the
+ // caller.
+ func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) {
if l, ok := nl.mu.nodes[nodeID]; ok {
- return l, nil
+ return l, true
}
- return LivenessRecord{}, errLivenessRecordCacheMiss
+ return LivenessRecord{}, false
}

// getLivenessFromKV fetches the liveness record from KV for a given node, and
@@ -1185,9 +1182,12 @@ func (nl *NodeLiveness) updateLivenessAttempt(
log.Fatalf(ctx, "unexpected oldRaw when ignoreCache not specified")
}

- l, err := nl.GetLiveness(update.newLiveness.NodeID)
- if err != nil {
- return LivenessRecord{}, err
+ l, ok := nl.GetLiveness(update.newLiveness.NodeID)
+ if !ok {
+ // TODO(irfansharif): See TODO in `NodeLiveness.IsLive`, the same
+ // applies to this conditional. We probably want to be able to
+ // return ErrMissingLivenessRecord here instead.
+ return LivenessRecord{}, errLivenessRecordCacheMiss
}
if l.Liveness != update.oldLiveness {
return LivenessRecord{}, handleCondFailed(l)
@@ -1254,13 +1254,8 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness

var shouldReplace bool
nl.mu.Lock()
- oldLivenessRec, err := nl.getLivenessLocked(newLivenessRec.NodeID)
- if err != nil {
- if !errors.Is(err, errLivenessRecordCacheMiss) {
- // TODO(irfansharif): Remove this when we change the signature
- // of SelfEx to return a boolean instead of an error.
- log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
- }
+ oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
+ if !ok {
shouldReplace = true
} else {
shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness)
@@ -1339,8 +1334,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
nl.mu.RLock()
defer nl.mu.RUnlock()

- self, err := nl.getLivenessLocked(selfID)
- if err != nil {
+ self, ok := nl.getLivenessLocked(selfID)
+ if !ok {
return 0
}
now := nl.clock.Now().GoTime()
@@ -1364,9 +1359,9 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
return func(nodeID roachpb.NodeID) (hlc.Timestamp, ctpb.Epoch, error) {
now := nl.clock.Now()
- liveness, err := nl.GetLiveness(nodeID)
- if err != nil {
- return hlc.Timestamp{}, 0, err
+ liveness, ok := nl.GetLiveness(nodeID)
+ if !ok {
+ return hlc.Timestamp{}, 0, errLivenessRecordCacheMiss
}
if !liveness.IsLive(now.GoTime()) {
return hlc.Timestamp{}, 0, errLiveClockNotLive
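
Two hunks above (`SetDraining`, `StartHeartbeat`) share the same recovery recipe on a cache miss: fetch the record from KV directly and retry. A hedged sketch of that control flow follows, with hypothetical `selfFromCache`, `fetchFromKV`, and `heartbeat` stand-ins for `nl.SelfEx`, `nl.getLivenessFromKV`, and `nl.Heartbeat`:

```go
package liveness

import (
	"context"
	"errors"
	"log"
)

// Record stands in for the LivenessRecord type in this sketch.
type Record struct{ Epoch int64 }

// Hypothetical stand-ins for the NodeLiveness methods used above.
var (
	selfFromCache func() (Record, bool)
	fetchFromKV   func(context.Context) (Record, error)
	heartbeat     func(context.Context, Record) error
)

// heartbeatLoop sketches the cache-miss fallback from StartHeartbeat:
// consult the in-memory record first; on a miss, read from KV and retry.
func heartbeatLoop(ctx context.Context) error {
	for attempt := 0; attempt < 5; attempt++ {
		old, ok := selfFromCache()
		if !ok {
			fetched, err := fetchFromKV(ctx)
			if err != nil {
				// A KV read can legitimately fail; log and retry the loop.
				log.Printf("unable to get liveness record from KV: %s", err)
				continue
			}
			old = fetched
		}
		if err := heartbeat(ctx, old); err != nil {
			continue // e.g. a conditional put lost a race; retry
		}
		return nil
	}
	return errors.New("heartbeat failed after retries")
}
```

Note the division of labor: the boolean only means "not in the cache"; genuine failures, such as a KV read error, still travel as errors.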
115 changes: 48 additions & 67 deletions pkg/kv/kvserver/node_liveness_test.go
@@ -109,10 +109,8 @@ func TestNodeLiveness(t *testing.T) {
}
// Trigger a manual heartbeat and verify liveness is reestablished.
for _, nl := range mtc.nodeLivenesses {
- l, err := nl.Self()
- if err != nil {
- t.Fatal(err)
- }
+ l, ok := nl.Self()
+ assert.True(t, ok)
for {
err := nl.Heartbeat(context.Background(), l)
if err == nil {
@@ -146,10 +144,8 @@ func TestNodeLivenessInitialIncrement(t *testing.T) {
// Verify liveness of all nodes for all nodes.
verifyLiveness(t, mtc)

- liveness, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
- if err != nil {
- t.Fatal(err)
- }
+ liveness, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+ assert.True(t, ok)
if liveness.Epoch != 1 {
t.Errorf("expected epoch to be set to 1 initially; got %d", liveness.Epoch)
}
@@ -185,10 +181,8 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) {
t.Fatalf("node %d not live", nodeID)
}

- livenessRec, err := nl.GetLiveness(nodeID)
- if err != nil {
- t.Fatal(err)
- }
+ livenessRec, ok := nl.GetLiveness(nodeID)
+ assert.True(t, ok)
if livenessRec.NodeID != nodeID {
t.Fatalf("expected node ID %d, got %d", nodeID, livenessRec.NodeID)
}
@@ -205,9 +199,9 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) {

func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) {
testutils.SucceedsSoon(t, func() error {
- liveness, err := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get())
- if err != nil {
- return err
+ liveness, ok := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get())
+ if !ok {
+ return errors.New("liveness not found")
}
if liveness.Epoch < 2 {
return errors.Errorf("expected epoch to be >=2 on restart but was %d", liveness.Epoch)
@@ -235,8 +229,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
nl.PauseHeartbeatLoopForTest()
enableSync := nl.PauseSynchronousHeartbeatsForTest()

- liveness, err := nl.Self()
- require.NoError(t, err)
+ liveness, ok := nl.Self()
+ assert.True(t, ok)
hbBefore := nl.Metrics().HeartbeatSuccesses.Count()
require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())

@@ -251,10 +245,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
if err := nl.Heartbeat(ctx, liveness); err != nil {
return err
}
- livenessAfter, err := nl.Self()
- if err != nil {
- return err
- }
+ livenessAfter, found := nl.Self()
+ assert.True(t, found)
exp := livenessAfter.Expiration
minExp := hlc.LegacyTimestamp(before.Add(nlActive.Nanoseconds(), 0))
if exp.Less(minExp) {
@@ -284,8 +276,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())

// Send one more heartbeat. Should update liveness record.
- liveness, err = nl.Self()
- require.NoError(t, err)
+ liveness, ok = nl.Self()
+ require.True(t, ok)
require.NoError(t, nl.Heartbeat(ctx, liveness))
require.Equal(t, hbBefore+2, nl.Metrics().HeartbeatSuccesses.Count())
require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())
@@ -317,10 +309,8 @@ func TestNodeIsLiveCallback(t *testing.T) {

// Trigger a manual heartbeat and verify callbacks for each node ID are invoked.
for _, nl := range mtc.nodeLivenesses {
- l, err := nl.Self()
- if err != nil {
- t.Fatal(err)
- }
+ l, ok := nl.Self()
+ assert.True(t, ok)
if err := nl.Heartbeat(context.Background(), l); err != nil {
t.Fatal(err)
}
@@ -376,10 +366,8 @@ func TestNodeHeartbeatCallback(t *testing.T) {
// store.
mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
for _, nl := range mtc.nodeLivenesses {
- l, err := nl.Self()
- if err != nil {
- t.Fatal(err)
- }
+ l, ok := nl.Self()
+ assert.True(t, ok)
if err := nl.Heartbeat(context.Background(), l); err != nil {
t.Fatal(err)
}
@@ -410,10 +398,8 @@ func TestNodeLivenessEpochIncrement(t *testing.T) {

// First try to increment the epoch of a known-live node.
deadNodeID := mtc.gossips[1].NodeID.Get()
- oldLiveness, err := mtc.nodeLivenesses[0].GetLiveness(deadNodeID)
- if err != nil {
- t.Fatal(err)
- }
+ oldLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID)
+ assert.True(t, ok)
if err := mtc.nodeLivenesses[0].IncrementEpoch(
ctx, oldLiveness.Liveness,
); !testutils.IsError(err, "cannot increment epoch on live node") {
@@ -428,9 +414,9 @@ func TestNodeLivenessEpochIncrement(t *testing.T) {

// Verify that the epoch has been advanced.
testutils.SucceedsSoon(t, func() error {
- newLiveness, err := mtc.nodeLivenesses[0].GetLiveness(deadNodeID)
- if err != nil {
- return err
+ newLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID)
+ if !ok {
+ return errors.New("liveness not found")
}
if newLiveness.Epoch != oldLiveness.Epoch+1 {
return errors.Errorf("expected epoch to increment")
@@ -545,9 +531,12 @@ func TestNodeLivenessSelf(t *testing.T) {
// callback.
var liveness kvserver.LivenessRecord
testutils.SucceedsSoon(t, func() error {
- var err error
- liveness, err = mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get())
- return err
+ l, ok := mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get())
+ if !ok {
+ return errors.New("liveness not found")
+ }
+ liveness = l
+ return nil
})
if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil {
t.Fatal(err)
@@ -575,13 +564,11 @@ func TestNodeLivenessSelf(t *testing.T) {

// Self should not see the fake liveness, but have kept the real one.
l := mtc.nodeLivenesses[0]
- lGetRec, err := l.GetLiveness(g.NodeID.Get())
- require.NoError(t, err)
+ lGetRec, ok := l.GetLiveness(g.NodeID.Get())
+ require.True(t, ok)
lGet := lGetRec.Liveness
- lSelf, err := l.Self()
- if err != nil {
- t.Fatal(err)
- }
+ lSelf, ok := l.Self()
+ assert.True(t, ok)
if !reflect.DeepEqual(lGet, lSelf) {
t.Errorf("expected GetLiveness() to return same value as Self(): %+v != %+v", lGet, lSelf)
}
@@ -616,9 +603,9 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) {
mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
var liveness kvserver.LivenessRecord
testutils.SucceedsSoon(t, func() error {
- livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
- if err != nil {
- return err
+ livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+ if !ok {
+ return errors.New("liveness not found")
}
liveness = livenessRec
return nil
@@ -680,9 +667,9 @@ func TestNodeLivenessGetLivenesses(t *testing.T) {
mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
var liveness kvserver.LivenessRecord
testutils.SucceedsSoon(t, func() error {
- livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
- if err != nil {
- return err
+ livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+ if !ok {
+ return errors.New("liveness not found")
}
liveness = livenessRec
return nil
@@ -729,10 +716,8 @@ func TestNodeLivenessConcurrentHeartbeats(t *testing.T) {
// Advance clock past the liveness threshold & concurrently heartbeat node.
nl := mtc.nodeLivenesses[0]
mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1)
- l, err := nl.Self()
- if err != nil {
- t.Fatal(err)
- }
+ l, ok := nl.Self()
+ assert.True(t, ok)
errCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
@@ -763,10 +748,8 @@ func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) {
// Advance the clock and this time increment epoch concurrently for node 1.
nl := mtc.nodeLivenesses[0]
mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1)
- l, err := nl.GetLiveness(mtc.gossips[1].NodeID.Get())
- if err != nil {
- t.Fatal(err)
- }
+ l, ok := nl.GetLiveness(mtc.gossips[1].NodeID.Get())
+ assert.True(t, ok)
errCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
@@ -915,10 +898,8 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) {
verifyLiveness(t, mtc)

nl := mtc.nodeLivenesses[0]
- l, err := nl.Self()
- if err != nil {
- t.Fatal(err)
- }
+ l, ok := nl.Self()
+ assert.True(t, ok)

// And again on manual heartbeat.
injectError.Store(true)
@@ -1099,8 +1080,8 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) {

// Verify success on failed update of a liveness record that already has the
// given decommissioning setting.
- oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID)
- assert.Nil(t, err)
+ oldLivenessRec, ok := callerNodeLiveness.GetLiveness(nodeID)
+ assert.True(t, ok)
if _, err := callerNodeLiveness.SetDecommissioningInternal(
ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE,
); err != nil {
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_gc_queue.go
@@ -165,8 +165,7 @@ func (rgcq *replicaGCQueue) shouldQueue(
// dormant ranges. Make sure NodeLiveness isn't nil because it can be in
// tests/benchmarks.
if repl.store.cfg.NodeLiveness != nil {
- if liveness, err := repl.store.cfg.NodeLiveness.Self(); err == nil &&
- !liveness.Membership.Active() {
+ if liveness, ok := repl.store.cfg.NodeLiveness.Self(); ok && !liveness.Membership.Active() {
return true, replicaGCPriorityDefault
}
}
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -224,12 +224,12 @@ func (p *pendingLeaseRequest) InitOrJoinRequest(
*reqLease.Expiration = status.Timestamp.Add(int64(p.repl.store.cfg.RangeLeaseActiveDuration()), 0)
} else {
// Get the liveness for the next lease holder and set the epoch in the lease request.
- liveness, err := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID)
- if err != nil {
+ liveness, ok := p.repl.store.cfg.NodeLiveness.GetLiveness(nextLeaseHolder.NodeID)
+ if !ok {
llHandle.resolve(roachpb.NewError(&roachpb.LeaseRejectedError{
Existing: status.Lease,
Requested: reqLease,
- Message: fmt.Sprintf("couldn't request lease for %+v: %v", nextLeaseHolder, err),
+ Message: fmt.Sprintf("couldn't request lease for %+v: %v", nextLeaseHolder, errLivenessRecordCacheMiss),
}))
return llHandle
}
@@ -543,17 +543,17 @@ func (r *Replica) leaseStatus(
if lease.Type() == roachpb.LeaseExpiration {
expiration = lease.GetExpiration()
} else {
- l, err := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID)
+ l, ok := r.store.cfg.NodeLiveness.GetLiveness(lease.Replica.NodeID)
status.Liveness = l.Liveness
- if err != nil || status.Liveness.Epoch < lease.Epoch {
+ if !ok || status.Liveness.Epoch < lease.Epoch {
// If lease validity can't be determined (e.g. gossip is down
// and liveness info isn't available for owner), we can neither
// use the lease nor do we want to attempt to acquire it.
- if err != nil {
+ if !ok {
if leaseStatusLogLimiter.ShouldLog() {
ctx = r.AnnotateCtx(ctx)
log.Warningf(ctx, "can't determine lease status of %s due to node liveness error: %+v",
- lease.Replica, err)
+ lease.Replica, errLivenessRecordCacheMiss)
}
}
status.State = kvserverpb.LeaseState_ERROR
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_pool.go
@@ -103,8 +103,8 @@ func MakeStorePoolNodeLivenessFunc(nodeLiveness *NodeLiveness) NodeLivenessFunc
return func(
nodeID roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration,
) kvserverpb.NodeLivenessStatus {
- liveness, err := nodeLiveness.GetLiveness(nodeID)
- if err != nil {
+ liveness, ok := nodeLiveness.GetLiveness(nodeID)
+ if !ok {
return kvserverpb.NodeLivenessStatus_UNAVAILABLE
}
return LivenessStatus(liveness.Liveness, now, timeUntilStoreDead)
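
The store pool is the simplest consumer of the new shape: a miss maps directly to an explicit unavailable status instead of propagating an error. A sketch of that closure follows, with abbreviated stand-in types rather than the real kvserverpb ones:

```go
package storepool

// Status and Record are stand-ins for kvserverpb.NodeLivenessStatus and
// LivenessRecord in this sketch.
type Status int

const (
	StatusUnavailable Status = iota
	StatusLive
)

type Record struct{ Epoch int64 }

// makeStatusFunc mirrors MakeStorePoolNodeLivenessFunc: it closes over a
// getter with the comma-ok shape and maps a cache miss to "unavailable".
func makeStatusFunc(get func(nodeID int32) (Record, bool)) func(int32) Status {
	return func(nodeID int32) Status {
		if _, ok := get(nodeID); !ok {
			return StatusUnavailable
		}
		return StatusLive
	}
}
```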
12 changes: 6 additions & 6 deletions pkg/server/admin.go
@@ -1344,9 +1344,9 @@ func (s *adminServer) checkReadinessForHealthCheck() error {

// TODO(knz): update this code when progress is made on
// https://github.com/cockroachdb/cockroach/issues/45123
- l, err := s.server.nodeLiveness.GetLiveness(s.server.NodeID())
- if err != nil {
- return s.serverError(err)
+ l, ok := s.server.nodeLiveness.GetLiveness(s.server.NodeID())
+ if !ok {
+ return status.Error(codes.Unavailable, "liveness record not found")
}
if !l.IsLive(s.server.clock.Now().GoTime()) {
return status.Errorf(codes.Unavailable, "node is not healthy")
@@ -1682,9 +1682,9 @@ func (s *adminServer) DecommissionStatus(
var res serverpb.DecommissionStatusResponse

for nodeID := range replicaCounts {
- l, err := s.server.nodeLiveness.GetLiveness(nodeID)
- if err != nil {
- return nil, errors.Wrapf(err, "unable to get liveness for %d", nodeID)
+ l, ok := s.server.nodeLiveness.GetLiveness(nodeID)
+ if !ok {
+ return nil, errors.Newf("unable to get liveness for %d", nodeID)
}
nodeResp := serverpb.DecommissionStatusResponse_Status{
NodeID: l.NodeID,
6 changes: 2 additions & 4 deletions pkg/server/admin_test.go
@@ -1349,10 +1349,8 @@ func TestHealthAPI(t *testing.T) {
// server's clock.
ts := s.(*TestServer)
defer ts.nodeLiveness.PauseAllHeartbeatsForTest()()
- self, err := ts.nodeLiveness.Self()
- if err != nil {
- t.Fatal(err)
- }
+ self, ok := ts.nodeLiveness.Self()
+ assert.True(t, ok)
s.Clock().Update(hlc.Timestamp(self.Expiration).Add(1, 0))

var resp serverpb.HealthResponse
2 changes: 1 addition & 1 deletion pkg/sql/optionalnodeliveness/node_liveness.go
@@ -18,7 +18,7 @@ import (

// Interface is the interface used in Container.
type Interface interface {
- Self() (kvserverpb.Liveness, error)
+ Self() (kvserverpb.Liveness, bool)
GetLivenesses() []kvserverpb.Liveness
IsLive(roachpb.NodeID) (bool, error)
}
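
With the boolean in the interface, test doubles implement it directly (compare `FakeNodeLiveness` in pkg/jobs/helpers.go above). A minimal sketch of another fake satisfying this interface; the import paths are assumed to match the repository at this commit:

```go
package livenesstest

import (
	"errors"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// fakeLiveness is a hypothetical test double for Interface.
type fakeLiveness struct {
	self       kvserverpb.Liveness
	livenesses []kvserverpb.Liveness
}

// Self always reports its record as found, mirroring FakeNodeLiveness.
func (f *fakeLiveness) Self() (kvserverpb.Liveness, bool) {
	return f.self, true
}

func (f *fakeLiveness) GetLivenesses() []kvserverpb.Liveness {
	return f.livenesses
}

// IsLive keeps the error return from the interface: an unknown node is
// still reported as an error rather than a boolean miss.
func (f *fakeLiveness) IsLive(id roachpb.NodeID) (bool, error) {
	for _, l := range f.livenesses {
		if l.NodeID == id {
			return true, nil
		}
	}
	return false, errors.New("liveness record not found")
}
```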
