Skip to content

Commit

Permalink
kvserver: improve look-aside cache get API
Browse files Browse the repository at this point in the history
Specifically, apply the following diff (and propagate through callers):

```diff
- getLivenessLocked(roachpb.NodeID) (LivenessRecord, error)
+ getLivenessLocked(roachpb.NodeID) (_ LivenessRecord, ok bool)
```

Previously only one error type could ever be returned, so this is just a
drive-by simplification.

Release note: None
  • Loading branch information
irfansharif authored and jayshrivastava committed Oct 8, 2020
1 parent bdc5697 commit a974772
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 149 deletions.
4 changes: 2 additions & 2 deletions pkg/jobs/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func (*FakeNodeLiveness) ModuleTestingKnobs() {}
// Self implements the implicit storage.NodeLiveness interface. It uses NodeID
// as the node ID. On every call, a nonblocking send is performed over nl.ch to
// allow tests to execute a callback.
func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, error) {
func (nl *FakeNodeLiveness) Self() (kvserverpb.Liveness, bool) {
select {
case nl.SelfCalledCh <- struct{}{}:
default:
}
nl.mu.Lock()
defer nl.mu.Unlock()
return *nl.mu.livenessMap[FakeNodeID.Get()], nil
return *nl.mu.livenessMap[FakeNodeID.Get()], true
}

// GetLivenesses implements the implicit storage.NodeLiveness interface.
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,10 +784,10 @@ func (r *Registry) maybeCancelJobsDeprecated(
// https://github.com/cockroachdb/cockroach/issues/47892
return
}
liveness, err := nl.Self()
if err != nil {
liveness, ok := nl.Self()
if !ok {
if nodeLivenessLogLimiter.ShouldLog() {
log.Warningf(ctx, "unable to get node liveness: %s", err)
log.Warning(ctx, "own liveness record not found")
}
// Conservatively assume our lease has expired. Abort all jobs.
r.deprecatedCancelAll(ctx)
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,11 +1435,12 @@ func (m *multiTestContext) heartbeatLiveness(ctx context.Context, store int) err
m.mu.RLock()
nl := m.nodeLivenesses[store]
m.mu.RUnlock()
l, err := nl.Self()
if err != nil {
return err
l, ok := nl.Self()
if !ok {
return errors.New("liveness not found")
}

var err error
for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); {
if err = nl.Heartbeat(ctx, l); !errors.Is(err, kvserver.ErrEpochIncremented) {
break
Expand Down
99 changes: 47 additions & 52 deletions pkg/kv/kvserver/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,8 @@ func (nl *NodeLiveness) SetDraining(
) error {
ctx = nl.ambientCtx.AnnotateCtx(ctx)
for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
oldLivenessRec, err := nl.SelfEx()
if err != nil {
if !errors.Is(err, errLivenessRecordCacheMiss) {
// TODO(irfansharif): Remove this when we change the signature
// of SelfEx to return a boolean instead of an error.
log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
}
oldLivenessRec, ok := nl.SelfEx()
if !ok {
// There was a cache miss, let's now fetch the record from KV
// directly.
nodeID := nl.gossip.NodeID.Get()
Expand Down Expand Up @@ -610,9 +605,14 @@ func (nl *NodeLiveness) GetLivenessThreshold() time.Duration {
// whether or not its liveness has expired regardless of the liveness status. It
// is an error if the specified node is not in the local liveness table.
func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
liveness, err := nl.GetLiveness(nodeID)
if err != nil {
return false, err
liveness, ok := nl.GetLiveness(nodeID)
if !ok {
// TODO(irfansharif): We only expect callers to supply us with node IDs
// they learnt through existing liveness records, which implies we
// should never find ourselves here. We should clean up this conditional
// once we re-visit the caching structure used within NodeLiveness;
// we should be able to return ErrMissingLivenessRecord instead.
return false, errLivenessRecordCacheMiss
}
// NB: We use clock.Now().GoTime() instead of clock.PhysicalTime() in order to
// consider clock signals from other nodes.
Expand Down Expand Up @@ -664,17 +664,12 @@ func (nl *NodeLiveness) StartHeartbeat(
func(ctx context.Context) error {
// Retry heartbeat in the event the conditional put fails.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
oldLiveness, err := nl.Self()
if err != nil {
if !errors.Is(err, errLivenessRecordCacheMiss) {
// TODO(irfansharif): Remove this when we change the signature
// of SelfEx to return a boolean instead of an error.
log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
}
oldLiveness, ok := nl.Self()
if !ok {
nodeID := nl.gossip.NodeID.Get()
liveness, err := nl.getLivenessFromKV(ctx, nodeID)
if err != nil {
log.Infof(ctx, "unable to get liveness record: %s", err)
log.Infof(ctx, "unable to get liveness record from KV: %s", err)
continue
}
oldLiveness = liveness
Expand Down Expand Up @@ -831,8 +826,8 @@ func (nl *NodeLiveness) heartbeatInternal(
// would hit a ConditionFailedError and return a errNodeAlreadyLive down
// below.
if !incrementEpoch {
curLiveness, err := nl.Self()
if err == nil && minExpiration.Less(curLiveness.Expiration) {
curLiveness, ok := nl.Self()
if ok && minExpiration.Less(curLiveness.Expiration) {
return nil
}
}
Expand Down Expand Up @@ -918,17 +913,17 @@ func (nl *NodeLiveness) heartbeatInternal(
// is returned in the event that the node has neither heartbeat its
// liveness record successfully, nor received a gossip message containing
// a former liveness update on restart.
func (nl *NodeLiveness) Self() (kvserverpb.Liveness, error) {
rec, err := nl.SelfEx()
if err != nil {
return kvserverpb.Liveness{}, err
func (nl *NodeLiveness) Self() (_ kvserverpb.Liveness, ok bool) {
rec, ok := nl.SelfEx()
if !ok {
return kvserverpb.Liveness{}, false
}
return rec.Liveness, nil
return rec.Liveness, true
}

// SelfEx is like Self, but returns the raw, encoded value that the database has
// for this liveness record in addition to the decoded liveness proto.
func (nl *NodeLiveness) SelfEx() (LivenessRecord, error) {
func (nl *NodeLiveness) SelfEx() (_ LivenessRecord, ok bool) {
nl.mu.RLock()
defer nl.mu.RUnlock()
return nl.getLivenessLocked(nl.gossip.NodeID.Get())
Expand Down Expand Up @@ -979,24 +974,26 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness {
return livenesses
}

// GetLiveness returns the liveness record for the specified nodeID.
// ErrNoLivenessRecord is returned in the event that nothing is yet known about
// nodeID via liveness gossip. The record returned also includes the raw,
// encoded value that the database has for this liveness record in addition to
// the decoded liveness proto.
func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, error) {
// GetLiveness returns the liveness record for the specified nodeID. If the
// liveness record is not found (due to gossip propagation delays or due to the
// node not existing), we surface that to the caller. The record returned also
// includes the raw, encoded value that the database has for this liveness
// record in addition to the decoded liveness proto.
func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) {
nl.mu.RLock()
defer nl.mu.RUnlock()
return nl.getLivenessLocked(nodeID)
}

// TODO(irfansharif): This only returns one possible error, so should be made to
// return a boolean instead.
func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) {
// getLivenessLocked returns the liveness record for the specified nodeID,
// consulting the in-memory cache. If nothing is found (could happen due to
// gossip propagation delays or the node not existing), we surface that to the
// caller.
func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (_ LivenessRecord, ok bool) {
if l, ok := nl.mu.nodes[nodeID]; ok {
return l, nil
return l, true
}
return LivenessRecord{}, errLivenessRecordCacheMiss
return LivenessRecord{}, false
}

// getLivenessFromKV fetches the liveness record from KV for a given node, and
Expand Down Expand Up @@ -1185,9 +1182,12 @@ func (nl *NodeLiveness) updateLivenessAttempt(
log.Fatalf(ctx, "unexpected oldRaw when ignoreCache not specified")
}

l, err := nl.GetLiveness(update.newLiveness.NodeID)
if err != nil {
return LivenessRecord{}, err
l, ok := nl.GetLiveness(update.newLiveness.NodeID)
if !ok {
// TODO(irfansharif): See TODO in `NodeLiveness.IsLive`, the same
// applies to this conditional. We probably want to be able to
// return ErrMissingLivenessRecord here instead.
return LivenessRecord{}, errLivenessRecordCacheMiss
}
if l.Liveness != update.oldLiveness {
return LivenessRecord{}, handleCondFailed(l)
Expand Down Expand Up @@ -1254,13 +1254,8 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness

var shouldReplace bool
nl.mu.Lock()
oldLivenessRec, err := nl.getLivenessLocked(newLivenessRec.NodeID)
if err != nil {
if !errors.Is(err, errLivenessRecordCacheMiss) {
// TODO(irfansharif): Remove this when we change the signature
// of SelfEx to return a boolean instead of an error.
log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
}
oldLivenessRec, ok := nl.getLivenessLocked(newLivenessRec.NodeID)
if !ok {
shouldReplace = true
} else {
shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness)
Expand Down Expand Up @@ -1339,8 +1334,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
nl.mu.RLock()
defer nl.mu.RUnlock()

self, err := nl.getLivenessLocked(selfID)
if err != nil {
self, ok := nl.getLivenessLocked(selfID)
if !ok {
return 0
}
now := nl.clock.Now().GoTime()
Expand All @@ -1364,9 +1359,9 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
return func(nodeID roachpb.NodeID) (hlc.Timestamp, ctpb.Epoch, error) {
now := nl.clock.Now()
liveness, err := nl.GetLiveness(nodeID)
if err != nil {
return hlc.Timestamp{}, 0, err
liveness, ok := nl.GetLiveness(nodeID)
if !ok {
return hlc.Timestamp{}, 0, errLivenessRecordCacheMiss
}
if !liveness.IsLive(now.GoTime()) {
return hlc.Timestamp{}, 0, errLiveClockNotLive
Expand Down
Loading

0 comments on commit a974772

Please sign in to comment.