diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 8431eb3c70d0..a359b65909cb 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -514,12 +514,9 @@ func (nl *NodeLiveness) SetDrainingInternal( } func (nl *NodeLiveness) SetDecommissioningInternal( - ctx context.Context, - nodeID roachpb.NodeID, - oldLivenessRec LivenessRecord, - targetStatus kvserverpb.MembershipStatus, + ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (changeCommitted bool, err error) { - return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus) + return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus) } // GetCircuitBreaker returns the circuit breaker controlling diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index cb59b4b420cc..431b2ca3cb61 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -40,9 +40,14 @@ import ( ) var ( - // ErrNoLivenessRecord is returned when asking for liveness information - // about a node for which nothing is known. - ErrNoLivenessRecord = errors.New("node not in the liveness table") + // ErrMissingLivenessRecord is returned when asking for liveness information + // about a node for which nothing is known. This happens when attempting to + // {d,r}ecommission a non-existent node. + ErrMissingLivenessRecord = errors.New("missing liveness record") + + // errLivenessRecordCacheMiss is returned when asking for the liveness + // record of a given node and it is not found in the in-memory cache. + errLivenessRecordCacheMiss = errors.New("liveness record not found in cache") // errChangeMembershipStatusFailed is returned when we're not able to // conditionally write the target membership status. It's safe to retry @@ -260,11 +265,22 @@ func (nl *NodeLiveness) SetDraining( ctx = nl.ambientCtx.AnnotateCtx(ctx) for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); { oldLivenessRec, err := nl.SelfEx() - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - log.Errorf(ctx, "unexpected error getting liveness: %+v", err) - } - err = nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter) if err != nil { + if !errors.Is(err, errLivenessRecordCacheMiss) { + // TODO(irfansharif): Remove this when we change the signature + // of SelfEx to return a boolean instead of an error. + log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) + } + // There was a cache miss, let's now fetch the record from KV + // directly. + nodeID := nl.gossip.NodeID.Get() + livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID) + if err != nil { + return err + } + oldLivenessRec = livenessRec + } + if err := nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter); err != nil { if log.V(1) { log.Infof(ctx, "attempting to set liveness draining status to %v: %v", drain, err) } @@ -337,7 +353,8 @@ func (nl *NodeLiveness) SetMembershipStatus( return false, errors.Wrap(err, "unable to get liveness") } if kv.Value == nil { - return false, ErrNoLivenessRecord + // We must be trying to decommission a node that does not exist. + return false, ErrMissingLivenessRecord } if err := kv.Value.GetProto(&oldLiveness); err != nil { return false, errors.Wrap(err, "invalid liveness record") @@ -352,8 +369,8 @@ func (nl *NodeLiveness) SetMembershipStatus( // Offer it to make sure that when we actually try to update the // liveness, the previous view is correct. This, too, is required to // de-flake TestNodeLivenessDecommissionAbsent. - nl.maybeUpdate(oldLivenessRec) - return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus) + nl.maybeUpdate(ctx, oldLivenessRec) + return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus) } for { @@ -384,18 +401,14 @@ func (nl *NodeLiveness) setDrainingInternal( <-sem }() - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, - } - } else { - newLiveness = oldLivenessRec.Liveness + return errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. We start off with a + // copy of our existing liveness record. + newLiveness := oldLivenessRec.Liveness + if reporter != nil && drain && !newLiveness.Draining { // Report progress to the Drain RPC. reporter(1, "liveness record") @@ -409,7 +422,7 @@ func (nl *NodeLiveness) setDrainingInternal( ignoreCache: true, } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) if actual.Draining == update.newLiveness.Draining { return errNodeDrainingSet @@ -426,7 +439,7 @@ func (nl *NodeLiveness) setDrainingInternal( return err } - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) return nil } @@ -506,29 +519,15 @@ func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb } func (nl *NodeLiveness) setMembershipStatusInternal( - ctx context.Context, - nodeID roachpb.NodeID, - oldLivenessRec LivenessRecord, - targetStatus kvserverpb.MembershipStatus, + ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus, ) (statusChanged bool, err error) { - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - // - // TODO(irfansharif): The above is now no longer possible. We always - // create one (see CreateLivenessRecord, WriteInitialClusterData) when - // adding a node to the cluster. We should clean up all this logic that - // tries to work around the liveness record possibly not existing. - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, - } - } else { - // We start off with a copy of our existing liveness record. - newLiveness = oldLivenessRec.Liveness + return false, errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. We start off with a + // copy of our existing liveness record. + newLiveness := oldLivenessRec.Liveness newLiveness.Membership = targetStatus if oldLivenessRec.Membership == newLiveness.Membership { // No-op. Return early. @@ -540,11 +539,8 @@ func (nl *NodeLiveness) setMembershipStatusInternal( return false, nil } - if oldLivenessRec.Liveness != (kvserverpb.Liveness{}) { - err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness) - if err != nil { - return false, err - } + if err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness); err != nil { + return false, err } update := livenessUpdate{ @@ -637,8 +633,19 @@ func (nl *NodeLiveness) StartHeartbeat( // Retry heartbeat in the event the conditional put fails. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { oldLiveness, err := nl.Self() - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { - log.Errorf(ctx, "unexpected error getting liveness: %+v", err) + if err != nil { + if !errors.Is(err, errLivenessRecordCacheMiss) { + // TODO(irfansharif): Remove this when we change the signature + // of SelfEx to return a boolean instead of an error. + log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) + } + nodeID := nl.gossip.NodeID.Get() + liveness, err := nl.getLivenessFromKV(ctx, nodeID) + if err != nil { + log.Infof(ctx, "unable to get liveness record: %s", err) + continue + } + oldLiveness = liveness } if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil { if errors.Is(err, ErrEpochIncremented) { @@ -798,67 +805,13 @@ func (nl *NodeLiveness) heartbeatInternal( } } - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness - if oldLiveness != (kvserverpb.Liveness{}) { - // Start off with our existing view of liveness. - newLiveness = oldLiveness - } else { - // We haven't seen our own liveness record yet[1]. This happens when - // we're heartbeating for the very first time. Let's retrieve it from KV - // before proceeding. - // - // [1]: Elsewhere we maintain the invariant that there always exist a - // liveness record for every given node. See the join RPC and - // WriteInitialClusterData for where that's done. - kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) - if err != nil { - return errors.Wrap(err, "unable to get liveness") - } - - if kv.Value != nil { - // This is the happy path. Let's unpack the liveness record we found - // within KV, and use that to inform what our new liveness should - // be. - if err := kv.Value.GetProto(&oldLiveness); err != nil { - return errors.Wrap(err, "invalid liveness record") - } - - oldLivenessRec := LivenessRecord{ - Liveness: oldLiveness, - raw: kv.Value.TagAndDataBytes(), - } - - // Update our cache with the liveness record we just found. - nl.maybeUpdate(oldLivenessRec) - - newLiveness = oldLiveness - } else { - // This is a "should basically never happen" scenario given our - // invariant around always persisting liveness records on node - // startup. But that was a change we added in 20.2. Though unlikely, - // it's possible to get into the following scenario: - // - // - v20.1 node gets added to v20.1 cluster, and is quickly removed - // before being able to persist its liveness record. - // - The cluster is upgraded to v20.2. - // - The node from earlier is rolled into v20.2, and re-added to the - // cluster. - // - It's never able to successfully heartbeat (it didn't join - // through the join rpc, bootstrap, or gossip). Welp. - // - // Given this possibility, we'll just fall back to creating the - // liveness record here as we did in v20.1 code. - // - // TODO(irfansharif): Remove this once v20.2 is cut. - log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID) - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 0, // incremented to epoch=1 below as needed - } - } + if oldLiveness == (kvserverpb.Liveness{}) { + return errors.AssertionFailedf("invalid old liveness record; found to be empty") } + // Let's compute what our new liveness record should be. Start off with our + // existing view of things. + newLiveness := oldLiveness if incrementEpoch { newLiveness.Epoch++ newLiveness.Draining = false // clear draining field @@ -883,7 +836,7 @@ func (nl *NodeLiveness) heartbeatInternal( } written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { // Update liveness to actual value on mismatch. - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) // If the actual liveness is different than expected, but is // considered live, treat the heartbeat as a success. This can @@ -924,12 +877,12 @@ func (nl *NodeLiveness) heartbeatInternal( } log.VEventf(ctx, 1, "heartbeat %+v", written.Expiration) - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) nl.metrics.HeartbeatSuccesses.Inc(1) return nil } -// Self returns the liveness record for this node. ErrNoLivenessRecord +// Self returns the liveness record for this node. ErrMissingLivenessRecord // is returned in the event that the node has neither heartbeat its // liveness record successfully, nor received a gossip message containing // a former liveness update on restart. @@ -1005,11 +958,53 @@ func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, erro return nl.getLivenessLocked(nodeID) } +// TODO(irfansharif): This only returns one possible error, so should be made to +// return a boolean instead. func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) { if l, ok := nl.mu.nodes[nodeID]; ok { return l, nil } - return LivenessRecord{}, ErrNoLivenessRecord + return LivenessRecord{}, errLivenessRecordCacheMiss +} + +// getLivenessFromKV fetches the liveness record from KV for a given node, and +// updates the internal in-memory cache when doing so. +func (nl *NodeLiveness) getLivenessFromKV( + ctx context.Context, nodeID roachpb.NodeID, +) (kvserverpb.Liveness, error) { + livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID) + if err != nil { + return kvserverpb.Liveness{}, err + } + return livenessRec.Liveness, nil +} + +// getLivenessRecordFromKV is like getLivenessFromKV, but returns the raw, +// encoded value that the database has for this liveness record in addition to +// the decoded liveness proto. +func (nl *NodeLiveness) getLivenessRecordFromKV( + ctx context.Context, nodeID roachpb.NodeID, +) (LivenessRecord, error) { + kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) + if err != nil { + return LivenessRecord{}, errors.Wrap(err, "unable to get liveness") + } + if kv.Value == nil { + return LivenessRecord{}, errors.AssertionFailedf("missing liveness record") + } + var liveness kvserverpb.Liveness + if err := kv.Value.GetProto(&liveness); err != nil { + return LivenessRecord{}, errors.Wrap(err, "invalid liveness record") + } + + livenessRec := LivenessRecord{ + Liveness: liveness, + raw: kv.Value.TagAndDataBytes(), + } + + // Update our cache with the liveness record we just found. + nl.maybeUpdate(ctx, livenessRec) + return livenessRec, nil } // IncrementEpoch is called to attempt to revoke another node's @@ -1059,7 +1054,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb. update.newLiveness.Epoch++ written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error { - nl.maybeUpdate(actual) + nl.maybeUpdate(ctx, actual) if actual.Epoch > liveness.Epoch { return ErrEpochAlreadyIncremented @@ -1073,7 +1068,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb. } log.Infof(ctx, "incremented n%d liveness epoch to %d", written.NodeID, written.Epoch) - nl.maybeUpdate(written) + nl.maybeUpdate(ctx, written) nl.metrics.EpochIncrements.Inc(1) return nil } @@ -1159,10 +1154,10 @@ func (nl *NodeLiveness) updateLivenessAttempt( } l, err := nl.GetLiveness(update.newLiveness.NodeID) - if err != nil && !errors.Is(err, ErrNoLivenessRecord) { + if err != nil { return LivenessRecord{}, err } - if err == nil && l.Liveness != update.oldLiveness { + if l.Liveness != update.oldLiveness { return LivenessRecord{}, handleCondFailed(l) } oldRaw = l.raw @@ -1220,40 +1215,49 @@ func (nl *NodeLiveness) updateLivenessAttempt( // maybeUpdate replaces the liveness (if it appears newer) and invokes the // registered callbacks if the node became live in the process. -func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) { - // An empty new means that we haven't updated anything. - if new.Liveness == (kvserverpb.Liveness{}) { - return +func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec LivenessRecord) { + if newLivenessRec.Liveness == (kvserverpb.Liveness{}) { + log.Fatal(ctx, "invalid new liveness record; found to be empty") } + var shouldReplace bool nl.mu.Lock() - old := nl.mu.nodes[new.NodeID] + oldLivenessRec, err := nl.getLivenessLocked(newLivenessRec.NodeID) + if err != nil { + if !errors.Is(err, errLivenessRecordCacheMiss) { + // TODO(irfansharif): Remove this when we change the signature + // of SelfEx to return a boolean instead of an error. + log.Fatalf(ctx, "unexpected error getting liveness: %+v", err) + } + shouldReplace = true + } else { + shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness) + } - should := shouldReplaceLiveness(old.Liveness, new.Liveness) var callbacks []IsLiveCallback - if should { - nl.mu.nodes[new.NodeID] = new + if shouldReplace { + nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec callbacks = append(callbacks, nl.mu.callbacks...) } nl.mu.Unlock() - if !should { + if !shouldReplace { return } now := nl.clock.Now().GoTime() - if !old.IsLive(now) && new.IsLive(now) { + if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) { for _, fn := range callbacks { - fn(new.Liveness) + fn(newLivenessRec.Liveness) } } } // shouldReplaceLiveness checks to see if the new liveness is in fact newer // than the old liveness. -func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { +func shouldReplaceLiveness(ctx context.Context, old, new kvserverpb.Liveness) bool { if (old == kvserverpb.Liveness{}) { - return true + log.Fatal(ctx, "invalid old liveness record; found to be empty") } // Compare liveness information. If old < new, replace. @@ -1277,12 +1281,13 @@ func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool { // in-memory liveness info up to date. func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) { var liveness kvserverpb.Liveness + ctx := context.TODO() if err := content.GetProto(&liveness); err != nil { - log.Errorf(context.TODO(), "%v", err) + log.Errorf(ctx, "%v", err) return } - nl.maybeUpdate(LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()}) + nl.maybeUpdate(ctx, LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()}) } // numLiveNodes is used to populate a metric that tracks the number of live @@ -1294,8 +1299,6 @@ func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) { // nodes reporting the metric, so it's simplest to just have all live nodes // report it. func (nl *NodeLiveness) numLiveNodes() int64 { - ctx := nl.ambientCtx.AnnotateCtx(context.Background()) - selfID := nl.gossip.NodeID.Get() if selfID == 0 { return 0 @@ -1305,11 +1308,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 { defer nl.mu.RUnlock() self, err := nl.getLivenessLocked(selfID) - if errors.Is(err, ErrNoLivenessRecord) { - return 0 - } if err != nil { - log.Warningf(ctx, "looking up own liveness: %+v", err) return 0 } now := nl.clock.Now().GoTime() diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index 32ee16e991dc..9fb297289ddb 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -1102,7 +1102,7 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID) assert.Nil(t, err) if _, err := callerNodeLiveness.SetDecommissioningInternal( - ctx, nodeID, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE, + ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE, ); err != nil { t.Fatal(err) } @@ -1159,7 +1159,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) { // When the node simply never existed, expect an error. if _, err := mtc.nodeLivenesses[0].SetMembershipStatus( ctx, goneNodeID, kvserverpb.MembershipStatus_DECOMMISSIONING, - ); !errors.Is(err, kvserver.ErrNoLivenessRecord) { + ); !errors.Is(err, kvserver.ErrMissingLivenessRecord) { t.Fatal(err) } diff --git a/pkg/kv/kvserver/node_liveness_unit_test.go b/pkg/kv/kvserver/node_liveness_unit_test.go index 412e69262953..9b780ace0777 100644 --- a/pkg/kv/kvserver/node_liveness_unit_test.go +++ b/pkg/kv/kvserver/node_liveness_unit_test.go @@ -11,6 +11,7 @@ package kvserver import ( + "context" "fmt" "testing" @@ -106,7 +107,7 @@ func TestShouldReplaceLiveness(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - if act := shouldReplaceLiveness(test.old, test.new); act != test.exp { + if act := shouldReplaceLiveness(context.Background(), test.old, test.new); act != test.exp { t.Errorf("unexpected update: %+v", test) } })