diff --git a/pkg/cli/error.go b/pkg/cli/error.go
index 30a255eea8a5..b187bc7c52dd 100644
--- a/pkg/cli/error.go
+++ b/pkg/cli/error.go
@@ -20,6 +20,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -359,6 +360,11 @@ func MaybeDecorateGRPCError(
 		return server.ErrClusterInitialized
 	}
 
+	// Are we trying to recommission/decommission a node that does not exist?
+	if strings.Contains(err.Error(), kvserver.ErrMissingLivenessRecord.Error()) {
+		return fmt.Errorf("node does not exist")
+	}
+
 	// Nothing we can special case, just return what we have.
 	return err
 }
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index a3a61b766b5b..330b7a4c229a 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -515,11 +515,10 @@ func (nl *NodeLiveness) SetDrainingInternal(
 
 func (nl *NodeLiveness) SetDecommissioningInternal(
 	ctx context.Context,
-	nodeID roachpb.NodeID,
-	liveness LivenessRecord,
+	oldLivenessRec LivenessRecord,
 	targetStatus kvserverpb.MembershipStatus,
 ) (changeCommitted bool, err error) {
-	return nl.setMembershipStatusInternal(ctx, nodeID, liveness, targetStatus)
+	return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
 }
 
 // GetCircuitBreaker returns the circuit breaker controlling
diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go
index 80a08f2695c1..21835ab5c273 100644
--- a/pkg/kv/kvserver/node_liveness.go
+++ b/pkg/kv/kvserver/node_liveness.go
@@ -40,9 +40,14 @@ import (
 )
 
 var (
-	// ErrNoLivenessRecord is returned when asking for liveness information
+	// ErrMissingLivenessRecord is returned when asking for liveness information
 	// about a node for which nothing is known.
-	ErrNoLivenessRecord = errors.New("node not in the liveness table")
+	ErrMissingLivenessRecord = errors.New("missing liveness record")
+
+	// errLivenessRecordCacheMiss is returned when asking for the liveness
+	// record of a given node and it is not found in the in-memory cache of
+	// records.
+	errLivenessRecordCacheMiss = errors.New("node not in the liveness record cache")
 
 	// errChangeMembershipStatusFailed is returned when we're not able to
 	// conditionally write the target membership status. It's safe to retry
@@ -260,9 +265,17 @@ func (nl *NodeLiveness) SetDraining(
 	ctx = nl.ambientCtx.AnnotateCtx(ctx)
 	for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
 		oldLivenessRec, err := nl.SelfEx()
-		if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
+		if err != nil && !errors.Is(err, errLivenessRecordCacheMiss) {
 			log.Errorf(ctx, "unexpected error getting liveness: %+v", err)
 		}
+		if errors.Is(err, errLivenessRecordCacheMiss) {
+			nodeID := nl.gossip.NodeID.Get()
+			oldLivenessRec, err = nl.getLivenessRecordFromKV(ctx, nodeID)
+			if err != nil {
+				log.Errorf(ctx, "unexpected error getting liveness: %+v", err)
+				continue
+			}
+		}
 		err = nl.setDrainingInternal(ctx, oldLivenessRec, drain, reporter)
 		if err != nil {
 			if log.V(1) {
@@ -332,7 +345,8 @@ func (nl *NodeLiveness) SetMembershipStatus(
 			return false, errors.Wrap(err, "unable to get liveness")
 		}
 		if kv.Value == nil {
-			return false, ErrNoLivenessRecord
+			// We must be trying to decommission a node that does not exist.
+			return false, ErrMissingLivenessRecord
 		}
 		if err := kv.Value.GetProto(&oldLiveness); err != nil {
 			return false, errors.Wrap(err, "invalid liveness record")
 		}
@@ -347,8 +361,8 @@ func (nl *NodeLiveness) SetMembershipStatus(
 		// Offer it to make sure that when we actually try to update the
 		// liveness, the previous view is correct. This, too, is required to
 		// de-flake TestNodeLivenessDecommissionAbsent.
-		nl.maybeUpdate(oldLivenessRec)
-		return nl.setMembershipStatusInternal(ctx, nodeID, oldLivenessRec, targetStatus)
+		nl.maybeUpdate(ctx, oldLivenessRec)
+		return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
 	}
 
 	for {
@@ -379,18 +393,14 @@ func (nl *NodeLiveness) setDrainingInternal(
 		<-sem
 	}()
 
-	// Let's compute what our new liveness record should be.
-	var newLiveness kvserverpb.Liveness
 	if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) {
-		// Liveness record didn't previously exist, so we create one.
-		newLiveness = kvserverpb.Liveness{
-			NodeID: nodeID,
-			Epoch:  1,
-		}
-	} else {
-		newLiveness = oldLivenessRec.Liveness
+		log.Fatal(ctx, "invalid old liveness record; found to be empty")
 	}
 
+	// Let's compute what our new liveness record should be. We start off with a
+	// copy of our existing liveness record.
+	newLiveness := oldLivenessRec.Liveness
+
 	if reporter != nil && drain && !newLiveness.Draining {
 		// Report progress to the Drain RPC.
 		reporter(1, "liveness record")
@@ -404,7 +414,7 @@ func (nl *NodeLiveness) setDrainingInternal(
 		ignoreCache: true,
 	}
 	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
-		nl.maybeUpdate(actual)
+		nl.maybeUpdate(ctx, actual)
 
 		if actual.Draining == update.newLiveness.Draining {
 			return errNodeDrainingSet
@@ -421,7 +431,7 @@ func (nl *NodeLiveness) setDrainingInternal(
 		return err
 	}
 
-	nl.maybeUpdate(written)
+	nl.maybeUpdate(ctx, written)
 	return nil
 }
 
@@ -502,28 +512,16 @@ func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb
 
 func (nl *NodeLiveness) setMembershipStatusInternal(
 	ctx context.Context,
-	nodeID roachpb.NodeID,
 	oldLivenessRec LivenessRecord,
 	targetStatus kvserverpb.MembershipStatus,
 ) (statusChanged bool, err error) {
-	// Let's compute what our new liveness record should be.
-	var newLiveness kvserverpb.Liveness
 	if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) {
-		// Liveness record didn't previously exist, so we create one.
-		//
-		// TODO(irfansharif): The above is now no longer possible. We always
-		// create one (see CreateLivenessRecord, WriteInitialClusterData) when
-		// adding a node to the cluster. We should clean up all this logic that
-		// tries to work around the liveness record possibly not existing.
-		newLiveness = kvserverpb.Liveness{
-			NodeID: nodeID,
-			Epoch:  1,
-		}
-	} else {
-		// We start off with a copy of our existing liveness record.
-		newLiveness = oldLivenessRec.Liveness
+		log.Fatal(ctx, "invalid old liveness record; found to be empty")
 	}
 
+	// Let's compute what our new liveness record should be. We start off with a
+	// copy of our existing liveness record.
+	newLiveness := oldLivenessRec.Liveness
 	newLiveness.Membership = targetStatus
 
 	if oldLivenessRec.Membership == newLiveness.Membership {
 		// No-op. Return early.
@@ -535,11 +533,8 @@ func (nl *NodeLiveness) setMembershipStatusInternal(
 		return false, nil
 	}
 
-	if oldLivenessRec.Liveness != (kvserverpb.Liveness{}) {
-		err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness)
-		if err != nil {
-			return false, err
-		}
+	if err := kvserverpb.ValidateLivenessTransition(oldLivenessRec.Liveness, newLiveness); err != nil {
+		return false, err
 	}
 
 	update := livenessUpdate{
@@ -592,7 +587,10 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
 // liveness. The slice of engines will be written to before each heartbeat to
 // avoid maintaining liveness in the presence of disk stalls.
 func (nl *NodeLiveness) StartHeartbeat(
-	ctx context.Context, stopper *stop.Stopper, engines []storage.Engine, alive HeartbeatCallback,
+	ctx context.Context,
+	stopper *stop.Stopper,
+	engines []storage.Engine,
+	alive HeartbeatCallback,
 ) {
 	log.VEventf(ctx, 1, "starting liveness heartbeat")
 	retryOpts := base.DefaultRetryOptions()
@@ -632,9 +630,17 @@ func (nl *NodeLiveness) StartHeartbeat(
 			// Retry heartbeat in the event the conditional put fails.
 			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
 				oldLiveness, err := nl.Self()
-				if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
+				if err != nil && !errors.Is(err, errLivenessRecordCacheMiss) {
 					log.Errorf(ctx, "unexpected error getting liveness: %+v", err)
 				}
+				if errors.Is(err, errLivenessRecordCacheMiss) {
+					nodeID := nl.gossip.NodeID.Get()
+					oldLivenessRec, err := nl.getLivenessRecordFromKV(ctx, nodeID)
+					if err != nil {
+						return err
+					}
+					oldLiveness = oldLivenessRec.Liveness
+				}
 				if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil {
 					if errors.Is(err, ErrEpochIncremented) {
 						log.Infof(ctx, "%s; retrying", err)
@@ -793,67 +799,13 @@ func (nl *NodeLiveness) heartbeatInternal(
 		}
 	}
 
-	// Let's compute what our new liveness record should be.
-	var newLiveness kvserverpb.Liveness
-	if oldLiveness != (kvserverpb.Liveness{}) {
-		// Start off with our existing view of liveness.
-		newLiveness = oldLiveness
-	} else {
-		// We haven't seen our own liveness record yet[1]. This happens when
-		// we're heartbeating for the very first time. Let's retrieve it from KV
-		// before proceeding.
-		//
-		// [1]: Elsewhere we maintain the invariant that there always exist a
-		// liveness record for every given node. See the join RPC and
-		// WriteInitialClusterData for where that's done.
-		kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID))
-		if err != nil {
-			return errors.Wrap(err, "unable to get liveness")
-		}
-
-		if kv.Value != nil {
-			// This is the happy path. Let's unpack the liveness record we found
-			// within KV, and use that to inform what our new liveness should
-			// be.
-			if err := kv.Value.GetProto(&oldLiveness); err != nil {
-				return errors.Wrap(err, "invalid liveness record")
-			}
-
-			oldLivenessRec := LivenessRecord{
-				Liveness: oldLiveness,
-				raw:      kv.Value.TagAndDataBytes(),
-			}
-
-			// Update our cache with the liveness record we just found.
-			nl.maybeUpdate(oldLivenessRec)
-
-			newLiveness = oldLiveness
-		} else {
-			// This is a "should basically never happen" scenario given our
-			// invariant around always persisting liveness records on node
-			// startup. But that was a change we added in 20.2. Though unlikely,
-			// it's possible to get into the following scenario:
-			//
-			// - v20.1 node gets added to v20.1 cluster, and is quickly removed
-			//   before being able to persist its liveness record.
-			// - The cluster is upgraded to v20.2.
-			// - The node from earlier is rolled into v20.2, and re-added to the
-			//   cluster.
-			// - It's never able to successfully heartbeat (it didn't join
-			//   through the join rpc, bootstrap, or gossip). Welp.
-			//
-			// Given this possibility, we'll just fall back to creating the
-			// liveness record here as we did in v20.1 code.
-			//
-			// TODO(irfansharif): Remove this once v20.2 is cut.
-			log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID)
-			newLiveness = kvserverpb.Liveness{
-				NodeID: nodeID,
-				Epoch:  0, // incremented to epoch=1 below as needed
-			}
-		}
+	if oldLiveness == (kvserverpb.Liveness{}) {
+		log.Fatal(ctx, "invalid old liveness record; found to be empty")
 	}
 
+	// Let's compute what our new liveness record should be. Start off with our
+	// existing view of things.
+	newLiveness := oldLiveness
 	if incrementEpoch {
 		newLiveness.Epoch++
 		newLiveness.Draining = false // clear draining field
@@ -878,7 +830,7 @@ func (nl *NodeLiveness) heartbeatInternal(
 	}
 	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
 		// Update liveness to actual value on mismatch.
-		nl.maybeUpdate(actual)
+		nl.maybeUpdate(ctx, actual)
 
 		// If the actual liveness is different than expected, but is
 		// considered live, treat the heartbeat as a success. This can
@@ -919,12 +871,12 @@ func (nl *NodeLiveness) heartbeatInternal(
 	}
 
 	log.VEventf(ctx, 1, "heartbeat %+v", written.Expiration)
-	nl.maybeUpdate(written)
+	nl.maybeUpdate(ctx, written)
 	nl.metrics.HeartbeatSuccesses.Inc(1)
 	return nil
 }
 
-// Self returns the liveness record for this node. ErrNoLivenessRecord
+// Self returns the liveness record for this node. errLivenessRecordCacheMiss
 // is returned in the event that the node has neither heartbeat its
 // liveness record successfully, nor received a gossip message containing
 // a former liveness update on restart.
@@ -990,10 +942,10 @@ func (nl *NodeLiveness) GetLivenesses() []kvserverpb.Liveness {
 }
 
 // GetLiveness returns the liveness record for the specified nodeID.
-// ErrNoLivenessRecord is returned in the event that nothing is yet known about
-// nodeID via liveness gossip. The record returned also includes the raw,
-// encoded value that the database has for this liveness record in addition to
-// the decoded liveness proto.
+// errLivenessRecordCacheMiss is returned in the event that nothing is yet
+// known about the nodeID via liveness gossip. The record returned also
+// includes the raw, encoded value that the database has for this liveness
+// record in addition to the decoded liveness proto.
func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (LivenessRecord, error) {
 	nl.mu.RLock()
 	defer nl.mu.RUnlock()
@@ -1004,7 +956,34 @@ func (nl *NodeLiveness) getLivenessLocked(nodeID roachpb.NodeID) (LivenessRecord
 	if l, ok := nl.mu.nodes[nodeID]; ok {
 		return l, nil
 	}
-	return LivenessRecord{}, ErrNoLivenessRecord
+	return LivenessRecord{}, errLivenessRecordCacheMiss
+}
+
+// getLivenessRecordFromKV fetches the liveness record from KV for a given
+// node, and updates the internal in-memory cache when doing so.
+func (nl *NodeLiveness) getLivenessRecordFromKV(
+	ctx context.Context, nodeID roachpb.NodeID,
+) (LivenessRecord, error) {
+	kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID))
+	if err != nil {
+		return LivenessRecord{}, errors.Wrap(err, "unable to get liveness")
+	}
+	if kv.Value == nil {
+		log.Fatalf(ctx, "err: %+v", ErrMissingLivenessRecord)
+	}
+	var liveness kvserverpb.Liveness
+	if err := kv.Value.GetProto(&liveness); err != nil {
+		return LivenessRecord{}, errors.Wrap(err, "invalid liveness record")
+	}
+
+	livenessRec := LivenessRecord{
+		Liveness: liveness,
+		raw:      kv.Value.TagAndDataBytes(),
+	}
+
+	// Update our cache with the liveness record we just found.
+	nl.maybeUpdate(ctx, livenessRec)
+	return livenessRec, nil
 }
 
 // IncrementEpoch is called to attempt to revoke another node's
@@ -1054,7 +1033,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb.
 	update.newLiveness.Epoch++
 
 	written, err := nl.updateLiveness(ctx, update, func(actual LivenessRecord) error {
-		nl.maybeUpdate(actual)
+		nl.maybeUpdate(ctx, actual)
 
 		if actual.Epoch > liveness.Epoch {
 			return ErrEpochAlreadyIncremented
@@ -1068,7 +1047,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness kvserverpb.
 	}
 
 	log.Infof(ctx, "incremented n%d liveness epoch to %d", written.NodeID, written.Epoch)
-	nl.maybeUpdate(written)
+	nl.maybeUpdate(ctx, written)
 	nl.metrics.EpochIncrements.Inc(1)
 	return nil
 }
@@ -1106,7 +1085,9 @@ func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
 // This includes the encoded bytes, and it can be used to update the local
 // cache.
 func (nl *NodeLiveness) updateLiveness(
-	ctx context.Context, update livenessUpdate, handleCondFailed func(actual LivenessRecord) error,
+	ctx context.Context,
+	update livenessUpdate,
+	handleCondFailed func(actual LivenessRecord) error,
 ) (LivenessRecord, error) {
 	for {
 		// Before each attempt, ensure that the context has not expired.
@@ -1139,7 +1120,9 @@ func (nl *NodeLiveness) updateLiveness(
 }
 
 func (nl *NodeLiveness) updateLivenessAttempt(
-	ctx context.Context, update livenessUpdate, handleCondFailed func(actual LivenessRecord) error,
+	ctx context.Context,
+	update livenessUpdate,
+	handleCondFailed func(actual LivenessRecord) error,
 ) (LivenessRecord, error) {
 	var oldRaw []byte
 	if update.ignoreCache {
@@ -1154,10 +1137,10 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 		}
 
 		l, err := nl.GetLiveness(update.newLiveness.NodeID)
-		if err != nil && !errors.Is(err, ErrNoLivenessRecord) {
+		if err != nil {
 			return LivenessRecord{}, err
 		}
-		if err == nil && l.Liveness != update.oldLiveness {
+		if l.Liveness != update.oldLiveness {
 			return LivenessRecord{}, handleCondFailed(l)
 		}
 		oldRaw = l.raw
@@ -1215,16 +1198,16 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 
 // maybeUpdate replaces the liveness (if it appears newer) and invokes the
 // registered callbacks if the node became live in the process.
-func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) {
-	// An empty new means that we haven't updated anything.
+func (nl *NodeLiveness) maybeUpdate(ctx context.Context, new LivenessRecord) {
+	// An empty liveness record here is an invariant violation.
 	if new.Liveness == (kvserverpb.Liveness{}) {
-		return
+		log.Fatal(ctx, "invalid new liveness record; found to be empty")
 	}
 
 	nl.mu.Lock()
 	old := nl.mu.nodes[new.NodeID]
-	should := shouldReplaceLiveness(old.Liveness, new.Liveness)
+	should := shouldReplaceLiveness(ctx, old.Liveness, new.Liveness)
 	var callbacks []IsLiveCallback
 	if should {
 		nl.mu.nodes[new.NodeID] = new
@@ -1246,9 +1229,9 @@ func (nl *NodeLiveness) maybeUpdate(new LivenessRecord) {
 
 // shouldReplaceLiveness checks to see if the new liveness is in fact newer
 // than the old liveness.
-func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool {
+func shouldReplaceLiveness(ctx context.Context, old, new kvserverpb.Liveness) bool {
 	if (old == kvserverpb.Liveness{}) {
-		return true
+		log.Fatal(ctx, "invalid old liveness record; found to be empty")
 	}
 
 	// Compare liveness information. If old < new, replace.
@@ -1272,12 +1255,13 @@ func shouldReplaceLiveness(old, new kvserverpb.Liveness) bool {
 // in-memory liveness info up to date.
 func (nl *NodeLiveness) livenessGossipUpdate(_ string, content roachpb.Value) {
 	var liveness kvserverpb.Liveness
+	ctx := context.TODO()
 	if err := content.GetProto(&liveness); err != nil {
-		log.Errorf(context.TODO(), "%v", err)
+		log.Errorf(ctx, "%v", err)
 		return
 	}
 
-	nl.maybeUpdate(LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()})
+	nl.maybeUpdate(ctx, LivenessRecord{Liveness: liveness, raw: content.TagAndDataBytes()})
 }
 
 // numLiveNodes is used to populate a metric that tracks the number of live
@@ -1300,7 +1284,7 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
 	defer nl.mu.RUnlock()
 
 	self, err := nl.getLivenessLocked(selfID)
-	if errors.Is(err, ErrNoLivenessRecord) {
+	if errors.Is(err, errLivenessRecordCacheMiss) {
 		return 0
 	}
 	if err != nil {
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index 4fc9aab9d70f..61f3675e0b23 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -612,7 +612,15 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) {
 
 	// Advance the clock but only heartbeat node 0.
 	mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
-	liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+	var liveness kvserver.LivenessRecord
+	testutils.SucceedsSoon(t, func() error {
+		livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+		if err != nil {
+			return err
+		}
+		liveness = livenessRec
+		return nil
+	})
 
 	testutils.SucceedsSoon(t, func() error {
 		if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil {
@@ -668,7 +676,15 @@ func TestNodeLivenessGetLivenesses(t *testing.T) {
 
 	// Advance the clock but only heartbeat node 0.
 	mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1)
-	liveness, _ := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+	var liveness kvserver.LivenessRecord
+	testutils.SucceedsSoon(t, func() error {
+		livenessRec, err := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+		if err != nil {
+			return err
+		}
+		liveness = livenessRec
+		return nil
+	})
 	if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil {
 		t.Fatal(err)
 	}
@@ -1077,8 +1093,12 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) {
 
 	// Verify success on failed update of a liveness record that already has the
 	// given decommissioning setting.
+	oldLivenessRec, err := callerNodeLiveness.GetLiveness(nodeID)
+	if err != nil {
+		t.Fatal(err)
+	}
 	if _, err := callerNodeLiveness.SetDecommissioningInternal(
-		ctx, nodeID, kvserver.LivenessRecord{}, kvserverpb.MembershipStatus_ACTIVE,
+		ctx, oldLivenessRec, kvserverpb.MembershipStatus_ACTIVE,
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -1135,7 +1155,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) {
 	// When the node simply never existed, expect an error.
 	if _, err := mtc.nodeLivenesses[0].SetMembershipStatus(
 		ctx, goneNodeID, kvserverpb.MembershipStatus_DECOMMISSIONING,
-	); !errors.Is(err, kvserver.ErrNoLivenessRecord) {
+	); !errors.Is(err, kvserver.ErrMissingLivenessRecord) {
 		t.Fatal(err)
 	}
 
diff --git a/pkg/kv/kvserver/node_liveness_unit_test.go b/pkg/kv/kvserver/node_liveness_unit_test.go
index 17fc439995dc..9b780ace0777 100644
--- a/pkg/kv/kvserver/node_liveness_unit_test.go
+++ b/pkg/kv/kvserver/node_liveness_unit_test.go
@@ -11,6 +11,7 @@
 package kvserver
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
@@ -58,8 +59,8 @@ func TestShouldReplaceLiveness(t *testing.T) {
 	}{
 		{
 			// Epoch update only.
-			kvserverpb.Liveness{},
 			l(1, hlc.Timestamp{}, false, "active"),
+			l(2, hlc.Timestamp{}, false, "active"),
 			yes,
 		},
 		{
@@ -106,7 +107,7 @@ func TestShouldReplaceLiveness(t *testing.T) {
 		},
 	} {
 		t.Run("", func(t *testing.T) {
-			if act := shouldReplaceLiveness(context.Background(), test.old, test.new); act != test.exp {
+			if act := shouldReplaceLiveness(context.Background(), test.old, test.new); act != test.exp {
 				t.Errorf("unexpected update: %+v", test)
 			}
 		})
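
A note on why the pkg/cli hunk matches on the error message (strings.Contains) rather
than using errors.Is against kvserver.ErrMissingLivenessRecord: by the time
MaybeDecorateGRPCError sees the error it has crossed a gRPC boundary, and sentinel
identity generally does not survive that round trip. The following standalone sketch
illustrates the distinction; the names and the simulated RPC wrapping are illustrative
only, not part of this patch:

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    var errMissing = errors.New("missing liveness record")

    func main() {
        // Simulate an error crossing an RPC boundary: only the message
        // string survives, so the sentinel's identity is lost.
        remote := fmt.Errorf("rpc error: code = Unknown desc = %s", errMissing.Error())

        fmt.Println(errors.Is(remote, errMissing))                        // false: identity lost
        fmt.Println(strings.Contains(remote.Error(), errMissing.Error())) // true: message survives
    }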
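The errLivenessRecordCacheMiss / getLivenessRecordFromKV split follows a read-through
cache shape: consult the in-memory map first, and on a miss read the source of truth
and refresh the cache on the way out. Here is a simplified sketch of that shape under
the patch's invariant that a record always exists in KV; the types and names below are
stand-ins, not the kvserver API:

    package main

    import (
        "errors"
        "fmt"
    )

    // errCacheMiss plays the role of errLivenessRecordCacheMiss: the record
    // may well exist, we just haven't seen it via gossip or a heartbeat yet.
    var errCacheMiss = errors.New("record not in cache")

    type store struct {
        cache map[int]string // in-memory view, fed by gossip in the real code
        kv    map[int]string // stand-in for the liveness range in KV
    }

    func (s *store) getCached(id int) (string, error) {
        if rec, ok := s.cache[id]; ok {
            return rec, nil
        }
        return "", errCacheMiss
    }

    // getFromKV mirrors getLivenessRecordFromKV: read KV, treat a truly
    // missing record as an invariant violation, and refresh the cache.
    func (s *store) getFromKV(id int) string {
        rec, ok := s.kv[id]
        if !ok {
            // The patch log.Fatal()s here; every node is handed a liveness
            // record at join/bootstrap time, so this "cannot happen".
            panic("missing liveness record")
        }
        s.cache[id] = rec
        return rec
    }

    func main() {
        s := &store{cache: map[int]string{}, kv: map[int]string{1: "n1@epoch1"}}
        rec, err := s.getCached(1)
        if errors.Is(err, errCacheMiss) {
            rec = s.getFromKV(1) // fall back, as SetDraining/StartHeartbeat now do
        }
        fmt.Println(rec)
    }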
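The test changes stop ignoring GetLiveness errors: now that a cache miss is surfaced
as an error, the record may legitimately not have gossiped yet when the test first
asks for it, so the lookup is wrapped in testutils.SucceedsSoon and retried. A rough
stand-in for what that helper provides (the real implementation differs in its backoff
schedule and failure reporting):

    package main

    import (
        "fmt"
        "time"
    )

    // succeedsSoon retries fn with doubling backoff until it returns nil or
    // the timeout elapses; a simplified sketch of testutils.SucceedsSoon.
    func succeedsSoon(timeout time.Duration, fn func() error) error {
        deadline := time.Now().Add(timeout)
        for delay := time.Millisecond; ; delay *= 2 {
            err := fn()
            if err == nil {
                return nil
            }
            if time.Now().After(deadline) {
                return fmt.Errorf("condition never satisfied: %w", err)
            }
            time.Sleep(delay)
        }
    }

    func main() {
        start := time.Now()
        err := succeedsSoon(time.Second, func() error {
            if time.Since(start) < 5*time.Millisecond {
                return fmt.Errorf("not yet gossiped") // transient, like a cache miss
            }
            return nil
        })
        fmt.Println(err) // <nil>
    }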