diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 7f2efb2f0b9b..c4fbabd959fb 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -4032,7 +4032,7 @@ func TestRangeQuiescence(t *testing.T) {
 	})
 	defer tc.Stopper().Stop(ctx)
 
-	pauseNodeLivenessHeartbeatLoopsTC(tc)
+	pauseNodeLivenessHeartbeatLoops(tc)
 	key := tc.ScratchRange(t)
 	tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index c3403ff87faa..4c3bea015459 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -14,9 +14,11 @@ import (
 	"context"
 	"reflect"
 	"sort"
+	"strconv"
 	"sync/atomic"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
@@ -24,8 +26,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -35,42 +40,35 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func verifyLiveness(t *testing.T, mtc *multiTestContext) {
+func verifyLiveness(t *testing.T, tc *testcluster.TestCluster) {
 	testutils.SucceedsSoon(t, func() error {
-		for i, nl := range mtc.nodeLivenesses {
-			for _, g := range mtc.gossips {
-				live, err := nl.IsLive(g.NodeID.Get())
-				if err != nil {
-					return err
-				} else if !live {
-					return errors.Errorf("node %d not live", g.NodeID.Get())
-				}
-			}
-			if a, e := nl.Metrics().LiveNodes.Value(), int64(len(mtc.nodeLivenesses)); a != e {
-				return errors.Errorf("expected node %d's LiveNodes metric to be %d; got %d",
-					mtc.gossips[i].NodeID.Get(), e, a)
-			}
+		for _, s := range tc.Servers {
+			if err := verifyLivenessServer(s, int64(len(tc.Servers))); err != nil {
+				return err
+			}
 		}
 		return nil
 	})
 }
-
-func pauseNodeLivenessHeartbeatLoops(mtc *multiTestContext) func() {
-	var enableFns []func()
-	for _, nl := range mtc.nodeLivenesses {
-		enableFns = append(enableFns, nl.PauseHeartbeatLoopForTest())
+func verifyLivenessServer(s *server.TestServer, numServers int64) error {
+	nl := s.NodeLiveness().(*liveness.NodeLiveness)
+	live, err := nl.IsLive(s.Gossip().NodeID.Get())
+	if err != nil {
+		return err
+	} else if !live {
+		return errors.Errorf("node %d not live", s.Gossip().NodeID.Get())
 	}
-	return func() {
-		for _, fn := range enableFns {
-			fn()
-		}
+	if a, e := nl.Metrics().LiveNodes.Value(), numServers; a != e {
+		return errors.Errorf("expected node %d's LiveNodes metric to be %d; got %d",
+			s.Gossip().NodeID.Get(), e, a)
 	}
+	return nil
 }
 
-func pauseNodeLivenessHeartbeatLoopsTC(tc *testcluster.TestCluster) func() {
+func pauseNodeLivenessHeartbeatLoops(tc *testcluster.TestCluster) func() {
 	var enableFns []func()
-	for _, server := range tc.Servers {
-		enableFns = append(enableFns, server.NodeLiveness().(*liveness.NodeLiveness).PauseHeartbeatLoopForTest())
+	for _, s := range tc.Servers {
+		enableFns = append(enableFns,
s.NodeLiveness().(*liveness.NodeLiveness).PauseHeartbeatLoopForTest()) } return func() { for _, fn := range enableFns { @@ -78,22 +74,35 @@ func pauseNodeLivenessHeartbeatLoopsTC(tc *testcluster.TestCluster) func() { } } } - func TestNodeLiveness(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) + + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) // Verify liveness of all nodes for all nodes. - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) // Advance clock past the liveness threshold to verify IsLive becomes false. - mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - for idx, nl := range mtc.nodeLivenesses { - nodeID := mtc.gossips[idx].NodeID.Get() + manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLivenessThreshold().Nanoseconds() + 1) + + for _, s := range tc.Servers { + nl := s.NodeLiveness().(*liveness.NodeLiveness) + nodeID := s.Gossip().NodeID.Get() live, err := nl.IsLive(nodeID) if err != nil { t.Error(err) @@ -109,7 +118,8 @@ func TestNodeLiveness(t *testing.T) { }) } // Trigger a manual heartbeat and verify liveness is reestablished. - for _, nl := range mtc.nodeLivenesses { + for _, s := range tc.Servers { + nl := s.NodeLiveness().(*liveness.NodeLiveness) l, ok := nl.Self() assert.True(t, ok) for { @@ -125,10 +135,11 @@ func TestNodeLiveness(t *testing.T) { t.Fatal(err) } } - verifyLiveness(t, mtc) + verifyLiveness(t, tc) // Verify metrics counts. - for i, nl := range mtc.nodeLivenesses { + for i, s := range tc.Servers { + nl := s.NodeLiveness().(*liveness.NodeLiveness) if c := nl.Metrics().HeartbeatSuccesses.Count(); c < 2 { t.Errorf("node %d: expected metrics count >= 2; got %d", (i + 1), c) } @@ -138,33 +149,52 @@ func TestNodeLiveness(t *testing.T) { func TestNodeLivenessInitialIncrement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 1) + + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: "1", + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEngineRegistry, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) // Verify liveness of all nodes for all nodes. 
-	verifyLiveness(t, mtc)
+	verifyLiveness(t, tc)
 
-	liveness, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get())
+	livenessRec, ok := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLiveness(tc.Servers[0].Gossip().NodeID.Get())
 	assert.True(t, ok)
-	if liveness.Epoch != 1 {
-		t.Errorf("expected epoch to be set to 1 initially; got %d", liveness.Epoch)
+	if livenessRec.Epoch != 1 {
+		t.Errorf("expected epoch to be set to 1 initially; got %d", livenessRec.Epoch)
 	}
 
 	// Restart the node and verify the epoch is incremented with initial heartbeat.
-	mtc.stopStore(0)
-	mtc.restartStore(0)
-	verifyEpochIncremented(t, mtc, 0)
+	require.NoError(t, tc.Restart())
+	verifyEpochIncremented(t, tc, 0)
 }
 
-func verifyEpochIncremented(t *testing.T, mtc *multiTestContext, nodeIdx int) {
+func verifyEpochIncremented(t *testing.T, tc *testcluster.TestCluster, nodeIdx int) {
 	testutils.SucceedsSoon(t, func() error {
-		liveness, ok := mtc.nodeLivenesses[nodeIdx].GetLiveness(mtc.gossips[nodeIdx].NodeID.Get())
+		liv, ok := tc.Servers[nodeIdx].NodeLiveness().(*liveness.NodeLiveness).GetLiveness(tc.Servers[nodeIdx].Gossip().NodeID.Get())
 		if !ok {
 			return errors.New("liveness not found")
 		}
-		if liveness.Epoch < 2 {
-			return errors.Errorf("expected epoch to be >=2 on restart but was %d", liveness.Epoch)
+		if liv.Epoch < 2 {
+			return errors.Errorf("expected epoch to be >=2 on restart but was %d", liv.Epoch)
 		}
 		return nil
 	})
@@ -178,18 +208,22 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
 
-	mtc := &multiTestContext{}
-	defer mtc.Stop()
-	mtc.Start(t, 1)
-	nl := mtc.nodeLivenesses[0]
-	nlActive, _ := mtc.storeConfig.NodeLivenessDurations()
+	serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
+	s := serv.(*server.TestServer)
+	defer s.Stopper().Stop(ctx)
+	store, err := s.Stores().GetStore(s.GetFirstStoreID())
+	require.NoError(t, err)
 
-	// Verify liveness of all nodes for all nodes.
-	verifyLiveness(t, mtc)
+	nl := s.NodeLiveness().(*liveness.NodeLiveness)
+	nlActive, _ := store.GetStoreConfig().NodeLivenessDurations()
+
+	testutils.SucceedsSoon(t, func() error {
+		return verifyLivenessServer(s, 1)
+	})
 	nl.PauseHeartbeatLoopForTest()
 	enableSync := nl.PauseSynchronousHeartbeatsForTest()
 
-	liveness, ok := nl.Self()
+	nlSelf, ok := nl.Self()
 	assert.True(t, ok)
 	hbBefore := nl.Metrics().HeartbeatSuccesses.Count()
 	require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())
@@ -201,8 +235,8 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
 	const herdSize = 30
 	for i := 0; i < herdSize; i++ {
 		g.Go(func() error {
-			before := mtc.clock().Now()
-			if err := nl.Heartbeat(ctx, liveness); err != nil {
+			before := s.Clock().Now()
+			if err := nl.Heartbeat(ctx, nlSelf); err != nil {
 				return err
 			}
 			livenessAfter, found := nl.Self()
@@ -236,9 +270,9 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
 	require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value())
 
 	// Send one more heartbeat. Should update liveness record.
- liveness, ok = nl.Self() + nlSelf, ok = nl.Self() require.True(t, ok) - require.NoError(t, nl.Heartbeat(ctx, liveness)) + require.NoError(t, nl.Heartbeat(ctx, nlSelf)) require.Equal(t, hbBefore+2, nl.Metrics().HeartbeatSuccesses.Count()) require.Equal(t, int64(0), nl.Metrics().HeartbeatsInFlight.Value()) } @@ -248,27 +282,40 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) { func TestNodeIsLiveCallback(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) + + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) // Verify liveness of all nodes for all nodes. - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) var cbMu syncutil.Mutex cbs := map[roachpb.NodeID]struct{}{} - mtc.nodeLivenesses[0].RegisterCallback(func(l livenesspb.Liveness) { + tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).RegisterCallback(func(l livenesspb.Liveness) { cbMu.Lock() defer cbMu.Unlock() cbs[l.NodeID] = struct{}{} }) // Advance clock past the liveness threshold. - mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) + manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLivenessThreshold().Nanoseconds() + 1) // Trigger a manual heartbeat and verify callbacks for each node ID are invoked. - for _, nl := range mtc.nodeLivenesses { + for _, s := range tc.Servers { + nl := s.NodeLiveness().(*liveness.NodeLiveness) l, ok := nl.Self() assert.True(t, ok) if err := nl.Heartbeat(context.Background(), l); err != nil { @@ -279,8 +326,8 @@ func TestNodeIsLiveCallback(t *testing.T) { testutils.SucceedsSoon(t, func() error { cbMu.Lock() defer cbMu.Unlock() - for _, g := range mtc.gossips { - nodeID := g.NodeID.Get() + for _, s := range tc.Servers { + nodeID := s.Gossip().NodeID.Get() if _, ok := cbs[nodeID]; !ok { return errors.Errorf("expected IsLive callback for node %d", nodeID) } @@ -294,23 +341,36 @@ func TestNodeIsLiveCallback(t *testing.T) { func TestNodeHeartbeatCallback(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) + + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + expected := manualClock.UnixNano() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) // Verify liveness of all nodes for all nodes. - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) // Verify that last update time has been set for all nodes. 
verifyUptimes := func() error { - expected := mtc.clock().Now() - for i, s := range mtc.stores { + for i := range tc.Servers { + s := tc.GetFirstStoreFromServer(t, i) uptm, err := s.ReadLastUpTimestamp(context.Background()) if err != nil { return errors.Wrapf(err, "error reading last up time from store %d", i) } - if a, e := uptm.WallTime, expected.WallTime; a != e { + if a, e := uptm.WallTime, expected; a < e { return errors.Errorf("store %d last uptime = %d; wanted %d", i, a, e) } } @@ -324,8 +384,10 @@ func TestNodeHeartbeatCallback(t *testing.T) { // Advance clock past the liveness threshold and force a manual heartbeat on // all node liveness objects, which should update the last up time for each // store. - mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - for _, nl := range mtc.nodeLivenesses { + manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLivenessThreshold().Nanoseconds() + 1) + expected = manualClock.UnixNano() + for _, s := range tc.Servers { + nl := s.NodeLiveness().(*liveness.NodeLiveness) l, ok := nl.Self() assert.True(t, ok) if err := nl.Heartbeat(context.Background(), l); err != nil { @@ -349,32 +411,43 @@ func TestNodeLivenessEpochIncrement(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 2) + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + // Verify liveness of all nodes for all nodes. + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) // First try to increment the epoch of a known-live node. - deadNodeID := mtc.gossips[1].NodeID.Get() - oldLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID) + deadNodeID := tc.Servers[1].Gossip().NodeID.Get() + oldLiveness, ok := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLiveness(deadNodeID) assert.True(t, ok) - if err := mtc.nodeLivenesses[0].IncrementEpoch( + if err := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch( ctx, oldLiveness.Liveness, ); !testutils.IsError(err, "cannot increment epoch on live node") { t.Fatalf("expected error incrementing a live node: %+v", err) } // Advance clock past liveness threshold & increment epoch. - mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - if err := mtc.nodeLivenesses[0].IncrementEpoch(ctx, oldLiveness.Liveness); err != nil { + manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLivenessThreshold().Nanoseconds() + 1) + if err := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(ctx, oldLiveness.Liveness); err != nil { t.Fatalf("unexpected error incrementing a non-live node: %+v", err) } // Verify that the epoch has been advanced. 
 	testutils.SucceedsSoon(t, func() error {
-		newLiveness, ok := mtc.nodeLivenesses[0].GetLiveness(deadNodeID)
+		newLiveness, ok := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLiveness(deadNodeID)
 		if !ok {
 			return errors.New("liveness not found")
 		}
@@ -384,19 +457,19 @@ func TestNodeLivenessEpochIncrement(t *testing.T) {
 		if newLiveness.Expiration != oldLiveness.Expiration {
 			return errors.Errorf("expected expiration to remain unchanged")
 		}
-		if live, err := mtc.nodeLivenesses[0].IsLive(deadNodeID); live || err != nil {
+		if live, err := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).IsLive(deadNodeID); live || err != nil {
 			return errors.Errorf("expected dead node to remain dead after epoch increment %t: %v", live, err)
 		}
 		return nil
 	})
 
 	// Verify epoch increment metric count.
-	if c := mtc.nodeLivenesses[0].Metrics().EpochIncrements.Count(); c != 1 {
+	if c := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).Metrics().EpochIncrements.Count(); c != 1 {
 		t.Errorf("expected epoch increment == 1; got %d", c)
 	}
 
 	// Verify error on incrementing an already-incremented epoch.
-	if err := mtc.nodeLivenesses[0].IncrementEpoch(
+	if err := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(
 		ctx, oldLiveness.Liveness,
 	); !errors.Is(err, liveness.ErrEpochAlreadyIncremented) {
 		t.Fatalf("unexpected error incrementing a non-live node: %+v", err)
@@ -404,7 +477,7 @@ func TestNodeLivenessEpochIncrement(t *testing.T) {
 
 	// Verify error incrementing with a too-high expectation for liveness epoch.
 	oldLiveness.Epoch = 3
-	if err := mtc.nodeLivenesses[0].IncrementEpoch(
+	if err := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).IncrementEpoch(
 		ctx, oldLiveness.Liveness,
 	); !testutils.IsError(err, "unexpected liveness epoch 2; expected >= 3") {
 		t.Fatalf("expected error incrementing with a too-high expected epoch: %+v", err)
@@ -416,22 +489,48 @@ func TestNodeLivenessEpochIncrement(t *testing.T) {
 func TestNodeLivenessRestart(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	mtc := &multiTestContext{}
-	defer mtc.Stop()
-	mtc.Start(t, 2)
+
+	stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
+	defer stickyEngineRegistry.CloseAllStickyInMemEngines()
+
+	const numServers int = 2
+	stickyServerArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < numServers; i++ {
+		stickyServerArgs[i] = base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{
+				{
+					InMemory:               true,
+					StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
+				},
+			},
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					StickyEngineRegistry: stickyEngineRegistry,
+				},
+			},
+		}
+	}
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, numServers,
+		base.TestClusterArgs{
+			ReplicationMode:   base.ReplicationManual,
+			ServerArgsPerNode: stickyServerArgs,
+		})
+	defer tc.Stopper().Stop(ctx)
 
-	// After verifying node is in liveness table, stop store.
-	verifyLiveness(t, mtc)
-	mtc.stopStore(0)
+	// After verifying the nodes are in the liveness table, stop the second server.
+	verifyLiveness(t, tc)
+	tc.StopServer(1)
 
 	// Clear the liveness records in store 1's gossip to make sure we're
 	// seeing the liveness record properly gossiped at store startup.
var expKeys []string - for _, g := range mtc.gossips { - nodeID := g.NodeID.Get() + for _, s := range tc.Servers { + nodeID := s.Gossip().NodeID.Get() key := gossip.MakeNodeLivenessKey(nodeID) expKeys = append(expKeys, key) - if err := g.AddInfoProto(key, &livenesspb.Liveness{NodeID: nodeID}, 0); err != nil { + if err := s.Gossip().AddInfoProto(key, &livenesspb.Liveness{NodeID: nodeID}, 0); err != nil { t.Fatal(err) } } @@ -443,20 +542,21 @@ func TestNodeLivenessRestart(t *testing.T) { syncutil.Mutex keys []string } - livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) - mtc.gossips[0].RegisterCallback(livenessRegex, func(key string, _ roachpb.Value) { - keysMu.Lock() - defer keysMu.Unlock() - for _, k := range keysMu.keys { - if k == key { - return - } - } - keysMu.keys = append(keysMu.keys, key) - }) // Restart store and verify gossip contains liveness record for nodes 1&2. - mtc.restartStore(0) + require.NoError(t, tc.RestartServerWithInspect(1, func(s *server.TestServer) { + livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix) + s.Gossip().RegisterCallback(livenessRegex, func(key string, _ roachpb.Value) { + keysMu.Lock() + defer keysMu.Unlock() + for _, k := range keysMu.keys { + if k == key { + return + } + } + keysMu.keys = append(keysMu.keys, key) + }) + })) testutils.SucceedsSoon(t, func() error { keysMu.Lock() defer keysMu.Unlock() @@ -479,26 +579,27 @@ func TestNodeLivenessRestart(t *testing.T) { func TestNodeLivenessSelf(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 1) - g := mtc.gossips[0] - - pauseNodeLivenessHeartbeatLoops(mtc) + ctx := context.Background() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + g := s.Gossip() + nl := s.NodeLiveness().(*liveness.NodeLiveness) + nl.PauseHeartbeatLoopForTest() // Verify liveness is properly initialized. This needs to be wrapped in a // SucceedsSoon because node liveness gets initialized via an async gossip // callback. - var liveness liveness.Record + var livenessRecord liveness.Record testutils.SucceedsSoon(t, func() error { - l, ok := mtc.nodeLivenesses[0].GetLiveness(g.NodeID.Get()) + l, ok := nl.GetLiveness(g.NodeID.Get()) if !ok { return errors.New("liveness not found") } - liveness = l + livenessRecord = l return nil }) - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { + if err := nl.Heartbeat(context.Background(), livenessRecord.Liveness); err != nil { t.Fatal(err) } @@ -510,7 +611,7 @@ func TestNodeLivenessSelf(t *testing.T) { atomic.AddInt32(&count, 1) }) testutils.SucceedsSoon(t, func() error { - fakeBehindLiveness := liveness + fakeBehindLiveness := livenessRecord fakeBehindLiveness.Epoch-- // almost certainly results in zero if err := g.AddInfoProto(key, &fakeBehindLiveness, 0); err != nil { @@ -523,11 +624,10 @@ func TestNodeLivenessSelf(t *testing.T) { }) // Self should not see the fake liveness, but have kept the real one. 
- l := mtc.nodeLivenesses[0] - lGetRec, ok := l.GetLiveness(g.NodeID.Get()) + lGetRec, ok := nl.GetLiveness(g.NodeID.Get()) require.True(t, ok) lGet := lGetRec.Liveness - lSelf, ok := l.Self() + lSelf, ok := nl.Self() assert.True(t, ok) if !reflect.DeepEqual(lGet, lSelf) { t.Errorf("expected GetLiveness() to return same value as Self(): %+v != %+v", lGet, lSelf) @@ -540,16 +640,29 @@ func TestNodeLivenessSelf(t *testing.T) { func TestNodeLivenessGetIsLiveMap(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) - - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) - lMap := mtc.nodeLivenesses[0].GetIsLiveMap() - l1, _ := mtc.nodeLivenesses[0].GetLiveness(1) - l2, _ := mtc.nodeLivenesses[0].GetLiveness(2) - l3, _ := mtc.nodeLivenesses[0].GetLiveness(3) + + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) + nl := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness) + lMap := nl.GetIsLiveMap() + l1, _ := nl.GetLiveness(1) + l2, _ := nl.GetLiveness(2) + l3, _ := nl.GetLiveness(3) expectedLMap := liveness.IsLiveMap{ 1: {Liveness: l1.Liveness, IsLive: true}, 2: {Liveness: l2.Liveness, IsLive: true}, @@ -560,10 +673,10 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { } // Advance the clock but only heartbeat node 0. - mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) + manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) var livenessRec liveness.Record testutils.SucceedsSoon(t, func() error { - lr, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + lr, ok := nl.GetLiveness(tc.Servers[0].Gossip().NodeID.Get()) if !ok { return errors.New("liveness not found") } @@ -572,7 +685,7 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { }) testutils.SucceedsSoon(t, func() error { - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), livenessRec.Liveness); err != nil { + if err := nl.Heartbeat(context.Background(), livenessRec.Liveness); err != nil { if errors.Is(err, liveness.ErrEpochIncremented) { return err } @@ -582,10 +695,10 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { }) // Now verify only node 0 is live. 
- lMap = mtc.nodeLivenesses[0].GetIsLiveMap() - l1, _ = mtc.nodeLivenesses[0].GetLiveness(1) - l2, _ = mtc.nodeLivenesses[0].GetLiveness(2) - l3, _ = mtc.nodeLivenesses[0].GetLiveness(3) + lMap = nl.GetIsLiveMap() + l1, _ = nl.GetLiveness(1) + l2, _ = nl.GetLiveness(2) + l3, _ = nl.GetLiveness(3) expectedLMap = liveness.IsLiveMap{ 1: {Liveness: l1.Liveness, IsLive: true}, 2: {Liveness: l2.Liveness, IsLive: false}, @@ -599,21 +712,33 @@ func TestNodeLivenessGetIsLiveMap(t *testing.T) { func TestNodeLivenessGetLivenesses(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + testStartTime := manualClock.UnixNano() + tc := testcluster.StartTestCluster(t, 3, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) - livenesses := mtc.nodeLivenesses[0].GetLivenesses() + nl := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness) actualLMapNodes := make(map[roachpb.NodeID]struct{}) - originalExpiration := mtc.clock().PhysicalNow() + mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() - for _, l := range livenesses { + originalExpiration := testStartTime + nl.GetLivenessThreshold().Nanoseconds() + for _, l := range nl.GetLivenesses() { if a, e := l.Epoch, int64(1); a != e { t.Errorf("liveness record had epoch %d, wanted %d", a, e) } - if a, e := l.Expiration.WallTime, originalExpiration; a != e { + if a, e := l.Expiration.WallTime, originalExpiration; a < e { t.Errorf("liveness record had expiration %d, wanted %d", a, e) } actualLMapNodes[l.NodeID] = struct{}{} @@ -624,32 +749,31 @@ func TestNodeLivenessGetLivenesses(t *testing.T) { } // Advance the clock but only heartbeat node 0. - mtc.manualClock.Increment(mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1) - var liveness liveness.Record + manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) + var livenessRecord liveness.Record testutils.SucceedsSoon(t, func() error { - livenessRec, ok := mtc.nodeLivenesses[0].GetLiveness(mtc.gossips[0].NodeID.Get()) + livenessRec, ok := nl.GetLiveness(tc.Servers[0].Gossip().NodeID.Get()) if !ok { return errors.New("liveness not found") } - liveness = livenessRec + livenessRecord = livenessRec return nil }) - if err := mtc.nodeLivenesses[0].Heartbeat(context.Background(), liveness.Liveness); err != nil { + if err := nl.Heartbeat(context.Background(), livenessRecord.Liveness); err != nil { t.Fatal(err) } // Verify that node liveness receives the change. 
- livenesses = mtc.nodeLivenesses[0].GetLivenesses() actualLMapNodes = make(map[roachpb.NodeID]struct{}) - for _, l := range livenesses { + for _, l := range nl.GetLivenesses() { if a, e := l.Epoch, int64(1); a != e { t.Errorf("liveness record had epoch %d, wanted %d", a, e) } expectedExpiration := originalExpiration if l.NodeID == 1 { - expectedExpiration += mtc.nodeLivenesses[0].GetLivenessThreshold().Nanoseconds() + 1 + expectedExpiration += nl.GetLivenessThreshold().Nanoseconds() + 1 } - if a, e := l.Expiration.WallTime, expectedExpiration; a != e { + if a, e := l.Expiration.WallTime, expectedExpiration; a < e { t.Errorf("liveness record had expiration %d, wanted %d", a, e) } actualLMapNodes[l.NodeID] = struct{}{} @@ -664,18 +788,29 @@ func TestNodeLivenessGetLivenesses(t *testing.T) { func TestNodeLivenessConcurrentHeartbeats(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 1) - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + + testutils.SucceedsSoon(t, func() error { + return verifyLivenessServer(s, 1) + }) + nl := s.NodeLiveness().(*liveness.NodeLiveness) + nl.PauseHeartbeatLoopForTest() const concurrency = 10 // Advance clock past the liveness threshold & concurrently heartbeat node. - nl := mtc.nodeLivenesses[0] - mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) + manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1) l, ok := nl.Self() assert.True(t, ok) errCh := make(chan error, concurrency) @@ -696,19 +831,31 @@ func TestNodeLivenessConcurrentHeartbeats(t *testing.T) { func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 2) - verifyLiveness(t, mtc) - pauseNodeLivenessHeartbeatLoops(mtc) + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + tc := testcluster.StartTestCluster(t, 2, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + verifyLiveness(t, tc) + pauseNodeLivenessHeartbeatLoops(tc) const concurrency = 10 // Advance the clock and this time increment epoch concurrently for node 1. 
-	nl := mtc.nodeLivenesses[0]
-	mtc.manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1)
-	l, ok := nl.GetLiveness(mtc.gossips[1].NodeID.Get())
+	nl := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness)
+	manualClock.Increment(nl.GetLivenessThreshold().Nanoseconds() + 1)
+	l, ok := nl.GetLiveness(tc.Servers[1].Gossip().NodeID.Get())
 	assert.True(t, ok)
 	errCh := make(chan error, concurrency)
 	for i := 0; i < concurrency; i++ {
@@ -729,16 +876,41 @@ func TestNodeLivenessConcurrentIncrementEpochs(t *testing.T) {
 func TestNodeLivenessSetDraining(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	mtc := &multiTestContext{}
-	defer mtc.Stop()
-	mtc.Start(t, 3)
-	mtc.initGossipNetwork()
 
-	verifyLiveness(t, mtc)
+	stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
+	defer stickyEngineRegistry.CloseAllStickyInMemEngines()
+
+	const numServers int = 3
+	stickyServerArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < numServers; i++ {
+		stickyServerArgs[i] = base.TestServerArgs{
+			StoreSpecs: []base.StoreSpec{
+				{
+					InMemory:               true,
+					StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
+				},
+			},
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					StickyEngineRegistry: stickyEngineRegistry,
+				},
+			},
+		}
+	}
 
 	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, numServers,
+		base.TestClusterArgs{
+			ReplicationMode:   base.ReplicationManual,
+			ServerArgsPerNode: stickyServerArgs,
+		})
+	defer tc.Stopper().Stop(ctx)
+
+	// Verify liveness of all nodes for all nodes.
+	verifyLiveness(t, tc)
+
 	drainingNodeIdx := 0
-	drainingNodeID := mtc.gossips[drainingNodeIdx].NodeID.Get()
+	drainingNodeID := tc.Servers[drainingNodeIdx].Gossip().NodeID.Get()
 
 	nodeIDAppearsInStoreList := func(id roachpb.NodeID, sl kvserver.StoreList) bool {
 		for _, store := range sl.Stores() {
@@ -751,7 +923,7 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 
 	// Verify success on failed update of a liveness record that already has the
 	// given draining setting.
-	if err := mtc.nodeLivenesses[drainingNodeIdx].TestingSetDrainingInternal(
+	if err := tc.Servers[drainingNodeIdx].NodeLiveness().(*liveness.NodeLiveness).TestingSetDrainingInternal(
 		ctx, liveness.Record{Liveness: livenesspb.Liveness{
 			NodeID: drainingNodeID,
 		}}, false,
@@ -759,7 +931,7 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if err := mtc.nodeLivenesses[drainingNodeIdx].SetDraining(ctx, true /* drain */, nil /* reporter */); err != nil {
+	if err := tc.Servers[drainingNodeIdx].NodeLiveness().(*liveness.NodeLiveness).SetDraining(ctx, true /* drain */, nil /* reporter */); err != nil {
 		t.Fatal(err)
 	}
 
@@ -769,9 +941,9 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 		// Executed in a retry loop to wait until the new liveness record has
 		// been gossiped to the rest of the cluster.
 		testutils.SucceedsSoon(t, func() error {
-			for i, sp := range mtc.storePools {
-				curNodeID := mtc.gossips[i].NodeID.Get()
-				sl, alive, _ := sp.GetStoreList()
+			for i, s := range tc.Servers {
+				curNodeID := s.Gossip().NodeID.Get()
+				sl, alive, _ := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.GetStoreList()
 				if alive != expectedLive {
 					return errors.Errorf(
 						"expected %d live stores but got %d from node %d",
@@ -794,8 +966,8 @@ func TestNodeLivenessSetDraining(t *testing.T) {
 
 	// Stop and restart the store to verify that a restarted server clears the
 	// draining field on the liveness record.
- mtc.stopStore(drainingNodeIdx) - mtc.restartStore(drainingNodeIdx) + tc.StopServer(drainingNodeIdx) + require.NoError(t, tc.RestartServer(drainingNodeIdx)) // Restarted node appears once again in the store list. { @@ -803,9 +975,9 @@ func TestNodeLivenessSetDraining(t *testing.T) { // Executed in a retry loop to wait until the new liveness record has // been gossiped to the rest of the cluster. testutils.SucceedsSoon(t, func() error { - for i, sp := range mtc.storePools { - curNodeID := mtc.gossips[i].NodeID.Get() - sl, alive, _ := sp.GetStoreList() + for i, s := range tc.Servers { + curNodeID := s.Gossip().NodeID.Get() + sl, alive, _ := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.GetStoreList() if alive != expectedLive { return errors.Errorf( "expected %d live stores but got %d from node %d", @@ -836,8 +1008,7 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) { var injectedErrorCount int32 injectError.Store(true) - storeCfg := kvserver.TestStoreConfig(nil) - storeCfg.TestingKnobs.EvalKnobs.TestingEvalFilter = func(args kvserverbase.FilterArgs) *roachpb.Error { + testingEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { if _, ok := args.Req.(*roachpb.ConditionalPutRequest); !ok { return nil } @@ -848,16 +1019,28 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) { } return nil } - mtc := &multiTestContext{ - storeConfig: &storeCfg, - } - mtc.Start(t, 1) - defer mtc.Stop() + ctx := context.Background() + manualClock := hlc.NewHybridManualClock() + serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ClockSource: manualClock.UnixNano, + }, + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingEvalFilter: testingEvalFilter, + }, + }, + }, + }) + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) - // Verify retry of the ambiguous result for heartbeat loop. 
- verifyLiveness(t, mtc) + testutils.SucceedsSoon(t, func() error { + return verifyLivenessServer(s, 1) + }) + nl := s.NodeLiveness().(*liveness.NodeLiveness) - nl := mtc.nodeLivenesses[0] l, ok := nl.Self() assert.True(t, ok) @@ -871,16 +1054,15 @@ func TestNodeLivenessRetryAmbiguousResultError(t *testing.T) { } } -func verifyNodeIsDecommissioning(t *testing.T, mtc *multiTestContext, nodeID roachpb.NodeID) { +func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, nodeID roachpb.NodeID) { testutils.SucceedsSoon(t, func() error { - for _, nl := range mtc.nodeLivenesses { - livenesses := nl.GetLivenesses() - for _, liveness := range livenesses { - if liveness.NodeID != nodeID { + for _, s := range tc.Servers { + for _, liv := range s.NodeLiveness().(*liveness.NodeLiveness).GetLivenesses() { + if liv.NodeID != nodeID { continue } - if !liveness.Membership.Decommissioning() { - return errors.Errorf("unexpected Membership value of %v for node %v", liveness.Membership, liveness.NodeID) + if !liv.Membership.Decommissioning() { + return errors.Errorf("unexpected Membership value of %v for node %v", liv.Membership, liv.NodeID) } } } @@ -889,16 +1071,40 @@ func verifyNodeIsDecommissioning(t *testing.T, mtc *multiTestContext, nodeID roa } func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { - mtc := &multiTestContext{} - defer mtc.Stop() - mtc.Start(t, 3) - mtc.initGossipNetwork() - - verifyLiveness(t, mtc) + stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() + defer stickyEngineRegistry.CloseAllStickyInMemEngines() + + const numServers int = 3 + stickyServerArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numServers; i++ { + stickyServerArgs[i] = base.TestServerArgs{ + StoreSpecs: []base.StoreSpec{ + { + InMemory: true, + StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + }, + }, + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + StickyEngineRegistry: stickyEngineRegistry, + }, + }, + } + } ctx := context.Background() - callerNodeLiveness := mtc.nodeLivenesses[0] - nodeID := mtc.gossips[decommissionNodeIdx].NodeID.Get() + tc := testcluster.StartTestCluster(t, numServers, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgsPerNode: stickyServerArgs, + }) + defer tc.Stopper().Stop(ctx) + + // Verify liveness of all nodes for all nodes. + verifyLiveness(t, tc) + + callerNodeLiveness := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness) + nodeID := tc.Servers[decommissionNodeIdx].Gossip().NodeID.Get() // Verify success on failed update of a liveness record that already has the // given decommissioning setting. @@ -915,18 +1121,18 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { ctx, nodeID, livenesspb.MembershipStatus_DECOMMISSIONING); err != nil { t.Fatal(err) } - verifyNodeIsDecommissioning(t, mtc, nodeID) + verifyNodeIsDecommissioning(t, tc, nodeID) // Stop and restart the store to verify that a restarted server retains the // decommissioning field on the liveness record. - mtc.stopStore(decommissionNodeIdx) - mtc.restartStore(decommissionNodeIdx) + tc.StopServer(decommissionNodeIdx) + require.NoError(t, tc.RestartServer(decommissionNodeIdx)) // Wait until store has restarted and published a new heartbeat to ensure not // looking at pre-restart state. Want to be sure test fails if node wiped the // decommission flag. 
-	verifyEpochIncremented(t, mtc, decommissionNodeIdx)
-	verifyNodeIsDecommissioning(t, mtc, nodeID)
+	verifyEpochIncremented(t, tc, decommissionNodeIdx)
+	verifyNodeIsDecommissioning(t, tc, nodeID)
 }
 
 // TestNodeLivenessSetDecommissioning verifies that when decommissioning, a
@@ -949,66 +1155,56 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	mtc := &multiTestContext{}
-	defer mtc.Stop()
-	mtc.Start(t, 3)
-	mtc.initGossipNetwork()
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 3,
+		base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+		})
+	defer tc.Stopper().Stop(ctx)
 
-	verifyLiveness(t, mtc)
+	// Verify liveness of all nodes for all nodes.
+	verifyLiveness(t, tc)
 
-	ctx := context.Background()
 	const goneNodeID = roachpb.NodeID(10000)
+	nl := tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness)
+	nl1 := tc.Servers[1].NodeLiveness().(*liveness.NodeLiveness)
+	nl2 := tc.Servers[2].NodeLiveness().(*liveness.NodeLiveness)
 
 	// When the node simply never existed, expect an error.
-	if _, err := mtc.nodeLivenesses[0].SetMembershipStatus(
+	if _, err := nl.SetMembershipStatus(
 		ctx, goneNodeID, livenesspb.MembershipStatus_DECOMMISSIONING,
 	); !errors.Is(err, liveness.ErrMissingRecord) {
 		t.Fatal(err)
 	}
 
 	// Pretend the node was once there but isn't gossiped anywhere.
-	if err := mtc.dbs[0].CPut(ctx, keys.NodeLivenessKey(goneNodeID), &livenesspb.Liveness{
+	if err := tc.Servers[0].DB().CPut(ctx, keys.NodeLivenessKey(goneNodeID), &livenesspb.Liveness{
 		NodeID:     goneNodeID,
 		Epoch:      1,
-		Expiration: mtc.clock().Now().ToLegacyTimestamp(),
+		Expiration: tc.Servers[0].Clock().Now().ToLegacyTimestamp(),
 		Membership: livenesspb.MembershipStatus_ACTIVE,
 	}, nil); err != nil {
 		t.Fatal(err)
 	}
 
-	// Decommission from second node.
-	if committed, err := mtc.nodeLivenesses[1].SetMembershipStatus(
-		ctx, goneNodeID, livenesspb.MembershipStatus_DECOMMISSIONING); err != nil {
-		t.Fatal(err)
-	} else if !committed {
-		t.Fatal("no change committed")
+	setMembershipStatus := func(nodeLiveness *liveness.NodeLiveness,
		status livenesspb.MembershipStatus, shouldCommit bool) {
+		if committed, err := nodeLiveness.SetMembershipStatus(
+			ctx, goneNodeID, status); err != nil {
+			t.Fatal(err)
+		} else {
+			require.Equal(t, shouldCommit, committed)
+		}
 	}
+
+	// Decommission from second node.
+	setMembershipStatus(nl1, livenesspb.MembershipStatus_DECOMMISSIONING, true)
 	// Re-decommission from first node.
-	if committed, err := mtc.nodeLivenesses[0].SetMembershipStatus(
-		ctx, goneNodeID, livenesspb.MembershipStatus_DECOMMISSIONING); err != nil {
-		t.Fatal(err)
-	} else if committed {
-		t.Fatal("spurious change committed")
-	}
+	setMembershipStatus(nl, livenesspb.MembershipStatus_DECOMMISSIONING, false)
 	// Recommission from first node.
-	if committed, err := mtc.nodeLivenesses[0].SetMembershipStatus(
-		ctx, goneNodeID, livenesspb.MembershipStatus_ACTIVE); err != nil {
-		t.Fatal(err)
-	} else if !committed {
-		t.Fatal("no change committed")
-	}
+	setMembershipStatus(nl, livenesspb.MembershipStatus_ACTIVE, true)
 	// Decommission from second node (a second time).
-	if committed, err := mtc.nodeLivenesses[1].SetMembershipStatus(
-		ctx, goneNodeID, livenesspb.MembershipStatus_DECOMMISSIONING); err != nil {
-		t.Fatal(err)
-	} else if !committed {
-		t.Fatal("no change committed")
-	}
+	setMembershipStatus(nl1, livenesspb.MembershipStatus_DECOMMISSIONING, true)
 	// Recommission from third node.
-	if committed, err := mtc.nodeLivenesses[2].SetMembershipStatus(
-		ctx, goneNodeID, livenesspb.MembershipStatus_ACTIVE); err != nil {
-		t.Fatal(err)
-	} else if !committed {
-		t.Fatal("no change committed")
-	}
+	setMembershipStatus(nl2, livenesspb.MembershipStatus_ACTIVE, true)
 }
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 52e72596e6c0..4448c7a094cd 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -1128,6 +1128,15 @@ func (tc *TestCluster) Restart() error {
 // RestartServer uses the cached ServerArgs to restart a Server specified by
 // the passed index.
 func (tc *TestCluster) RestartServer(idx int) error {
+	return tc.RestartServerWithInspect(idx, nil)
+}
+
+// RestartServerWithInspect uses the cached ServerArgs to restart a Server
+// specified by the passed index. It allows an optional inspect function that
+// can observe the server once it has been re-created but before it has been
+// started. This is useful for tests that want to verify that the startup
+// sequence performs the correct actions, e.g. that liveness is gossiped on startup.
+func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server.TestServer)) error {
 	if !tc.ServerStopped(idx) {
 		return errors.Errorf("server %d must be stopped before attempting to restart", idx)
 	}
@@ -1168,6 +1177,10 @@ func (tc *TestCluster) RestartServer(idx int) error {
 	tc.Servers[idx] = s
 	tc.mu.serverStoppers[idx] = s.Stopper()
 
+	if inspect != nil {
+		inspect(s)
+	}
+
 	if err := srv.Start(); err != nil {
 		return err
 	}
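
Note for reviewers: a minimal usage sketch of the new RestartServerWithInspect hook. The helper name and the key-counting assertion below are illustrative only (not part of this change); the gossip-callback pattern mirrors the one used in TestNodeLivenessRestart above, and the sketch assumes the same imports as node_liveness_test.go.

// assertLivenessGossipedOnRestart restarts server idx and fails the test if
// the restarted node never gossips a node-liveness key during its startup
// sequence. Because the inspect hook runs after the server is re-created but
// before it is started, the callback observes the startup gossip itself.
func assertLivenessGossipedOnRestart(t *testing.T, tc *testcluster.TestCluster, idx int) {
	tc.StopServer(idx)

	var seen int32 // incremented once per gossiped node-liveness key
	require.NoError(t, tc.RestartServerWithInspect(idx, func(s *server.TestServer) {
		livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix)
		s.Gossip().RegisterCallback(livenessRegex, func(_ string, _ roachpb.Value) {
			atomic.AddInt32(&seen, 1)
		})
	}))

	testutils.SucceedsSoon(t, func() error {
		if atomic.LoadInt32(&seen) == 0 {
			return errors.New("no node-liveness keys gossiped after restart")
		}
		return nil
	})
}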