From 6d7d5306cabfa0cd8e897ee50e68c0cbdbfccca3 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 24 Sep 2020 18:07:54 -0400
Subject: [PATCH] kvserver: add assertions for invariants around liveness records

Now that we have #53842, we maintain the invariant that there always
exists a liveness record for any given node. We can now simplify our
handling of liveness records internally: where previously we had code to
handle the possibility of empty liveness records (we created a new one on
the fly), we now assert that empty liveness records are no longer flying
around in the system.

When retrieving the liveness record from our in-memory cache, it was
possible to not find anything due to gossip delays. Instead of simply
giving up, we now read the record directly from KV (and eventually update
our caches to store this newly read record). This PR introduces that
mechanism through the use of `getLivenessRecordFromKV`. Note that the
existing cache structure within NodeLiveness is a look-aside cache, and
that's unchanged here. Things would be further simplified if it were a
look-through cache, where the cache was updated whenever a record was
fetched from KV after a miss, but we defer that to future work. A TODO
outlining this will be introduced in a future commit.

A note for ease of review: one structural change introduced in this diff
is breaking down `ErrNoLivenessRecord` into `ErrMissingLivenessRecord`
and `errLivenessRecordCacheMiss`. The former will be used in a future
commit to generate better hints for users (it'll only ever surface when
attempting to decommission/recommission non-existent nodes). The latter
is used to represent cache misses. This too will be improved in a future
commit, where we'll return a boolean from cache accesses instead of a
specific error.

---

We don't intend to backport this to 20.2 due to the hazard described in
#54216. We want this PR to bake on master and (possibly) trip the
assertions added here if we've missed anything. They're the only checks
for the invariant we've introduced around liveness records. That
invariant will be depended on for long-running migrations, so it's better
to shake things out early.
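As a rough, hypothetical illustration of the cache-miss fallback described
above (this is not the NodeLiveness API; livenessCache, getCached,
fetchFromKV, and get below are made-up stand-ins for SelfEx,
getLivenessRecordFromKV, and maybeUpdate), a minimal look-aside cache with
a direct KV fallback might look like:

    // Hypothetical sketch; not CockroachDB code.
    package main

    import (
        "context"
        "errors"
        "fmt"
        "sync"
    )

    // livenessRecord stands in for kvserverpb.Liveness plus its raw bytes.
    type livenessRecord struct {
        NodeID int32
        Epoch  int64
    }

    // errCacheMiss mirrors errLivenessRecordCacheMiss: the record hasn't
    // made it into the in-memory cache yet (e.g. due to gossip delays).
    var errCacheMiss = errors.New("liveness record not found in cache")

    type livenessCache struct {
        mu          sync.Mutex
        nodes       map[int32]livenessRecord
        fetchFromKV func(ctx context.Context, nodeID int32) (livenessRecord, error)
    }

    // getCached consults only the in-memory map (the look-aside cache).
    func (c *livenessCache) getCached(nodeID int32) (livenessRecord, error) {
        c.mu.Lock()
        defer c.mu.Unlock()
        if rec, ok := c.nodes[nodeID]; ok {
            return rec, nil
        }
        return livenessRecord{}, errCacheMiss
    }

    // get is the pattern this commit introduces: cache first, then a direct
    // KV read on a miss, then a cache refresh with whatever was found.
    func (c *livenessCache) get(ctx context.Context, nodeID int32) (livenessRecord, error) {
        rec, err := c.getCached(nodeID)
        if err == nil {
            return rec, nil
        }
        if !errors.Is(err, errCacheMiss) {
            return livenessRecord{}, err
        }
        // Cache miss: given the invariant that a liveness record always
        // exists, the direct KV read is expected to succeed.
        rec, err = c.fetchFromKV(ctx, nodeID)
        if err != nil {
            return livenessRecord{}, err
        }
        c.mu.Lock()
        c.nodes[nodeID] = rec // keep the look-aside cache up to date
        c.mu.Unlock()
        return rec, nil
    }

    func main() {
        cache := &livenessCache{
            nodes: map[int32]livenessRecord{},
            fetchFromKV: func(_ context.Context, nodeID int32) (livenessRecord, error) {
                return livenessRecord{NodeID: nodeID, Epoch: 1}, nil
            },
        }
        rec, err := cache.get(context.Background(), 7)
        fmt.Println(rec, err) // prints {7 1} <nil>
    }

Because the cache stays look-aside, callers still perform the KV fallback
themselves; a look-through variant would fold that fallback into the cache
itself, which is the future work mentioned above.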
Release note: None
---
 pkg/kv/kvserver/helpers_test.go            |   7 +-
 pkg/kv/kvserver/node_liveness.go           | 269 ++++++++++-----------
 pkg/kv/kvserver/node_liveness_test.go      |   4 +-
 pkg/kv/kvserver/node_liveness_unit_test.go |   3 +-
 4 files changed, 140 insertions(+), 143 deletions(-)

diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 8431eb3c70d0..a359b65909cb 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -514,12 +514,9 @@ func (nl *NodeLiveness) SetDrainingInternal(
 }
 
 func (nl *NodeLiveness) SetDecommissioningInternal(
-	ctx context.Context,
-	nodeID roachpb.NodeID,
-	oldLivenessRec LivenessRecord,
-	targetStatus kvserverpb.MembershipStatus,
+	ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus,
 ) (changeCommitted bool, err error) {
-	return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus)
+	return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
 }
 
 // GetCircuitBreaker returns the circuit breaker controlling
diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go
index 76d3ea74a168..436b86a47a00 100644
--- a/pkg/kv/kvserver/node_liveness.go
+++ b/pkg/kv/kvserver/node_liveness.go
@@ -40,9 +40,14 @@ import (
 )
 
 var (
-	// ErrNoLivenessRecord is returned when asking for liveness information
-	// about a node for which nothing is known.
-	ErrNoLivenessRecord = errors.New("node not in the liveness table")
+	// ErrMissingLivenessRecord is returned when asking for liveness information
+	// about a node for which nothing is known. This happens when attempting to
+	// {d,r}ecommission a non-existent node.
+	ErrMissingLivenessRecord = errors.New("missing liveness record")
+
+	// errLivenessRecordCacheMiss is returned when asking for the liveness
+	// record of a given node and it is not found in the in-memory cache.
+	errLivenessRecordCacheMiss = errors.New("liveness record not found in cache")
 
 	// errChangeMembershipStatusFailed is returned when we're not able to
 	// conditionally write the target membership status. It's safe to retry
@@ -260,11 +265,22 @@ func (nl *NodeLiveness) SetDraining(
 	ctx = nl.ambientCtx.AnnotateCtx(ctx)
 	for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
 		oldLivenessRec, err := nl.SelfEx()
-		if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
-			log.Errorf(ctx, "unexpected error getting liveness: %+v", err)
-		}
-		err = nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter)
 		if err != nil {
+			if !errors.Is(err, errLivenessRecordCacheMiss) {
+				// TODO(irfansharif): Remove this when we change the signature
+				// of SelfEx to return a boolean instead of an error.
+				log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
+			}
+			// There was a cache miss, let's now fetch the record from KV
+			// directly.
+			nodeID := nl.gossip.NodeID.Get()
+			livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID)
+			if err != nil {
+				return err
+			}
+			oldLivenessRec = livenessRec
+		}
+		if err := nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter); err != nil {
 			if log.V(1) {
 				log.Infof(ctx, "attempting to set liveness draining status to %v: %v", drain, err)
 			}
@@ -336,7 +352,8 @@ func (nl *NodeLiveness) SetMembershipStatus(
 			return false, errors.Wrap(err, "unable to get liveness")
 		}
 		if kv.Value == nil {
-			return false, ErrNoLivenessRecord
+			// We must be trying to decommission a node that does not exist.
+			return false, ErrMissingLivenessRecord
 		}
 		if err := kv.Value.GetProto(&oldLiveness); err != nil {
 			return false, errors.Wrap(err, "invalid liveness record")
 		}
@@ -351,8 +368,8 @@ func (nl *NodeLiveness) SetMembershipStatus(
 		// Offer it to make sure that when we actually try to update the
 		// liveness, the previous view is correct. This, too, is required to
 		// de-flake TestNodeLivenessDecommissionAbsent.
-		nl.maybeUpdate(oldLivenessRec)
-		return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus)
+		nl.maybeUpdate(ctx, oldLivenessRec)
+		return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
 	}
 
 	for {
@@ -383,18 +400,14 @@ func (nl *NodeLiveness) setDrainingInternal(
 		<-sem
 	}()
 
-	// Let's compute what our new liveness record should be.
-	var newLiveness kvserverpb.Liveness
 	if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) {
-		// Liveness record didn't previously exist, so we create one.
-		newLiveness = kvserverpb.Liveness{
-			NodeID: nodeID,
-			Epoch:  1,
-		}
-	} else {
-		newLiveness = oldLivenessRec.Liveness
+		return errors.AssertionFailedf("invalid old liveness record; found to be empty")
 	}
 
+	// Let's compute what our new liveness record should be. We start off with a
+	// copy of our existing liveness record.
+	newLiveness := oldLivenessRec.Liveness
+
 	if reporter != nil && drain && !newLiveness.Draining {
 		// Report progress to the Drain RPC.
 		reporter(1, "liveness record")
@@ -408,7 +421,7 @@ func (nl *NodeLiveness) setDrainingInternal(
 		ignoreCache: true,
 	}
 	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
-		nl.maybeUpdate(actual)
+		nl.maybeUpdate(ctx, actual)
 
 		if actual.Draining == update.newLiveness.Draining {
 			return errNodeDrainingSet
@@ -425,7 +438,7 @@ func (nl *NodeLiveness) setDrainingInternal(
 		return err
 	}
 
-	nl.maybeUpdate(written)
+	nl.maybeUpdate(ctx, written)
 	return nil
 }
 
@@ -505,29 +518,15 @@ func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb
 }
 
 func (nl *NodeLiveness) setMembershipStatusInternal(
-	ctx context.Context,
-	nodeID roachpb.NodeID,
-	oldLivenessRec LivenessRecord,
-	targetStatus kvserverpb.MembershipStatus,
+	ctx context.Context, oldLivenessRec LivenessRecord, targetStatus kvserverpb.MembershipStatus,
 ) (statusChanged bool, err error) {
-	// Let's compute what our new liveness record should be.
-	var newLiveness kvserverpb.Liveness
 	if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) {
-		// Liveness record didn't previously exist, so we create one.
-		//
-		// TODO(irfansharif): The above is now no longer possible. We always
-		// create one (see CreateLivenessRecord, WriteInitialClusterData) when
-		// adding a node to the cluster. We should clean up all this logic that
-		// tries to work around the liveness record possibly not existing.
-		newLiveness = kvserverpb.Liveness{
-			NodeID: nodeID,
-			Epoch:  1,
-		}
-	} else {
-		// We start off with a copy of our existing liveness record.
-		newLiveness = oldLivenessRec.Liveness
+		return false, errors.AssertionFailedf("invalid old liveness record; found to be empty")
 	}
 
+	// Let's compute what our new liveness record should be. We start off with a
+	// copy of our existing liveness record.
+	newLiveness := oldLivenessRec.Liveness
 	newLiveness.Membership = targetStatus
 	if oldLivenessRec.Membership == newLiveness.Membership {
 		// No-op. Return early.
@@ -539,11 +538,8 @@ func (nl *NodeLiveness) setMembershipStatusInternal(
 		return false, nil
 	}
 
-	if oldLivenessRec.Liveness != (kvserverpb.Liveness{}) {
-		err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness)
-		if err != nil {
-			return false, err
-		}
+	if err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness); err != nil {
+		return false, err
 	}
 
 	update := livenessUpdate{
@@ -636,8 +632,19 @@ func (nl *NodeLiveness) StartHeartbeat(
 					// Retry heartbeat in the event the conditional put fails.
 					for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
 						oldLiveness, err := nl.Self()
-						if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
-							log.Errorf(ctx, "unexpected error getting liveness: %+v", err)
+						if err != nil {
+							if !errors.Is(err, errLivenessRecordCacheMiss) {
+								// TODO(irfansharif): Remove this when we change the signature
+								// of SelfEx to return a boolean instead of an error.
+								log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
+							}
+							nodeID := nl.gossip.NodeID.Get()
+							liveness, err := nl.getLivenessFromKV(ctx, nodeID)
+							if err != nil {
+								log.Infof(ctx, "unable to get liveness record: %s", err)
+								continue
+							}
+							oldLiveness = liveness
 						}
 						if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil {
 							if errors.Is(err, ErrEpochIncremented) {
@@ -797,67 +804,13 @@ func (nl *NodeLiveness) heartbeatInternal(
 		}
 	}
 
-	// Let's compute what our new liveness record should be.
-	var newLiveness kvserverpb.Liveness
-	if oldLiveness != (kvserverpb.Liveness{}) {
-		// Start off with our existing view of liveness.
-		newLiveness = oldLiveness
-	} else {
-		// We haven't seen our own liveness record yet[1]. This happens when
-		// we're heartbeating for the very first time. Let's retrieve it from KV
-		// before proceeding.
-		//
-		// [1]: Elsewhere we maintain the invariant that there always exist a
-		// liveness record for every given node. See the join RPC and
-		// WriteInitialClusterData for where that's done.
-		kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID))
-		if err != nil {
-			return errors.Wrap(err, "unable to get liveness")
-		}
-
-		if kv.Value != nil {
-			// This is the happy path. Let's unpack the liveness record we found
-			// within KV, and use that to inform what our new liveness should
-			// be.
-			if err := kv.Value.GetProto(&oldLiveness); err != nil {
-				return errors.Wrap(err, "invalid liveness record")
-			}
-
-			oldLivenessRec := LivenessRecord{
-				Liveness: oldLiveness,
-				raw:      kv.Value.TagAndDataBytes(),
-			}
-
-			// Update our cache with the liveness record we just found.
-			nl.maybeUpdate(oldLivenessRec)
-
-			newLiveness = oldLiveness
-		} else {
-			// This is a "should basically never happen" scenario given our
-			// invariant around always persisting liveness records on node
-			// startup. But that was a change we added in 20.2. Though unlikely,
-			// it's possible to get into the following scenario:
-			//
-			// - v20.1 node gets added to v20.1 cluster, and is quickly removed
-			//   before being able to persist its liveness record.
-			// - The cluster is upgraded to v20.2.
-			// - The node from earlier is rolled into v20.2, and re-added to the
-			//   cluster.
-			// - It's never able to successfully heartbeat (it didn't join
-			//   through the join rpc, bootstrap, or gossip). Welp.
-			//
-			// Given this possibility, we'll just fall back to creating the
-			// liveness record here as we did in v20.1 code.
-			//
-			// TODO(irfansharif): Remove this once v20.2 is cut.
-			log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID)
-			newLiveness = kvserverpb.Liveness{
-				NodeID: nodeID,
-				Epoch:  0, // incremented to epoch=1 below as needed
-			}
-		}
+	if oldLiveness == (kvserverpb.Liveness{}) {
+		return errors.AssertionFailedf("invalid old liveness record; found to be empty")
 	}
 
+	// Let's compute what our new liveness record should be. Start off with our
+	// existing view of things.
+	newLiveness := oldLiveness
 	if incrementEpoch {
 		newLiveness.Epoch++
 		newLiveness.Draining = false // clear draining field
@@ -882,7 +835,7 @@ func (nl *NodeLiveness) heartbeatInternal(
 	}
 	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
 		// Update liveness to actual value on mismatch.
-		nl.maybeUpdate(actual)
+		nl.maybeUpdate(ctx, actual)
 
 		// If the actual liveness is different than expected, but is
 		// considered live, treat the heartbeat as a success. This can
@@ -923,12 +876,12 @@ func (nl *NodeLiveness) heartbeatInternal(
 	}
 
 	log.VEventf(ctx, 1, "heartbeat %+v", written.Expiration)
-	nl.maybeUpdate(written)
+	nl.maybeUpdate(ctx, written)
 	nl.metrics.HeartbeatSuccesses.Inc(1)
 	return nil
 }
 
-// Self returns the liveness record for this node. ErrNoLivenessRecord
+// Self returns the liveness record for this node. ErrMissingLivenessRecord
 // is returned in the event that the node has neither heartbeat its
 // liveness record successfully, nor received a gossip message containing
 // a former liveness update on restart.
@@ -1004,11 +957,53 @@ func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, erro
 	return nl.getLivenessLocked(nodeID)
 }
 
+// TODO(irfansharif): This only returns one possible error, so should be made to
+// return a boolean instead.
 func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord, error) {
 	if l, ok := nl.mu.nodes[nodeID]; ok {
 		return l, nil
 	}
-	return LivenessRecord{}, ErrNoLivenessRecord
+	return LivenessRecord{}, errLivenessRecordCacheMiss
+}
+
+// getLivenessFromKV fetches the liveness record from KV for a given node, and
+// updates the internal in-memory cache when doing so.
+func (nl *NodeLiveness) getLivenessFromKV(
+	ctx context.Context, nodeID roachpb.NodeID,
+) (kvserverpb.Liveness, error) {
+	livenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID)
+	if err != nil {
+		return kvserverpb.Liveness{}, err
+	}
+	return livenessRec.Liveness, nil
+}
+
+// getLivenessRecordFromKV is like getLivenessFromKV, but returns the raw,
+// encoded value that the database has for this liveness record in addition to
+// the decoded liveness proto.
+func (nl *NodeLiveness) getLivenessRecordFromKV(
+	ctx context.Context, nodeID roachpb.NodeID,
+) (LivenessRecord, error) {
+	kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID))
+	if err != nil {
+		return LivenessRecord{}, errors.Wrap(err, "unable to get liveness")
+	}
+	if kv.Value == nil {
+		return LivenessRecord{}, errors.AssertionFailedf("missing liveness record")
+	}
+	var liveness kvserverpb.Liveness
+	if err := kv.Value.GetProto(&liveness); err != nil {
+		return LivenessRecord{}, errors.Wrap(err, "invalid liveness record")
+	}
+
+	livenessRec := LivenessRecord{
+		Liveness: liveness,
+		raw:      kv.Value.TagAndDataBytes(),
+	}
+
+	// Update our cache with the liveness record we just found.
+	nl.maybeUpdate(ctx, livenessRec)
+	return livenessRec, nil
 }
 
 // IncrementEpoch is called to attempt to revoke another node's
@@ -1058,7 +1053,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb.
 	update.newLiveness.Epoch++
 
 	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
-		nl.maybeUpdate(actual)
+		nl.maybeUpdate(ctx, actual)
 
 		if actual.Epoch > liveness.Epoch {
 			return ErrEpochAlreadyIncremented
@@ -1072,7 +1067,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb.
 	}
 
 	log.Infof(ctx, "incremented n%d liveness epoch to %d", written.NodeID, written.Epoch)
-	nl.maybeUpdate(written)
+	nl.maybeUpdate(ctx, written)
 	nl.metrics.EpochIncrements.Inc(1)
 	return nil
 }
@@ -1158,10 +1153,10 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 		}
 
 		l, err := nl.GetLiveness(update.newLiveness.NodeID)
-		if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
+		if err != nil {
 			return LivenessRecord{}, err
 		}
-		if err == nil && l.Liveness != update.oldLiveness {
+		if l.Liveness != update.oldLiveness {
 			return LivenessRecord{}, handleCondFailed(l)
 		}
 		oldRaw = l.raw
@@ -1219,40 +1214,49 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 
 // maybeUpdate replaces the liveness (if it appears newer) and invokes the
 // registered callbacks if the node became live in the process.
-func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) {
-	// An empty new means that we haven't updated anything.
-	if new.Liveness == (kvserverpb.Liveness{}) {
-		return
+func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec LivenessRecord) {
+	if newLivenessRec.Liveness == (kvserverpb.Liveness{}) {
+		log.Fatal(ctx, "invalid new liveness record; found to be empty")
 	}
 
+	var shouldReplace bool
 	nl.mu.Lock()
-	old := nl.mu.nodes[new.NodeID]
+	oldLivenessRec, err := nl.getLivenessLocked(newLivenessRec.NodeID)
+	if err != nil {
+		if !errors.Is(err, errLivenessRecordCacheMiss) {
+			// TODO(irfansharif): Remove this when we change the signature
+			// of SelfEx to return a boolean instead of an error.
+			log.Fatalf(ctx, "unexpected error getting liveness: %+v", err)
+		}
+		shouldReplace = true
+	} else {
+		shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness)
+	}
 
-	should := shouldReplaceLiveness(old.Liveness, new.Liveness)
 	var callbacks []IsLiveCallback
-	if should {
-		nl.mu.nodes[new.NodeID] = new
+	if shouldReplace {
+		nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec
 		callbacks = append(callbacks, nl.mu.callbacks...)
 	}
 	nl.mu.Unlock()
 
-	if !should {
+	if !shouldReplace {
 		return
 	}
 
 	now := nl.clock.Now().GoTime()
-	if !old.IsLive(now) && new.IsLive(now) {
+	if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) {
 		for _, fn := range callbacks {
-			fn(new.Liveness)
+			fn(newLivenessRec.Liveness)
 		}
 	}
 }
 
 // shouldReplaceLiveness checks to see if the new liveness is in fact newer
 // than the old liveness.
-func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool {
+func shouldReplaceLiveness(ctx context.Context, old, new kvserverpb.Liveness) bool {
 	if (old == kvserverpb.Liveness{}) {
-		return true
+		log.Fatal(ctx, "invalid old liveness record; found to be empty")
 	}
 
 	// Compare liveness information. If old < new, replace.
@@ -1276,12 +1280,13 @@ func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool {
 // in-memory liveness info up to date.
 func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) {
 	var liveness kvserverpb.Liveness
+	ctx := context.TODO()
 	if err := content.GetProto(&liveness); err != nil {
-		log.Errorf(context.TODO(), "%v", err)
+		log.Errorf(ctx, "%v", err)
 		return
 	}
 
-	nl.maybeUpdate(LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()})
+	nl.maybeUpdate(ctx, LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()})
 }
 
 // numLiveNodes is used to populate a metric that tracks the number of live
@@ -1293,8 +1298,6 @@ func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) {
 // nodes reporting the metric, so it's simplest to just have all live nodes
 // report it.
 func (nl *NodeLiveness) numLiveNodes() int64 {
-	ctx := nl.ambientCtx.AnnotateCtx(context.Background())
-
 	selfID := nl.gossip.NodeID.Get()
 	if selfID == 0 {
 		return 0
@@ -1304,11 +1307,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
 	defer nl.mu.RUnlock()
 
 	self, err := nl.getLivenessLocked(selfID)
-	if errors.Is(err, ErrNoLivenessRecord) {
-		return 0
-	}
 	if err != nil {
-		log.Warningf(ctx, "looking up own liveness: %+v", err)
 		return 0
 	}
 	now := nl.clock.Now().GoTime()
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index 7d41fe348794..af95e76c6839 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -1102,7 +1102,7 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) {
 	oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID)
 	assert.Nil(t, err)
 	if _, err := callerNodeLiveness.SetDecommissioningInternal(
-		ctx, nodeID, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE,
+		ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE,
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -1159,7 +1159,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) {
 	// When the node simply never existed, expect an error.
 	if _, err := mtc.nodeLivenesses[0].SetMembershipStatus(
 		ctx, goneNodeID, kvserverpb.MembershipStatus_DECOMMISSIONING,
-	); !errors.Is(err, kvserver.ErrNoLivenessRecord) {
+	); !errors.Is(err, kvserver.ErrMissingLivenessRecord) {
 		t.Fatal(err)
 	}
 
diff --git a/pkg/kv/kvserver/node_liveness_unit_test.go b/pkg/kv/kvserver/node_liveness_unit_test.go
index 412e69262953..9b780ace0777 100644
--- a/pkg/kv/kvserver/node_liveness_unit_test.go
+++ b/pkg/kv/kvserver/node_liveness_unit_test.go
@@ -11,6 +11,7 @@
 package kvserver
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -106,7 +107,7 @@ func TestShouldReplaceLiveness(t *testing.T) {
 		},
 	} {
 		t.Run("", func(t *testing.T) {
-			if act := shouldReplaceLiveness(test.old, test.new); act != test.exp {
+			if act := shouldReplaceLiveness(context.Background(), test.old, test.new); act != test.exp {
 				t.Errorf("unexpected update: %+v", test)
 			}
 		})