diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index e4cbec366b99..a87e080b1480 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -12,6 +12,7 @@ package kvserver_test import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -22,7 +23,9 @@ import ( ) type unreliableRaftHandlerFuncs struct { - // If non-nil, can return false to avoid dropping a msg to rangeID. + // If non-nil, can return false to avoid dropping the msg to + // unreliableRaftHandler.rangeID. If nil, all messages pertaining to the + // respective range are dropped. dropReq func(*kvserver.RaftMessageRequest) bool dropHB func(*kvserver.RaftHeartbeat) bool dropResp func(*kvserver.RaftMessageResponse) bool @@ -47,6 +50,7 @@ func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { // unreliableRaftHandler drops all Raft messages that are addressed to the // specified rangeID, but lets all other messages through. type unreliableRaftHandler struct { + name string rangeID roachpb.RangeID kvserver.RaftMessageHandler unreliableRaftHandlerFuncs @@ -68,9 +72,14 @@ func (h *unreliableRaftHandler) HandleRaftRequest( } } else if req.RangeID == h.rangeID { if h.dropReq == nil || h.dropReq(req) { + var prefix string + if h.name != "" { + prefix = fmt.Sprintf("[%s] ", h.name) + } log.Infof( ctx, - "dropping r%d Raft message %s", + "%sdropping r%d Raft message %s", + prefix, req.RangeID, raft.DescribeMessage(req.Message, func([]byte) string { return "" diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index aed0ce8c6704..ff12b7010f23 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" @@ -903,6 +904,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // x x // [1]<---->[2] // + log.Infof(ctx, "test: installing unreliable Raft transports") for _, s := range []int{0, 1, 2} { h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]} if s != partStore { @@ -922,6 +924,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // not succeed before their context is canceled, but they will be appended // to the partitioned replica's Raft log because it is currently the Raft // leader. + log.Infof(ctx, "test: sending writes to partitioned replica") g := ctxgroup.WithContext(ctx) for i := 0; i < 32; i++ { otherKey := roachpb.Key(fmt.Sprintf("other-%d", i)) @@ -942,26 +945,45 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { } // Transfer the lease to one of the followers and perform a write. The - // partition ensures that this will require a Raft leadership change. - const newLeaderStore = partStore + 1 - newLeaderRepl, err := mtc.stores[newLeaderStore].GetReplica(1) - if err != nil { - t.Fatal(err) - } - newLeaderReplSender := mtc.stores[newLeaderStore].TestSender() - + // partition ensures that this will require a Raft leadership change. It's + // unpredictable which one of the followers will become leader. 
Only the + // leader will be allowed to acquire the lease (see + // TestSnapshotAfterTruncationWithUncommittedTail), so it's also unpredictable + // who will get the lease. We try repeatedly sending requests to both + // candidates until one of them succeeds. + var nonPartitionedSenders [2]kv.Sender + nonPartitionedSenders[0] = mtc.stores[1].TestSender() + nonPartitionedSenders[1] = mtc.stores[2].TestSender() + + log.Infof(ctx, "test: sending write to transfer lease") incArgs = incrementArgs(key, incB) + var i int + var newLeaderRepl *kvserver.Replica + var newLeaderReplSender kv.Sender testutils.SucceedsSoon(t, func() error { mtc.advanceClock(ctx) - _, pErr := kv.SendWrapped(ctx, newLeaderReplSender, incArgs) + i++ + sender := nonPartitionedSenders[i%2] + _, pErr := kv.SendWrapped(ctx, sender, incArgs) if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok { return pErr.GoError() } else if pErr != nil { t.Fatal(pErr) } + + // A request succeeded, proving that there is a new leader and leaseholder. + // Remember who that is. + newLeaderStoreIdx := 1 + (i % 2) + newLeaderRepl, err = mtc.stores[newLeaderStoreIdx].GetReplica(1) + if err != nil { + t.Fatal(err) + } + newLeaderReplSender = mtc.stores[newLeaderStoreIdx].TestSender() return nil }) + log.Infof(ctx, "test: waiting for values...") mtc.waitForValues(key, []int64{incA, incAB, incAB}) + log.Infof(ctx, "test: waiting for values... done") index, err := newLeaderRepl.GetLastIndex() if err != nil { @@ -970,6 +992,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // Truncate the log at index+1 (log entries < N are removed, so this // includes the increment). + log.Infof(ctx, "test: truncating log") truncArgs := truncateLogArgs(index+1, 1) testutils.SucceedsSoon(t, func() error { mtc.advanceClock(ctx) @@ -986,6 +1009,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { snapsBefore := snapsMetric.Count() // Remove the partition. Snapshot should follow. + log.Infof(ctx, "test: removing the partition") for _, s := range []int{0, 1, 2} { mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{ rangeID: 1, @@ -1024,6 +1048,235 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { mtc.waitForValues(key, []int64{incABC, incABC, incABC}) } +// TestRequestsOnLaggingReplica tests that requests sent to a replica that's +// behind in log application don't block. The test indirectly verifies that a +// replica that's not the leader does not attempt to acquire a lease and, thus, +// does not block until it figures out that it cannot, in fact, take the lease. +// +// This test relies on follower replicas refusing to forward lease acquisition +// requests to the leader, thereby refusing to acquire a lease. The point of +// this behavior is to prevent replicas that are behind from trying to acquire +// the lease and then blocking traffic for a long time until they find out +// whether they successfully took the lease or not. +func TestRequestsOnLaggingReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + clusterArgs := base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + // Reduce the election timeout some to speed up the test. 
+ RaftConfig: base.RaftConfig{RaftElectionTimeoutTicks: 10}, + Knobs: base.TestingKnobs{ + NodeLiveness: kvserver.NodeLivenessTestingKnobs{ + // This test waits for an epoch-based lease to expire, so we're + // setting the liveness duration as low as possible while still + // keeping the test stable. + LivenessDuration: 3000 * time.Millisecond, + RenewalDuration: 1500 * time.Millisecond, + }, + Store: &kvserver.StoreTestingKnobs{ + // We eliminate clock offsets in order to eliminate the stasis period + // of leases, in order to speed up the test. + MaxOffset: time.Nanosecond, + }, + }, + }, + } + + tc := testcluster.StartTestCluster(t, 3, clusterArgs) + defer tc.Stopper().Stop(ctx) + + rngDesc, err := tc.Servers[0].ScratchRangeEx() + require.NoError(t, err) + key := rngDesc.StartKey.AsRawKey() + // Add replicas on all the stores. + tc.AddVotersOrFatal(t, rngDesc.StartKey.AsRawKey(), tc.Target(1), tc.Target(2)) + + { + // Write a value so that the respective key is present in all stores and we + // can increment it again later. + _, err := tc.Server(0).DB().Inc(ctx, key, 1) + require.NoError(t, err) + log.Infof(ctx, "test: waiting for initial values...") + tc.WaitForValues(t, key, []int64{1, 1, 1}) + log.Infof(ctx, "test: waiting for initial values... done") + } + + // Partition the original leader from its followers. We do this by installing + // unreliableRaftHandler listeners on all three Stores. The handler on the + // partitioned store filters out all messages while the handler on the other + // two stores only filters out messages from the partitioned store. The + // configuration looks like: + // + // [0] + // x x + // / \ + // x x + // [1]<---->[2] + // + log.Infof(ctx, "test: partitioning node") + const partitionNodeIdx = 0 + partitionStore := tc.GetFirstStoreFromServer(t, partitionNodeIdx) + partRepl, err := partitionStore.GetReplica(rngDesc.RangeID) + require.NoError(t, err) + partReplDesc, err := partRepl.GetReplicaDescriptor() + require.NoError(t, err) + partitionedStoreSender := partitionStore.TestSender() + const otherStoreIdx = 1 + otherStore := tc.GetFirstStoreFromServer(t, otherStoreIdx) + otherRepl, err := otherStore.GetReplica(rngDesc.RangeID) + require.NoError(t, err) + + for _, i := range []int{0, 1, 2} { + store := tc.GetFirstStoreFromServer(t, i) + h := &unreliableRaftHandler{ + name: fmt.Sprintf("store %d", i), + rangeID: rngDesc.RangeID, + RaftMessageHandler: store, + } + if i != partitionNodeIdx { + // Only filter messages from the partitioned store on the other two + // stores. + h.dropReq = func(req *kvserver.RaftMessageRequest) bool { + return req.FromReplica.StoreID == partRepl.StoreID() + } + h.dropHB = func(hb *kvserver.RaftHeartbeat) bool { + return hb.FromReplicaID == partReplDesc.ReplicaID + } + } + store.Transport().Listen(store.Ident.StoreID, h) + } + + // Stop the heartbeats so that n1's lease can expire. + log.Infof(ctx, "test: suspending heartbeats for n1") + resumeN1Heartbeats := partitionStore.NodeLiveness().PauseAllHeartbeatsForTest() + + // Wait until another replica campaigns and becomes leader, replacing the + // partitioned one. + log.Infof(ctx, "test: waiting for leadership transfer") + testutils.SucceedsSoon(t, func() error { + // Make sure this replica has not inadvertently quiesced. We need the + // replica ticking so that it campaigns. 
+ if otherRepl.IsQuiescent() { + otherRepl.UnquiesceAndWakeLeader() + } + lead := otherRepl.RaftStatus().Lead + if lead == raft.None { + return errors.New("no leader yet") + } + if roachpb.ReplicaID(lead) == partReplDesc.ReplicaID { + return errors.New("partitioned replica is still leader") + } + return nil + }) + + leaderReplicaID := roachpb.ReplicaID(otherRepl.RaftStatus().Lead) + log.Infof(ctx, "test: the leader is replica ID %d", leaderReplicaID) + if leaderReplicaID != 2 && leaderReplicaID != 3 { + t.Fatalf("expected leader to be 1 or 2, was: %d", leaderReplicaID) + } + leaderNodeIdx := int(leaderReplicaID - 1) + leaderNode := tc.Server(leaderNodeIdx).(*server.TestServer) + leaderStore, err := leaderNode.GetStores().(*kvserver.Stores).GetStore(leaderNode.GetFirstStoreID()) + require.NoError(t, err) + + // Wait until the lease expires. + log.Infof(ctx, "test: waiting for lease expiration") + partitionedReplica, err := partitionStore.GetReplica(rngDesc.RangeID) + require.NoError(t, err) + testutils.SucceedsSoon(t, func() error { + status := partitionedReplica.CurrentLeaseStatus(ctx) + require.True(t, + status.Lease.OwnedBy(partitionStore.StoreID()), "someone else got the lease: %s", status) + if status.State == kvserverpb.LeaseState_VALID { + return errors.New("lease still valid") + } + // We need to wait for the stasis state to pass too; during stasis other + // replicas can't take the lease. + if status.State == kvserverpb.LeaseState_STASIS { + return errors.New("lease still in stasis") + } + return nil + }) + log.Infof(ctx, "test: lease expired") + + { + // Write something to generate some Raft log entries and then truncate the log. + log.Infof(ctx, "test: incrementing") + incArgs := incrementArgs(key, 1) + sender := leaderStore.TestSender() + _, pErr := kv.SendWrapped(ctx, sender, incArgs) + require.Nil(t, pErr) + } + + tc.WaitForValues(t, key, []int64{1, 2, 2}) + index, err := otherRepl.GetLastIndex() + require.NoError(t, err) + + // Truncate the log at index+1 (log entries < N are removed, so this includes + // the increment). This means that the partitioned replica will need a + // snapshot to catch up. + log.Infof(ctx, "test: truncating log...") + truncArgs := &roachpb.TruncateLogRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: key, + }, + Index: index, + RangeID: rngDesc.RangeID, + } + { + _, pErr := kv.SendWrapped(ctx, leaderStore.TestSender(), truncArgs) + require.NoError(t, pErr.GoError()) + } + + // Resume n1's heartbeats and wait for it to become live again. This is to + // ensure that the rest of the test does not somehow fool itself because n1 is + // not live. + log.Infof(ctx, "test: resuming n1 heartbeats") + resumeN1Heartbeats() + + // Resolve the partition, but continue blocking snapshots destined for the + // previously-partitioned replica. The point of blocking the snapshots is to + // prevent the respective replica from catching up and becoming eligible to + // become the leader/leaseholder. The point of resolving the partition is to + // allow the replica in question to figure out that it's not the leader any + // more. As long as it is completely partitioned, the replica continues + // believing that it is the leader, and lease acquisition requests block. 
+ log.Infof(ctx, "test: removing partition") + slowSnapHandler := &slowSnapRaftHandler{ + rangeID: rngDesc.RangeID, + waitCh: make(chan struct{}), + RaftMessageHandler: partitionStore, + } + defer slowSnapHandler.unblock() + partitionStore.Transport().Listen(partitionStore.Ident.StoreID, slowSnapHandler) + // Remove the unreliable transport from the other stores, so that messages + // sent by the partitioned store can reach them. + for _, i := range []int{0, 1, 2} { + if i == partitionNodeIdx { + // We've handled the partitioned store above. + continue + } + store := tc.GetFirstStoreFromServer(t, i) + store.Transport().Listen(store.Ident.StoreID, store) + } + + // Now we're going to send a request to the behind replica, and we expect it + // to not block; we expect a redirection to the leader. + log.Infof(ctx, "test: sending request") + timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + getRequest := getArgs(key) + _, pErr := kv.SendWrapped(timeoutCtx, partitionedStoreSender, getRequest) + require.NotNil(t, pErr, "unexpected success") + nlhe := pErr.GetDetail().(*roachpb.NotLeaseHolderError) + require.NotNil(t, nlhe, "expected NotLeaseholderError, got: %s", pErr) + require.NotNil(t, nlhe.LeaseHolder, "expected NotLeaseholderError with a known leaseholder, got: %s", pErr) + require.Equal(t, leaderReplicaID, nlhe.LeaseHolder.ReplicaID) +} + type fakeSnapshotStream struct { nextReq *kvserver.SnapshotRequest nextErr error @@ -3528,10 +3781,12 @@ func TestRemovedReplicaError(t *testing.T) { }) } +// Test that the Raft leadership is transferred to follow the lease. func TestTransferRaftLeadership(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + ctx := context.Background() const numStores = 3 sc := kvserver.TestStoreConfig(nil) sc.TestingKnobs.DisableMergeQueue = true @@ -3600,25 +3855,14 @@ func TestTransferRaftLeadership(t *testing.T) { t.Fatalf("raft leader should be %d, but got status %+v", rd0.ReplicaID, status) } - // Force a read on Store 2 to request a new lease. Other moving parts in - // the system could have requested another lease as well, so we - // expire-request in a loop until we get our foot in the door. origCount0 := store0.Metrics().RangeRaftLeaderTransfers.Count() - for { - mtc.advanceClock(context.Background()) - if _, pErr := kv.SendWrappedWith( - context.Background(), store1, roachpb.Header{RangeID: repl0.RangeID}, getArgs, - ); pErr == nil { - break - } else { - switch pErr.GetDetail().(type) { - case *roachpb.NotLeaseHolderError, *roachpb.RangeNotFoundError: - default: - t.Fatal(pErr) - } - } - } - // Verify lease is transferred. + // Transfer the lease. We'll then check that the leadership follows + // automatically. + transferLeaseArgs := adminTransferLeaseArgs(key, store1.StoreID()) + _, pErr := kv.SendWrappedWith(ctx, store0, roachpb.Header{RangeID: repl0.RangeID}, transferLeaseArgs) + require.NoError(t, pErr.GoError()) + + // Verify leadership is transferred. 
testutils.SucceedsSoon(t, func() error { if a, e := repl0.RaftStatus().Lead, uint64(rd1.ReplicaID); a != e { return errors.Errorf("expected raft leader be %d; got %d", e, a) diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index d43cde5f9fae..406b789f2fc3 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -910,6 +910,8 @@ func (m *multiTestContext) addStore(idx int) { nodeID := roachpb.NodeID(idx + 1) cfg := m.makeStoreConfig(idx) ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer} + ambient.AddLogTag("n", nodeID) + m.populateDB(idx, cfg.Settings, stopper) nlActive, nlRenewal := cfg.NodeLivenessDurations() m.nodeLivenesses[idx] = liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ @@ -1251,7 +1253,8 @@ func (m *multiTestContext) changeReplicas( return desc.NextReplicaID, nil } -// replicateRange replicates the given range onto the given stores. +// replicateRange replicates the given range onto the given destination stores. The destinations +// are indicated by indexes within m.stores. func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int) { m.t.Helper() if err := m.replicateRangeNonFatal(rangeID, dests...); err != nil { @@ -1368,9 +1371,9 @@ func (m *multiTestContext) waitForValuesT(t testing.TB, key roachpb.Key, expecte }) } -// waitForValues waits up to the given duration for the integer values -// at the given key to match the expected slice (across all engines). -// Fails the test if they do not match. +// waitForValues waits for the integer values at the given key to match the +// expected slice (across all engines). Fails the test if they do not match +// after the SucceedsSoon period. func (m *multiTestContext) waitForValues(key roachpb.Key, expected []int64) { m.t.Helper() m.waitForValuesT(m.t, key, expected) diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index adafc32c8c3e..24d024d1189b 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -549,6 +549,12 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) { return nil }, DisableMergeQueue: true, + // A subtest wants to force a lease change by stopping the liveness + // heartbeats on the old leaseholder and sending a request to + // another replica. If we didn't use this knob, we'd have to + // architect a Raft leadership change too in order to let the + // replica get the lease. + AllowLeaseRequestProposalsWhenNotLeader: true, }, }, }, @@ -699,7 +705,7 @@ func forceLeaseTransferOnSubsumedRange( }) restartHeartbeats := oldLeaseholderStore.NodeLiveness().PauseAllHeartbeatsForTest() defer restartHeartbeats() - log.Infof(ctx, "paused RHS rightLeaseholder's liveness heartbeats") + log.Infof(ctx, "test: paused RHS rightLeaseholder's liveness heartbeats") time.Sleep(oldLeaseholderStore.NodeLiveness().GetLivenessThreshold()) // Send a read request from one of the followers of RHS so that it notices @@ -710,7 +716,8 @@ func forceLeaseTransferOnSubsumedRange( log.Infof(ctx, "sending a read request from a follower of RHS (store %d) in order to trigger lease acquisition", newRightLeaseholder.StoreID()) - newRightLeaseholder.Send(ctx, leaseAcquisitionRequest) + _, pErr := newRightLeaseholder.Send(ctx, leaseAcquisitionRequest) + log.Infof(ctx, "test: RHS read returned err: %v", pErr) // After the merge commits, the RHS will cease to exist and this read // request will return a RangeNotFoundError. 
But we cannot guarantee that // the merge will always successfully commit on its first attempt diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 2d86a10c39ec..5cb802b52961 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -95,6 +95,7 @@ func newUnloadedReplica( r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r)) + r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader if leaseHistoryMaxEntries > 0 { r.leaseHistory = newLeaseHistory() diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 4cbf41e364c2..041163e272d3 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -143,6 +143,14 @@ type propBuf struct { // process of replication. Dropped proposals are still eligible to be // reproposed due to ticks. submitProposalFilter func(*ProposalData) (drop bool, err error) + // allowLeaseProposalWhenNotLeader, if set, makes the proposal buffer allow + // lease request proposals even when the replica inserting that proposal is + // not the Raft leader. This can be used in tests to allow a replica to + // acquire a lease without first moving the Raft leadership to it (e.g. it + // allows tests to expire leases by stopping the old leaseholder's liveness + // heartbeats and then expect other replicas to take the lease without + // worrying about Raft). + allowLeaseProposalWhenNotLeader bool } } @@ -156,8 +164,28 @@ type proposer interface { leaseAppliedIndex() uint64 enqueueUpdateCheck() // The following require the proposer to hold an exclusive lock. - withGroupLocked(func(*raft.RawNode) error) error + withGroupLocked(func(proposerRaft) error) error registerProposalLocked(*ProposalData) + // rejectProposalWithRedirectLocked rejects a proposal and redirects the + // proposer to try it on another node. This is used to sometimes reject lease + // acquisitions when another replica is the leader; the intended consequence + // of the rejection is that the request that caused the lease acquisition + // attempt is tried on the leader, at which point it should result in a lease + // acquisition attempt by that node (or, perhaps by then the leader will have + // already gotten a lease and the request can be serviced directly). + rejectProposalWithRedirectLocked( + ctx context.Context, + prop *ProposalData, + redirectTo roachpb.ReplicaID, + ) +} + +// proposerRaft abstracts the propBuf's dependency on *raft.RawNode, to help +// testing. +type proposerRaft interface { + Step(raftpb.Message) error + BasicStatus() raft.BasicStatus + ProposeConfChange(raftpb.ConfChangeI) error } // Init initializes the proposal buffer and binds it to the provided proposer. @@ -346,7 +374,7 @@ func (b *propBuf) flushRLocked(ctx context.Context) error { } func (b *propBuf) flushLocked(ctx context.Context) error { - return b.p.withGroupLocked(func(raftGroup *raft.RawNode) error { + return b.p.withGroupLocked(func(raftGroup proposerRaft) error { _, err := b.FlushLockedWithRaftGroup(ctx, raftGroup) return err }) @@ -361,7 +389,7 @@ func (b *propBuf) flushLocked(ctx context.Context) error { // // Returns the number of proposals handed to the RawNode. 
func (b *propBuf) FlushLockedWithRaftGroup( - ctx context.Context, raftGroup *raft.RawNode, + ctx context.Context, raftGroup proposerRaft, ) (int, error) { // Before returning, make sure to forward the lease index base to at least // the proposer's currently applied lease index. This ensures that if the @@ -399,6 +427,25 @@ func (b *propBuf) FlushLockedWithRaftGroup( // and apply them. buf := b.arr.asSlice()[:used] ents := make([]raftpb.Entry, 0, used) + + // Figure out leadership info. We'll use it to conditionally drop some + // requests. + var leaderKnown, iAmTheLeader bool + var leader roachpb.ReplicaID + if raftGroup != nil { + status := raftGroup.BasicStatus() + iAmTheLeader = status.RaftState == raft.StateLeader + leaderKnown = status.Lead != raft.None + if leaderKnown { + leader = roachpb.ReplicaID(status.Lead) + if !iAmTheLeader && leader == b.p.replicaID() { + log.Fatalf(ctx, + "inconsistent Raft state: state %s while the current replica is also the lead: %d", + status.RaftState, leader) + } + } + } + // Remember the first error that we see when proposing the batch. We don't // immediately return this error because we want to finish clearing out the // buffer and registering each of the proposals with the proposer, but we @@ -412,6 +459,41 @@ func (b *propBuf) FlushLockedWithRaftGroup( } buf[i] = nil // clear buffer + // Handle an edge case about lease acquisitions: we don't want to forward + // lease acquisitions to another node (which is what happens when we're not + // the leader) because: + // a) if there is a different leader, that leader should acquire the lease + // itself and thus avoid a change of leadership caused by the leaseholder + // and leader being different (Raft leadership follows the lease), and + // b) being a follower, it's possible that this replica is behind in + // applying the log. Thus, there might be another lease in place that this + // follower doesn't know about, in which case the lease we're proposing here + // would be rejected. Not only would proposing such a lease be wasted work, + // but we're trying to protect against pathological cases where it takes a + // long time for this follower to catch up (for example because it's waiting + // for a snapshot, and the snapshot is queued behind many other snapshots). + // In such a case, we don't want all requests arriving at this node to be + // blocked on this lease acquisition (which is very likely to eventually + // fail anyway). + // + // Thus, we do one of two things: + // - if the leader is known, we reject this proposal and make sure the + // request that needed the lease is redirected to the leaseholder; + // - if the leader is not known, we don't do anything special here to + // terminate the proposal, but we know that Raft will reject it with a + // ErrProposalDropped. We'll eventually re-propose it once a leader is + // known, at which point it will either go through or be rejected based on + // whether or not it is this replica that became the leader. + if !iAmTheLeader && p.Request.IsLeaseRequest() { + if leaderKnown && !b.testing.allowLeaseProposalWhenNotLeader { + log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is", + leader) + b.p.rejectProposalWithRedirectLocked(ctx, p, leader) + continue + } + // If the leader is not known, continue with the proposal as explained above. + } + // Raft processing bookkeeping. 
b.p.registerProposalLocked(p) @@ -498,7 +580,7 @@ func (b *propBuf) forwardLeaseIndexBase(v uint64) { } } -func proposeBatch(raftGroup *raft.RawNode, replID roachpb.ReplicaID, ents []raftpb.Entry) error { +func proposeBatch(raftGroup proposerRaft, replID roachpb.ReplicaID, ents []raftpb.Entry) error { if len(ents) == 0 { return nil } @@ -622,7 +704,7 @@ func (rp *replicaProposer) enqueueUpdateCheck() { rp.store.enqueueRaftUpdateCheck(rp.RangeID) } -func (rp *replicaProposer) withGroupLocked(fn func(*raft.RawNode) error) error { +func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error) error { // Pass true for mayCampaignOnWake because we're about to propose a command. return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command here so there is no need to wake the leader @@ -638,3 +720,22 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { p.proposedAtTicks = rp.mu.ticks rp.mu.proposals[p.idKey] = p } + +// rejectProposalWithRedirectLocked is part of the proposer interface. +func (rp *replicaProposer) rejectProposalWithRedirectLocked( + ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, +) { + r := (*Replica)(rp) + rangeDesc := r.descRLocked() + storeID := r.store.StoreID() + leaderRep, _ /* ok */ := rangeDesc.GetReplicaDescriptorByID(redirectTo) + speculativeLease := &roachpb.Lease{ + Replica: leaderRep, + } + log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", leaderRep.NodeID, prop.Request) + r.cleanupFailedProposalLocked(prop) + prop.finishApplication(ctx, proposalResult{ + Err: roachpb.NewError(newNotLeaseHolderError( + speculativeLease, storeID, rangeDesc, "refusing to acquire lease on follower")), + }) +} diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index c536bada240e..534a7dcffb00 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3" + "go.etcd.io/etcd/raft/v3/raftpb" "golang.org/x/sync/errgroup" ) @@ -36,6 +37,33 @@ type testProposer struct { lai uint64 enqueued int registered int + + // If not nil, this can be a testProposerRaft used to mock the raft group + // passed to FlushLockedWithRaftGroup(). + raftGroup proposerRaft + // If not nil, this is called by RejectProposalWithRedirectLocked(). If nil, + // RejectProposalWithRedirectLocked() panics. + onRejectProposalWithRedirectLocked func(prop *ProposalData, redirectTo roachpb.ReplicaID) +} + +type testProposerRaft struct { + status raft.BasicStatus +} + +var _ proposerRaft = testProposerRaft{} + +func (t testProposerRaft) Step(raftpb.Message) error { + // TODO(andrei, nvanbenschoten): Capture the message and test against it. + return nil +} + +func (t testProposerRaft) BasicStatus() raft.BasicStatus { + return t.status +} + +func (t testProposerRaft) ProposeConfChange(i raftpb.ConfChangeI) error { + // TODO(andrei, nvanbenschoten): Capture the message and test against it. + return nil } func (t *testProposer) locker() sync.Locker { @@ -62,15 +90,24 @@ func (t *testProposer) enqueueUpdateCheck() { t.enqueued++ } -func (t *testProposer) withGroupLocked(fn func(*raft.RawNode) error) error { - // Pass nil for the RawNode, which FlushLockedWithRaftGroup supports. 
- return fn(nil) +func (t *testProposer) withGroupLocked(fn func(proposerRaft) error) error { + // Note that t.raftGroup can be nil, which FlushLockedWithRaftGroup supports. + return fn(t.raftGroup) } func (t *testProposer) registerProposalLocked(p *ProposalData) { t.registered++ } +func (t *testProposer) rejectProposalWithRedirectLocked( + ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, +) { + if t.onRejectProposalWithRedirectLocked == nil { + panic("unexpected rejectProposalWithRedirectLocked() call") + } + t.onRejectProposalWithRedirectLocked(prop, redirectTo) +} + func newPropData(leaseReq bool) (*ProposalData, []byte) { var ba roachpb.BatchRequest if leaseReq { @@ -341,3 +378,90 @@ func TestPropBufCnt(t *testing.T) { assert.Equal(t, -1, res.arrayIndex()) assert.Equal(t, uint64(0), res.leaseIndexOffset()) } + +// Test that the proposal buffer rejects lease acquisition proposals from +// followers. We want the leader to take the lease; see comments in +// FlushLockedWithRaftGroup(). +func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + self := uint64(1) + // Each subtest will try to propose a lease acquisition in a different Raft + // scenario. Some proposals should be allowed, some should be rejected. + for _, tc := range []struct { + name string + state raft.StateType + leader uint64 + expRejection bool + }{ + { + name: "leader", + state: raft.StateLeader, + leader: self, + // No rejection. The leader can request a lease. + expRejection: false, + }, + { + name: "follower known leader", + state: raft.StateFollower, + // Someone else is leader. + leader: self + 1, + // Rejection - a follower can't request a lease. + expRejection: true, + }, + { + name: "follower unknown leader", + state: raft.StateFollower, + // Unknown leader. + leader: raft.None, + // No rejection if the leader is unknown. See comments in + // FlushLockedWithRaftGroup(). + expRejection: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var p testProposer + // p.replicaID() is hardcoded; it'd better be hardcoded to what this test + // expects. 
+ require.Equal(t, self, uint64(p.replicaID())) + + var rejected roachpb.ReplicaID + if tc.expRejection { + p.onRejectProposalWithRedirectLocked = func(_ *ProposalData, redirectTo roachpb.ReplicaID) { + if rejected != 0 { + t.Fatalf("unexpected 2nd rejection") + } + rejected = redirectTo + } + } else { + p.onRejectProposalWithRedirectLocked = func(_ *ProposalData, _ roachpb.ReplicaID) { + t.Fatalf("unexpected redirection") + } + } + + raftStatus := raft.BasicStatus{ + ID: self, + SoftState: raft.SoftState{ + RaftState: tc.state, + Lead: tc.leader, + }, + } + r := testProposerRaft{status: raftStatus} + p.raftGroup = r + var b propBuf + b.Init(&p) + + pd, data := newPropData(true /* leaseReq */) + _, err := b.Insert(ctx, pd, data) + require.NoError(t, err) + require.NoError(t, b.flushLocked(ctx)) + if tc.expRejection { + require.Equal(t, roachpb.ReplicaID(tc.leader), rejected) + } else { + require.Equal(t, roachpb.ReplicaID(0), rejected) + } + }) + } +} diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 7cd4ed8ae555..c81de08e736e 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -585,9 +585,9 @@ func (r *Replica) leaseStatus( return status } -// currentLeaseStatus returns the status of the current lease for a current +// CurrentLeaseStatus returns the status of the current lease for a current // timestamp. -func (r *Replica) currentLeaseStatus(ctx context.Context) kvserverpb.LeaseStatus { +func (r *Replica) CurrentLeaseStatus(ctx context.Context) kvserverpb.LeaseStatus { timestamp := r.store.Clock().Now() r.mu.RLock() defer r.mu.RUnlock() diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 02b3e18ddbad..be68d14a4aa9 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -1466,7 +1466,7 @@ func TestReplicaDrainLease(t *testing.T) { rd := tc.LookupRangeOrFatal(t, rngKey) r1, err := store1.GetReplica(rd.RangeID) require.NoError(t, err) - status := r1.currentLeaseStatus(ctx) + status := r1.CurrentLeaseStatus(ctx) require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status) // We expect the lease to be valid, but don't check that because, under race, it might have // expired already. @@ -1479,7 +1479,7 @@ func TestReplicaDrainLease(t *testing.T) { require.NoError(t, err) testutils.SucceedsSoon(t, func() error { - status := r1.currentLeaseStatus(ctx) + status := r1.CurrentLeaseStatus(ctx) require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status) if status.State == kvserverpb.LeaseState_VALID { return errors.New("lease still valid") diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 5b52156d6c62..dcad8b9988da 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -247,6 +247,14 @@ type StoreTestingKnobs struct { // RangeFeedPushTxnsAge overrides the default value for // rangefeed.Config.PushTxnsAge. RangeFeedPushTxnsAge time.Duration + // AllowLeaseProposalWhenNotLeader, if set, makes the proposal buffer allow + // lease request proposals even when the replica inserting that proposal is + // not the Raft leader. This can be used in tests to allow a replica to + // acquire a lease without first moving the Raft leadership to it (e.g. 
it + // allows tests to expire leases by stopping the old leaseholder's liveness + // heartbeats and then expect other replicas to take the lease without + // worrying about Raft). + AllowLeaseRequestProposalsWhenNotLeader bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index fb25d6d85cb5..5e350fe103df 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -1257,18 +1257,28 @@ func (ts *TestServer) ForceTableGC( return pErr.GoError() } -// ScratchRange splits off a range suitable to be used as KV scratch space. (it -// doesn't overlap system spans or SQL tables). +// ScratchRangeEx splits off a range suitable to be used as KV scratch space. +// (it doesn't overlap system spans or SQL tables). // // Calling this multiple times is undefined (but see TestCluster.ScratchRange() // which is idempotent). -func (ts *TestServer) ScratchRange() (roachpb.Key, error) { +func (ts *TestServer) ScratchRangeEx() (roachpb.RangeDescriptor, error) { scratchKey := keys.TableDataMax - _, _, err := ts.SplitRange(scratchKey) + _, rngDesc, err := ts.SplitRange(scratchKey) + if err != nil { + return roachpb.RangeDescriptor{}, err + } + return rngDesc, nil +} + +// ScratchRange is like ScratchRangeEx, but only returns the start key of the +// new range instead of the range descriptor. +func (ts *TestServer) ScratchRange() (roachpb.Key, error) { + desc, err := ts.ScratchRangeEx() if err != nil { return nil, err } - return scratchKey, nil + return desc.StartKey.AsRawKey(), nil } type testServerFactoryImpl struct{}