From 9b2c2be46f00b21e0cc3810ed5aa08d0f82833f5 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 8 Sep 2020 23:29:23 -0400 Subject: [PATCH 1/3] server: always create a liveness record before starting up Previously it used to be the case that it was possible for a node to be up and running, and for there to be no corresponding liveness record for it. This was a very transient situation as liveness records are created for a given node as soon as it out its first heartbeat. Still, given that this could take a few seconds, it lent to a lot of complexity in our handling of node liveness where we had to always anticipate the possibility of there being no corresponding liveness record for a given node (and thus creating it if necessary). Having a liveness record for each node always present is a crucial building block for long running migrations (#48843). There the intention is to have the orchestrator process look towards the list of liveness records for an authoritative view of cluster membership. Previously when it was possible for an active member of the cluster to not have a corresponding liveness record (no matter how unlikely or short-lived in practice), we could not generate such a view. --- This is an alternative implementation for #53805. Here we choose to manually write the liveness record for the bootstrapping node when writing initial cluster data. For all other nodes, we do it on the server-side of the join RPC. We're also careful to do it in the legacy codepath when joining a cluster through gossip. Release note: None --- pkg/cmd/roachtest/decommission.go | 15 --- pkg/kv/kvserver/client_test.go | 9 ++ pkg/kv/kvserver/node_liveness.go | 111 ++++++++++++++---- pkg/kv/kvserver/node_liveness_test.go | 43 +++++++ pkg/kv/kvserver/store_bootstrap.go | 25 +++- pkg/server/node.go | 22 +++- pkg/server/node_test.go | 2 + pkg/server/server.go | 8 +- pkg/server/testserver.go | 9 ++ pkg/testutils/serverutils/test_server_shim.go | 4 + 10 files changed, 197 insertions(+), 51 deletions(-) diff --git a/pkg/cmd/roachtest/decommission.go b/pkg/cmd/roachtest/decommission.go index 3617d0a4669c..31ab6c44060d 100644 --- a/pkg/cmd/roachtest/decommission.go +++ b/pkg/cmd/roachtest/decommission.go @@ -309,21 +309,6 @@ func runDecommissionRandomized(ctx context.Context, t *test, c *cluster) { Multiplier: 2, } - // This is a pretty gross hack to let the bootstrap info (cluster ID, - // liveness records) disseminate through the cluster. Since it's no longer - // happening through gossip, it takes a bit longer to happen. We should do - // two things to improve our story here: - // - // - We should opportunistically write to the liveness table when adding a - // node through the Join RPC. This would also simplify the handling of - // empty liveness records (they would no longer exist). - // - We should add roachtest helpers that wait until each node has received - // cluster ID information, and use it in all the tests that need it (which - // may very well be all the tests). - // - // TODO(irfansharif): Do the above. - time.Sleep(30 * time.Second) - // Partially decommission then recommission a random node, from another // random node. Run a couple of status checks while doing so. { diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 237440087b38..15424fbe9526 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -1009,6 +1009,15 @@ func (m *multiTestContext) addStore(idx int) { }{ ch: make(chan struct{}), } + if idx != 0 { + // Given multiTestContext does not make use of the join RPC, we have to + // manually write out liveness records for each node to maintain the + // invariant that all nodes have liveness records present before they + // start heartbeating. + if err := m.nodeLivenesses[idx].CreateLivenessRecord(ctx, nodeID); err != nil { + m.t.Fatal(err) + } + } m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) { now := clock.Now() if err := store.WriteLastUpTimestamp(ctx, now); err != nil { diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index da0e2bf81b4e..5a1b688db873 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -450,6 +450,56 @@ type livenessUpdate struct { oldRaw []byte } +// CreateLivenessRecord creates a liveness record for the node specified by the +// given node ID. This is typically used when adding a new node to a running +// cluster, or when bootstrapping a cluster through a given node. +// +// This is a pared down version of StartHeartbeat; it exists only to durably +// persist a liveness to record the node's existence. Nodes will heartbeat their +// records after starting up, and incrementing to epoch=1 when doing so, at +// which point we'll set an appropriate expiration timestamp, gossip the +// liveness record, and update our in-memory representation of it. +// +// NB: An existing liveness record is not overwritten by this method, we return +// an error instead. +func (nl *NodeLiveness) CreateLivenessRecord(ctx context.Context, nodeID roachpb.NodeID) error { + // We start off at epoch=0, entrusting the initial heartbeat to increment it + // to epoch=1 to signal the very first time the node is up and running. + liveness := kvserverpb.Liveness{NodeID: nodeID, Epoch: 0} + + // We skip adding an expiration, we only really care about the liveness + // record existing within KV. + + v := new(roachpb.Value) + if err := nl.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + key := keys.NodeLivenessKey(nodeID) + if err := v.SetProto(&liveness); err != nil { + log.Fatalf(ctx, "failed to marshall proto: %s", err) + } + // Given we're looking to create a new liveness record here, we don't + // expect to find anything. + b.CPut(key, v, nil) + + // We don't bother adding a gossip trigger, that'll happen with the + // first heartbeat. We still keep it as a 1PC commit to avoid leaving + // write intents. + b.AddRawRequest(&roachpb.EndTxnRequest{ + Commit: true, + Require1PC: true, + }) + return txn.Run(ctx, b) + }); err != nil { + return err + } + + // We'll learn about this liveness record through gossip eventually, so we + // don't bother updating our in-memory view of node liveness. + + log.Infof(ctx, "created liveness record for n%d", nodeID) + return nil +} + func (nl *NodeLiveness) setMembershipStatusInternal( ctx context.Context, nodeID roachpb.NodeID, @@ -461,16 +511,10 @@ func (nl *NodeLiveness) setMembershipStatusInternal( if oldLivenessRec.Liveness == (kvserverpb.Liveness{}) { // Liveness record didn't previously exist, so we create one. // - // TODO(irfansharif): This code feels a bit unwieldy because it's - // possible for a liveness record to not exist previously. It is just - // generally difficult to write it at startup. When a node joins the - // cluster, this completes before it has had a chance to write its - // liveness record. If it gets decommissioned immediately, there won't - // be one yet. The Connect RPC can solve this though, I think? We can - // bootstrap clusters with a liveness record for n1. Any other node at - // some point has to join the cluster for the first time via the Connect - // RPC, which as part of its job can make sure the liveness record - // exists before responding to the new node. + // TODO(irfansharif): The above is now no longer possible. We always + // create one (see CreateLivenessRecord, WriteInitialClusterData) when + // adding a node to the cluster. We should clean up all this logic that + // tries to work around the liveness record possibly not existing. newLiveness = kvserverpb.Liveness{ NodeID: nodeID, Epoch: 1, @@ -587,11 +631,11 @@ func (nl *NodeLiveness) StartHeartbeat( func(ctx context.Context) error { // Retry heartbeat in the event the conditional put fails. for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); { - liveness, err := nl.Self() + oldLiveness, err := nl.Self() if err != nil && !errors.Is(err, ErrNoLivenessRecord) { log.Errorf(ctx, "unexpected error getting liveness: %+v", err) } - if err := nl.heartbeatInternal(ctx, liveness, incrementEpoch); err != nil { + if err := nl.heartbeatInternal(ctx, oldLiveness, incrementEpoch); err != nil { if errors.Is(err, ErrEpochIncremented) { log.Infof(ctx, "%s; retrying", err) continue @@ -737,7 +781,7 @@ func (nl *NodeLiveness) heartbeatInternal( // If we are not intending to increment the node's liveness epoch, detect // whether this heartbeat is needed anymore. It is possible that we queued - // for long enough on the sempahore such that other heartbeat attempts ahead + // for long enough on the semaphore such that other heartbeat attempts ahead // of us already incremented the expiration past what we wanted. Note that // if we allowed the heartbeat to proceed in this case, we know that it // would hit a ConditionFailedError and return a errNodeAlreadyLive down @@ -749,20 +793,39 @@ func (nl *NodeLiveness) heartbeatInternal( } } - // Let's compute what our new liveness record should be. - var newLiveness kvserverpb.Liveness if oldLiveness == (kvserverpb.Liveness{}) { - // Liveness record didn't previously exist, so we create one. - newLiveness = kvserverpb.Liveness{ - NodeID: nodeID, - Epoch: 1, + // We don't yet know about our own liveness record (which does exist, we + // maintain the invariant that there's always a liveness record for + // every given node). Let's retrieve it from KV before proceeding. + // + // If we didn't previously know about our liveness record, it indicates + // that we're heartbeating for the very first time. + kv, err := nl.db.Get(ctx, keys.NodeLivenessKey(nodeID)) + if err != nil { + return errors.Wrap(err, "unable to get liveness") } - } else { - newLiveness = oldLiveness - if incrementEpoch { - newLiveness.Epoch++ - newLiveness.Draining = false // Clear draining field. + if kv.Value == nil { + return ErrNoLivenessRecord + } + if err := kv.Value.GetProto(&oldLiveness); err != nil { + return errors.Wrap(err, "invalid liveness record") } + + oldLivenessRec := LivenessRecord{ + Liveness: oldLiveness, + raw: kv.Value.TagAndDataBytes(), + } + + // Offer it to make sure that when we actually try to update the + // liveness, the previous view is correct. + nl.maybeUpdate(oldLivenessRec) + } + + // Let's compute what our new liveness record should be. + newLiveness := oldLiveness + if incrementEpoch { + newLiveness.Epoch++ + newLiveness.Draining = false // clear draining field } // Grab a new clock reading to compute the new expiration time, diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index e628cca8e51b..4fc9aab9d70f 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -159,6 +159,49 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { verifyEpochIncremented(t, mtc, 0) } +// TestNodeLivenessAppearsAtStart tests that liveness records are written right +// when nodes are added to the cluster (during bootstrap, and when connecting to +// a bootstrapped node). The test verifies that the liveness records found are +// what we expect them to be. +func TestNodeLivenessAppearsAtStart(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + // At this point StartTestCluster has waited for all nodes to become live. + + // Verify liveness records exist for all nodes. + for i := 0; i < tc.NumServers(); i++ { + nodeID := tc.Server(i).NodeID() + nl := tc.Server(i).NodeLiveness().(*kvserver.NodeLiveness) + + if live, err := nl.IsLive(nodeID); err != nil { + t.Fatal(err) + } else if !live { + t.Fatalf("node %d not live", nodeID) + } + + livenessRec, err := nl.GetLiveness(nodeID) + if err != nil { + t.Fatal(err) + } + if livenessRec.NodeID != nodeID { + t.Fatalf("expected node ID %d, got %d", nodeID, livenessRec.NodeID) + } + // We expect epoch=1 as nodes first create a liveness record at epoch=0, + // and then increment it during their first heartbeat. + if livenessRec.Epoch != 1 { + t.Fatalf("expected epoch=1, got epoch=%d", livenessRec.Epoch) + } + if !livenessRec.Membership.Active() { + t.Fatalf("expected membership=active, got membership=%s", livenessRec.Membership) + } + } +} + func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) { testutils.SucceedsSoon(t, func() error { liveness, err := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get()) diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_bootstrap.go index 3520985f3214..facd19a247aa 100644 --- a/pkg/kv/kvserver/store_bootstrap.go +++ b/pkg/kv/kvserver/store_bootstrap.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -96,16 +97,34 @@ func WriteInitialClusterData( roachpb.KeyValue{Key: keys.BootstrapVersionKey, Value: bootstrapVal}) // Initialize various sequence generators. - var nodeIDVal, storeIDVal, rangeIDVal roachpb.Value - nodeIDVal.SetInt(1) // This node has id 1. + var nodeIDVal, storeIDVal, rangeIDVal, livenessVal roachpb.Value + + const firstNodeID = 1 // This node has id 1. + nodeIDVal.SetInt(firstNodeID) // The caller will initialize the stores with ids 1..numStores. storeIDVal.SetInt(int64(numStores)) // The last range has id = len(splits) + 1 rangeIDVal.SetInt(int64(len(splits) + 1)) + + // We're the the first node in the cluster, let's seed our liveness record. + // It's crucial that we do to maintain the invariant that there's always a + // liveness record for a given node. We'll do something similar through the + // join RPC when adding new nodes to an already bootstrapped cluster [1]. + // + // We start off at epoch=0; when nodes heartbeat their liveness records for + // the first time it'll get incremented to epoch=1 [2]. + // + // [1]: See `CreateLivenessRecord` and usages for where that happens. + // [2]: See `StartHeartbeat` for where that happens. + livenessRecord := kvserverpb.Liveness{NodeID: 1, Epoch: 0} + if err := livenessVal.SetProto(&livenessRecord); err != nil { + return err + } initialValues = append(initialValues, roachpb.KeyValue{Key: keys.NodeIDGenerator, Value: nodeIDVal}, roachpb.KeyValue{Key: keys.StoreIDGenerator, Value: storeIDVal}, - roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal}) + roachpb.KeyValue{Key: keys.RangeIDGenerator, Value: rangeIDVal}, + roachpb.KeyValue{Key: keys.NodeLivenessKey(firstNodeID), Value: livenessVal}) // firstRangeMS is going to accumulate the stats for the first range, as we // write the meta records for all the other ranges. diff --git a/pkg/server/node.go b/pkg/server/node.go index 390ec8efffa8..6add213574fe 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -377,6 +377,12 @@ func (n *Node) start( log.Infof(ctxWithSpan, "new node allocated ID %d", newID) span.Finish() nodeID = newID + + // We're joining via gossip, so we don't have a liveness record for + // ourselves yet. Let's create one while here. + if err := n.storeCfg.NodeLiveness.CreateLivenessRecord(ctx, nodeID); err != nil { + return err + } } // Inform the RPC context of the node ID. @@ -1128,10 +1134,6 @@ func (n *Node) GossipSubscription( // Join implements the roachpb.InternalServer service. This is the // "connectivity" API; individual CRDB servers are passed in a --join list and // the join targets are addressed through this API. -// -// TODO(irfansharif): Perhaps we could opportunistically create a liveness -// record here so as to no longer have to worry about the liveness record not -// existing for a given node. func (n *Node) Join( ctx context.Context, req *roachpb.JoinNodeRequest, ) (*roachpb.JoinNodeResponse, error) { @@ -1153,6 +1155,18 @@ func (n *Node) Join( return nil, err } + // We create a liveness record here for the joining node while here. We do + // so to maintain the invariant that there's always a liveness record for a + // given node. See `WriteInitialClusterData` for the other codepath where we + // manually create a liveness record to maintain this same invariant. + // + // NB: This invariant will be required for when we introduce long running + // migrations. See https://github.com/cockroachdb/cockroach/pull/48843 for + // details. + if err := n.storeCfg.NodeLiveness.CreateLivenessRecord(ctx, nodeID); err != nil { + return nil, err + } + log.Infof(ctx, "allocated IDs: n%d, s%d", nodeID, storeID) return &roachpb.JoinNodeResponse{ diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 3bff2249cf80..5452a2e59447 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -77,6 +77,7 @@ func TestBootstrapCluster(t *testing.T) { for _, kv := range res.KVs { foundKeys = append(foundKeys, kv.Key) } + const firstNodeID = 1 var expectedKeys = keySlice{ testutils.MakeKey(roachpb.Key("\x02"), roachpb.KeyMax), testutils.MakeKey(roachpb.Key("\x03"), roachpb.KeyMax), @@ -84,6 +85,7 @@ func TestBootstrapCluster(t *testing.T) { roachpb.Key("\x04node-idgen"), roachpb.Key("\x04range-idgen"), roachpb.Key("\x04store-idgen"), + keys.NodeLivenessKey(firstNodeID), } for _, splitKey := range config.StaticSplits() { meta2Key := keys.RangeMetaKey(splitKey) diff --git a/pkg/server/server.go b/pkg/server/server.go index 40aac91500f9..1bcabf8bbbc7 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1401,11 +1401,9 @@ func (s *Server) Start(ctx context.Context) error { // one, make sure it's the clusterID we already know (and are guaranteed to // know) at this point. If it's not the same, explode. // - // TODO(tbg): remove this when we have changed ServeAndWait() to join an - // existing cluster via a one-off RPC, at which point we can create gossip - // (and thus the RPC layer) only after the clusterID is already known. We - // can then rely on the RPC layer's protection against cross-cluster - // communication. + // TODO(irfansharif): The above is no longer applicable; in 21.1 we can + // always assume that the RPC layer will always get set up after having + // found out what the cluster ID is. The checks below can be removed then. { // We populated this above, so it should still be set. This is just to // demonstrate that we're not doing anything functional here (and to diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 3d41f08ac934..84c57e2dc0cf 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -342,6 +342,15 @@ func (ts *TestServer) MigrationManager() interface{} { return nil } +// NodeLiveness exposes the NodeLiveness instance used by the TestServer as an +// interface{}. +func (ts *TestServer) NodeLiveness() interface{} { + if ts != nil { + return ts.nodeLiveness + } + return nil +} + // RPCContext returns the rpc context used by the TestServer. func (ts *TestServer) RPCContext() *rpc.Context { if ts != nil { diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index fe5108e9bf35..39edd305316d 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -110,6 +110,10 @@ type TestServerInterface interface { // MigrationManager returns the *jobs.Registry as an interface{}. MigrationManager() interface{} + // NodeLiveness exposes the NodeLiveness instance used by the TestServer as an + // interface{}. + NodeLiveness() interface{} + // SetDistSQLSpanResolver changes the SpanResolver used for DistSQL inside the // server's executor. The argument must be a physicalplan.SpanResolver // instance. From c3ce94bc9ceb61b33088b6c58cea271ecdfbf87d Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 10 Sep 2020 18:12:31 -0400 Subject: [PATCH 2/3] localtestcluster: re-order setting of gossip descriptor The heartbeat loop depends on gossip to retrieve the node ID. When stressing a few tests that make use of LocalTestCluster, I was seeing empty liveness records for empty node IDs being heartbeated. By re-ordering things as such we bring it closer to the Server initialization ordering. Release note: None --- pkg/testutils/localtestcluster/local_test_cluster.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index df1e108a9f10..258139bf321f 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -221,6 +221,13 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto t.Fatalf("unable to start local test cluster: %s", err) } + // The heartbeat loop depends on gossip to retrieve the node ID, so we're + // sure to set it first. + nc.Set(ctx, nodeDesc.NodeID) + if err := ltc.Gossip.SetNodeDescriptor(nodeDesc); err != nil { + t.Fatalf("unable to set node descriptor: %s", err) + } + if !ltc.DisableLivenessHeartbeat { cfg.NodeLiveness.StartHeartbeat(ctx, ltc.stopper, []storage.Engine{ltc.Eng}, nil /* alive */) } @@ -230,10 +237,6 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto } ltc.Stores.AddStore(ltc.Store) - nc.Set(ctx, nodeDesc.NodeID) - if err := ltc.Gossip.SetNodeDescriptor(nodeDesc); err != nil { - t.Fatalf("unable to set node descriptor: %s", err) - } ltc.Cfg = cfg } From 683f7137fb6c3970894d0846017b4e58be0b8fea Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 10 Sep 2020 16:54:34 -0400 Subject: [PATCH 3/3] kvserver: address migration concern with node liveness In #53842 we introduced a change to always persist a liveness record on start up. As part of that change, we refactored how the liveness heartbeat codepath dealt with missing liveness records: it knew to fetch it from KV given we were now maintaining the invariant that it would always be present. Except that wasn't necessarily true, as demonstrated by the following scenario: ``` // - v20.1 node gets added to v20.1 cluster, and is quickly removed // before being able to persist its liveness record. // - The cluster is upgraded to v20.2. // - The node from earlier is rolled into v20.2, and re-added to the // cluster. // - It's never able to successfully heartbeat (it didn't join // through the join rpc, bootstrap, or gossip). Welp. ``` Though admittedly unlikely, we should handle it all the same instead of simply erroring out. We'll just fall back to creating the liveness record in-place as we did in v20.1 code. We can remove this fallback in 21.1 code. Release note: None --- pkg/kv/kvserver/node_liveness.go | 62 +++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/pkg/kv/kvserver/node_liveness.go b/pkg/kv/kvserver/node_liveness.go index 5a1b688db873..06af2f312940 100644 --- a/pkg/kv/kvserver/node_liveness.go +++ b/pkg/kv/kvserver/node_liveness.go @@ -793,7 +793,12 @@ func (nl *NodeLiveness) heartbeatInternal( } } - if oldLiveness == (kvserverpb.Liveness{}) { + // Let's compute what our new liveness record should be. + var newLiveness kvserverpb.Liveness + if oldLiveness != (kvserverpb.Liveness{}) { + // Start off with our existing view of liveness. + newLiveness = oldLiveness + } else { // We don't yet know about our own liveness record (which does exist, we // maintain the invariant that there's always a liveness record for // every given node). Let's retrieve it from KV before proceeding. @@ -804,25 +809,50 @@ func (nl *NodeLiveness) heartbeatInternal( if err != nil { return errors.Wrap(err, "unable to get liveness") } - if kv.Value == nil { - return ErrNoLivenessRecord - } - if err := kv.Value.GetProto(&oldLiveness); err != nil { - return errors.Wrap(err, "invalid liveness record") - } - oldLivenessRec := LivenessRecord{ - Liveness: oldLiveness, - raw: kv.Value.TagAndDataBytes(), - } + if kv.Value != nil { + // This is the happy path. Let's unpack the liveness record we found + // within KV, and use that to inform what our new liveness should + // be. + if err := kv.Value.GetProto(&oldLiveness); err != nil { + return errors.Wrap(err, "invalid liveness record") + } - // Offer it to make sure that when we actually try to update the - // liveness, the previous view is correct. - nl.maybeUpdate(oldLivenessRec) + oldLivenessRec := LivenessRecord{ + Liveness: oldLiveness, + raw: kv.Value.TagAndDataBytes(), + } + + // Update our cache with the liveness record we just found. + nl.maybeUpdate(oldLivenessRec) + + newLiveness = oldLiveness + } else { + // This is a "should basically never happen" scenario given our + // invariant around always persisting liveness records on node + // startup. But that was a change we added in 20.2. Though unlikely, + // it's possible to get into the following scenario: + // + // - v20.1 node gets added to v20.1 cluster, and is quickly removed + // before being able to persist its liveness record. + // - The cluster is upgraded to v20.2. + // - The node from earlier is rolled into v20.2, and re-added to the + // cluster. + // - It's never able to successfully heartbeat (it didn't join + // through the join rpc, bootstrap, or gossip). Welp. + // + // Given this possibility, we'll just fall back to creating the + // liveness record here as we did in v20.1 code. + // + // TODO(irfansharif): Remove this once v20.2 is cut. + log.Warningf(ctx, "missing liveness record for n%d; falling back to creating it in-place", nodeID) + newLiveness = kvserverpb.Liveness{ + NodeID: nodeID, + Epoch: 0, // incremented to epoch=1 below as needed + } + } } - // Let's compute what our new liveness record should be. - newLiveness := oldLiveness if incrementEpoch { newLiveness.Epoch++ newLiveness.Draining = false // clear draining field