diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index d500cbbd9c39..d0a075dddc6d 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -22,7 +22,9 @@ import ( ) type unreliableRaftHandlerFuncs struct { - // If non-nil, can return false to avoid dropping a msg to rangeID. + // If non-nil, can return false to avoid dropping the msg to + // unreliableRaftHandler.rangeID. If nil, all messages pertaining to the + // respective range are dropped. dropReq func(*kvserver.RaftMessageRequest) bool dropHB func(*kvserver.RaftHeartbeat) bool dropResp func(*kvserver.RaftMessageResponse) bool diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 85a93ad22251..1b46509fdfb7 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -903,6 +903,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // x x // [1]<---->[2] // + log.Infof(ctx, "test: installing unreliable Raft transports") for _, s := range []int{0, 1, 2} { h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[s]} if s != partStore { @@ -922,6 +923,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // not succeed before their context is canceled, but they will be appended // to the partitioned replica's Raft log because it is currently the Raft // leader. + log.Infof(ctx, "test: sending writes to partitioned replica") g := ctxgroup.WithContext(ctx) for i := 0; i < 32; i++ { otherKey := roachpb.Key(fmt.Sprintf("other-%d", i)) @@ -942,26 +944,45 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { } // Transfer the lease to one of the followers and perform a write. The - // partition ensures that this will require a Raft leadership change. - const newLeaderStore = partStore + 1 - newLeaderRepl, err := mtc.stores[newLeaderStore].GetReplica(1) - if err != nil { - t.Fatal(err) - } - newLeaderReplSender := mtc.stores[newLeaderStore].TestSender() - + // partition ensures that this will require a Raft leadership change. It's + // unpredictable which one of the followers will become leader. Only the + // leader will be allowed to acquire the lease (see + // TestRequestsOnLaggingReplica), so it's also unpredictable + // who will get the lease. We try repeatedly sending requests to both + // candidates until one of them succeeds. + var nonPartitionedSenders [2]kv.Sender + nonPartitionedSenders[0] = mtc.stores[1].TestSender() + nonPartitionedSenders[1] = mtc.stores[2].TestSender() + + log.Infof(ctx, "test: sending write to transfer lease") incArgs = incrementArgs(key, incB) + var i int + var newLeaderRepl *kvserver.Replica + var newLeaderReplSender kv.Sender testutils.SucceedsSoon(t, func() error { mtc.advanceClock(ctx) - _, pErr := kv.SendWrapped(ctx, newLeaderReplSender, incArgs) + i++ + sender := nonPartitionedSenders[i%2] + _, pErr := kv.SendWrapped(ctx, sender, incArgs) if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok { return pErr.GoError() } else if pErr != nil { t.Fatal(pErr) } + + // A request succeeded, proving that there is a new leader and leaseholder. + // Remember who that is.
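+		// Note: nonPartitionedSenders[i%2] is mtc.stores[1+(i%2)].TestSender(), so the index of the sender that just succeeded also identifies the store holding the new leaseholder replica, which we capture below.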
+ newLeaderStoreIdx := 1 + (i % 2) + newLeaderRepl, err = mtc.stores[newLeaderStoreIdx].GetReplica(1) + if err != nil { + t.Fatal(err) + } + newLeaderReplSender = mtc.stores[newLeaderStoreIdx].TestSender() return nil }) + log.Infof(ctx, "test: waiting for values...") mtc.waitForValues(key, []int64{incA, incAB, incAB}) + log.Infof(ctx, "test: waiting for values... done") index, err := newLeaderRepl.GetLastIndex() if err != nil { @@ -970,6 +991,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // Truncate the log at index+1 (log entries < N are removed, so this // includes the increment). + log.Infof(ctx, "test: truncating log") truncArgs := truncateLogArgs(index+1, 1) testutils.SucceedsSoon(t, func() error { mtc.advanceClock(ctx) @@ -986,6 +1008,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { snapsBefore := snapsMetric.Count() // Remove the partition. Snapshot should follow. + log.Infof(ctx, "test: removing the partition") for _, s := range []int{0, 1, 2} { mtc.transport.Listen(mtc.stores[s].Ident.StoreID, &unreliableRaftHandler{ rangeID: 1, @@ -1024,6 +1047,145 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { mtc.waitForValues(key, []int64{incABC, incABC, incABC}) } +// TestRequestsOnLaggingReplica tests that requests sent to a replica that's behind in +// log application don't block. The test indirectly verifies that a replica that's not +// the leader does not attempt to acquire a lease and, thus, does not block until +// it figures out that it cannot, in fact, take the lease. Instead, the request is +// rejected with a NotLeaseHolderError that redirects the client to the current +// Raft leader. +func TestRequestsOnLaggingReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + mtc := &multiTestContext{ + // This test only cares about a single range. + startWithSingleRange: true, + } + defer mtc.Stop() + mtc.Start(t, 3) + + // Add replicas on all the stores. + mtc.replicateRange(1 /* rangeID */, 1, 2 /* dests */) + + // Partition the original leader from its followers. We do this by installing + // unreliableRaftHandler listeners on all three Stores. The handler on the + // partitioned store filters out all messages while the handler on the other + // two stores only filters out messages from the partitioned store. The + // configuration looks like: + // + // [0] + // x x + // / \ + // x x + // [1]<---->[2] + // + log.Infof(ctx, "test: installing unreliable Raft transports") + const partStoreIdx = 0 + partStore := mtc.stores[partStoreIdx] + partRepl, err := partStore.GetReplica(1) + require.NoError(t, err) + partReplDesc, err := partRepl.GetReplicaDescriptor() + require.NoError(t, err) + partReplSender := partStore.TestSender() + const otherStoreIdx = 1 + otherStore := mtc.stores[otherStoreIdx] + otherRepl, err := otherStore.GetReplica(1) + require.NoError(t, err) + + for _, i := range []int{0, 1, 2} { + h := &unreliableRaftHandler{rangeID: 1, RaftMessageHandler: mtc.stores[i]} + if i != partStoreIdx { + // Only filter messages from the partitioned store on the other + // two stores. + h.dropReq = func(req *kvserver.RaftMessageRequest) bool { + return req.FromReplica.StoreID == partRepl.StoreID() + } + h.dropHB = func(hb *kvserver.RaftHeartbeat) bool { + return hb.FromReplicaID == partReplDesc.ReplicaID + } + } + mtc.transport.Listen(mtc.stores[i].Ident.StoreID, h) + } + // Wait until the leadership is transferred away from the partitioned replica.
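+	// Note that we inspect the Raft status through otherRepl (a non-partitioned replica): for as long as the partition lasts, the partitioned replica itself keeps believing that it is the leader.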
+ testutils.SucceedsSoon(t, func() error { + mtc.advanceClock(ctx) + lead := otherRepl.RaftStatus().Lead + if lead == raft.None { + return errors.New("no leader yet") + } + if roachpb.ReplicaID(lead) == partReplDesc.ReplicaID { + return errors.New("partitioned replica is still leader") + } + return nil + }) + + leaderReplicaID := roachpb.ReplicaID(otherRepl.RaftStatus().Lead) + log.Infof(ctx, "test: the leader is replica ID %d", leaderReplicaID) + if leaderReplicaID != 2 && leaderReplicaID != 3 { + t.Fatalf("expected leader to be 2 or 3, was: %d", leaderReplicaID) + } + leaderSender := mtc.stores[leaderReplicaID-1].TestSender() + + // Write something to generate some Raft log entries and then truncate the log. + log.Infof(ctx, "test: incrementing") + keyA := roachpb.Key("a") + incArgs := incrementArgs(keyA, 1) + _, pErr := kv.SendWrapped(ctx, leaderSender, incArgs) + require.Nil(t, pErr) + log.Infof(ctx, "test: waiting for values...") + mtc.waitForValues(keyA, []int64{0, 1, 1}) + log.Infof(ctx, "test: waiting for values... done") + index, err := otherRepl.GetLastIndex() + require.NoError(t, err) + + // Truncate the log at index+1 (log entries < N are removed, so this includes + // the increment). This means that the partitioned replica will need a + // snapshot to catch up. + log.Infof(ctx, "test: truncating log...") + truncArgs := truncateLogArgs(index+1, 1) + testutils.SucceedsSoon(t, func() error { + mtc.advanceClock(ctx) + _, pErr := kv.SendWrapped(ctx, leaderSender, truncArgs) + if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok { + return pErr.GoError() + } else if pErr != nil { + t.Fatal(pErr) + } + return nil + }) + log.Infof(ctx, "test: truncating log... done") + + // TODO: assert that the partitioned replica's lease has expired by this point. + + // Resolve the partition, but continue blocking snapshots destined for the + // previously-partitioned replica. The point of blocking the snapshots is to + // prevent the respective replica from catching up and becoming eligible to + // become the leader/leaseholder. The point of resolving the partition is to + // allow the replica in question to figure out that it's not the leader any + // more. As long as it is completely partitioned, the replica continues + // believing that it is the leader, and lease acquisition requests block. + slowSnapHandler := &slowSnapRaftHandler{ + rangeID: 1, + waitCh: make(chan struct{}), + RaftMessageHandler: partStore, + } + defer slowSnapHandler.unblock() + mtc.transport.Listen(partStore.Ident.StoreID, slowSnapHandler) + + // Now we're going to send a request to the lagging replica, and we expect it + // to not block; we expect a redirection to the leader.
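+	// The one-second timeout below is what turns blocking into a test failure: without the propBuf rejecting lease acquisitions proposed by a follower, this Get would hang on the lagging replica's own lease acquisition attempt instead of promptly returning a NotLeaseHolderError.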
+ log.Infof(ctx, "test: sending request") + timeoutCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + getRequest := getArgs(keyA) + _, pErr = kv.SendWrapped(timeoutCtx, partReplSender, getRequest) + if pErr == nil { + t.Fatalf("unexpected success") + } + nlhe := pErr.Detail.GetNotLeaseHolder() + require.NotNil(t, nlhe, "expected NotLeaseholderError, got: %s", pErr) + require.Equal(t, leaderReplicaID, nlhe.LeaseHolder.ReplicaID) +} + type fakeSnapshotStream struct { nextReq *kvserver.SnapshotRequest nextErr error diff --git a/pkg/kv/kvserver/client_test.go b/pkg/kv/kvserver/client_test.go index 1758d26a686b..91d4cd2eb5a6 100644 --- a/pkg/kv/kvserver/client_test.go +++ b/pkg/kv/kvserver/client_test.go @@ -904,6 +904,8 @@ func (m *multiTestContext) addStore(idx int) { nodeID := roachpb.NodeID(idx + 1) cfg := m.makeStoreConfig(idx) ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer} + ambient.AddLogTag("n", nodeID) + m.populateDB(idx, cfg.Settings, stopper) nlActive, nlRenewal := cfg.NodeLivenessDurations() m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{ @@ -1263,7 +1265,8 @@ func (m *multiTestContext) changeReplicas( return desc.NextReplicaID, nil } -// replicateRange replicates the given range onto the given stores. +// replicateRange replicates the given range onto the given destination stores. The destinations +// are indicated by indexes within m.stores. func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int) { m.t.Helper() if err := m.replicateRangeNonFatal(rangeID, dests...); err != nil { @@ -1388,9 +1391,9 @@ func (m *multiTestContext) waitForValuesT(t testing.TB, key roachpb.Key, expecte }) } -// waitForValues waits up to the given duration for the integer values -// at the given key to match the expected slice (across all engines). -// Fails the test if they do not match. +// waitForValues waits for the integer values at the given key to match the +// expected slice (across all engines). Fails the test if they do not match +// after the SucceedsSoon period. func (m *multiTestContext) waitForValues(key roachpb.Key, expected []int64) { m.t.Helper() m.waitForValuesT(m.t, key, expected) diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index cb1cc1e4cb28..51b58baddf75 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -158,6 +158,18 @@ type proposer interface { // The following require the proposer to hold an exclusive lock. withGroupLocked(func(*raft.RawNode) error) error registerProposalLocked(*ProposalData) + // rejectProposalWithRedirectLocked rejects a proposal and redirects the + // proposer to try it on another node. This is used to sometimes reject lease + // acquisitions when another replica is the leader; the intended consequence + // of the rejection is that the request that caused the lease acquisition + // attempt is tried on the leader, at which point it should result in a lease + // acquisition attempt by that node (or, perhaps by then the leader will have + // already gotten a lease and the request can be serviced directly). + rejectProposalWithRedirectLocked( + ctx context.Context, + prop *ProposalData, + redirectTo roachpb.ReplicaID, + ) } // Init initializes the proposal buffer and binds it to the provided proposer. @@ -399,6 +411,26 @@ func (b *propBuf) FlushLockedWithRaftGroup( // and apply them. 
buf := b.arr.asSlice()[:used] ents := make([]raftpb.Entry, 0, used) + + // Figure out leadership info. We'll use it to conditionally drop some + // requests. + var iAmTheLeader bool + var leaderKnown bool + var leader roachpb.ReplicaID + if raftGroup != nil { + status := raftGroup.BasicStatus() + iAmTheLeader = status.RaftState == raft.StateLeader + leaderKnown = status.Lead != raft.None + if leaderKnown { + leader = roachpb.ReplicaID(status.Lead) + if !iAmTheLeader && leader == b.p.replicaID() { + log.Fatalf(ctx, + "inconsistent Raft state: state %s while the current replica is also the lead: %d", + status.RaftState, leader) + } + } + } + // Remember the first error that we see when proposing the batch. We don't // immediately return this error because we want to finish clearing out the // buffer and registering each of the proposals with the proposer, but we @@ -412,6 +444,41 @@ func (b *propBuf) FlushLockedWithRaftGroup( } buf[i] = nil // clear buffer + // Handle an edge case about lease acquisitions: we don't want to forward + // lease acquisitions to another node (which is what happens when we're not + // the leader) because: + // a) if there is a different leader, that leader should acquire the lease + // itself and thus avoid a change of leadership caused by the leaseholder + // and leader being different (Raft leadership follows the lease), and + // b) being a follower, it's possible that this replica is behind in + // applying the log. Thus, there might be another lease in place that this + // follower doesn't know about, in which case the lease we're proposing here + // would be rejected. Not only would proposing such a lease be wasted work, + // but we're trying to protect against pathological cases where it takes a + // long time for this follower to catch up (for example because it's waiting + // for a snapshot, and the snapshot is queued behind many other snapshots). + // In such a case, we don't want all requests arriving at this node to be + // blocked on this lease acquisition (which is very likely to eventually + // fail anyway). + // + // Thus, we do one of two things: + // - if the leader is known, we reject this proposal and make sure the + // request that needed the lease is redirected to the leader; + // - if the leader is not known, we don't do anything special here to + // terminate the proposal, but we know that Raft will reject it with an + // ErrProposalDropped. We'll eventually re-propose it once a leader is + // known, at which point it will either go through or be rejected based on + // whether or not it is this replica that became the leader. + if !iAmTheLeader && p.Request.IsLeaseRequest() { + if leaderKnown { + log.VEventf(ctx, 2, "not proposing lease acquisition because we're not the leader; replica %d is", + leader) + b.p.rejectProposalWithRedirectLocked(ctx, p, leader) + continue + } + // If the leader is not known, continue with the proposal as explained above. + } + // Raft processing bookkeeping. b.p.registerProposalLocked(p) @@ -638,3 +705,21 @@ func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { p.proposedAtTicks = rp.mu.ticks rp.mu.proposals[p.idKey] = p } + +// rejectProposalWithRedirectLocked is part of the proposer interface.
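+// It cleans the proposal up from the proposer's tracking map and finishes it with a NotLeaseHolderError whose speculative lease names the redirect target (the presumed leader), so that the client retries against that replica.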
+func (rp *replicaProposer) rejectProposalWithRedirectLocked( + ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, +) { + r := (*Replica)(rp) + rangeDesc := r.descRLocked() + storeID := r.store.StoreID() + leaderRep, _ /* ok */ := rangeDesc.GetReplicaDescriptorByID(redirectTo) + speculativeLease := &roachpb.Lease{ + Replica: leaderRep, + } + log.VEventf(ctx, 2, "redirecting proposal to node %s; request: %s", leaderRep.NodeID, prop.Request) + r.cleanupFailedProposalLocked(prop) + prop.finishApplication(ctx, proposalResult{ + Err: roachpb.NewError(newNotLeaseHolderError(speculativeLease, storeID, rangeDesc)), + }) +} diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index ec4710a0268d..9d04e2ebc501 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -71,6 +71,12 @@ func (t *testProposer) registerProposalLocked(p *ProposalData) { t.registered++ } +func (t *testProposer) rejectProposalWithRedirectLocked( + ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, +) { + panic("unimplemented") +} + func newPropData(leaseReq bool) (*ProposalData, []byte) { var ba roachpb.BatchRequest if leaseReq {
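The testProposer stub above panics, which suggests the new rejection path is not yet exercised by the proposal-buffer unit tests. As a purely illustrative sketch (not part of this diff), such a test could have testProposer record the redirect target instead of panicking; the onRejectProposalWithRedirect field used below is hypothetical and stands in for whatever hook a real test would add to the testProposer struct.

// Hypothetical addition to the testProposer struct:
//
//	onRejectProposalWithRedirect func(redirectTo roachpb.ReplicaID)

func (t *testProposer) rejectProposalWithRedirectLocked(
	ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID,
) {
	// Record which replica the propBuf redirected to, so a test can assert that
	// a follower's lease acquisition was rejected and pointed at the leader.
	if t.onRejectProposalWithRedirect != nil {
		t.onRejectProposalWithRedirect(redirectTo)
	}
}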