diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index a1125a67adc5..d30fea21fad1 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -1124,6 +1125,144 @@ func TestRefreshPendingCommands(t *testing.T) {
 	}
 }
 
+// Test that when a Raft group is not able to establish a quorum, its Raft log
+// does not grow without bound. It tests two different scenarios where this used
+// to be possible (see #27772):
+// 1. The leader proposes a command and cannot establish a quorum. The leader
+//    continually re-proposes the command.
+// 2. The follower proposes a command and forwards it to the leader, who cannot
+//    establish a quorum. The follower continually re-proposes and forwards the
+//    command to the leader.
+func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sc := storage.TestStoreConfig(nil)
+	// Drop the raft tick interval so the Raft group is ticked more.
+	sc.RaftTickInterval = 10 * time.Millisecond
+	// Don't time out the raft leader. We don't want leadership moving.
+	sc.RaftElectionTimeoutTicks = 1000000
+	// Disable leader transfers during leaseholder changes so that we
+	// can easily create leader-not-leaseholder scenarios.
+	sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
+	// Refresh pending commands on every Raft group tick instead of
+	// every RaftElectionTimeoutTicks.
+	sc.TestingKnobs.RefreshReasonTicksPeriod = 1
+	// Disable periodic gossip tasks which can move the range 1 lease
+	// unexpectedly.
+	sc.TestingKnobs.DisablePeriodicGossips = true
+	mtc := &multiTestContext{storeConfig: &sc}
+	defer mtc.Stop()
+	mtc.Start(t, 5)
+
+	const rangeID = roachpb.RangeID(1)
+	mtc.replicateRange(rangeID, 1, 2, 3, 4)
+
+	// Raft leadership is kept on node 0.
+	leaderRepl, err := mtc.Store(0).GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Put some data in the range so we'll have something to test for.
+	incArgs := incrementArgs([]byte("a"), 5)
+	if _, err := client.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for all nodes to catch up.
+	mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5, 5})
+
+	// Test proposing on leader and proposing on follower. Neither should result
+	// in unbounded raft log growth.
+	testutils.RunTrueAndFalse(t, "proposeOnFollower", func(t *testing.T, proposeOnFollower bool) {
+		// Restart any nodes that are down.
+		for _, s := range []int{2, 3, 4} {
+			if mtc.Store(s) == nil {
+				mtc.restartStore(s)
+			}
+		}
+
+		// Determine which node to propose on. Transfer lease to that node.
+		var propIdx, otherIdx int
+		if !proposeOnFollower {
+			propIdx, otherIdx = 0, 1
+		} else {
+			propIdx, otherIdx = 1, 0
+		}
+		propNode := mtc.stores[propIdx].TestSender()
+		mtc.transferLease(context.TODO(), rangeID, otherIdx, propIdx)
+		testutils.SucceedsSoon(t, func() error {
+			// Lease transfers may not be immediately observed by the new
+			// leaseholder. Wait until the new leaseholder is aware.
+			repl, err := mtc.Store(propIdx).GetReplica(rangeID)
+			if err != nil {
+				t.Fatal(err)
+			}
+			repDesc, err := repl.GetReplicaDescriptor()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if lease, _ := repl.GetLease(); lease.Replica != repDesc {
+				return errors.Errorf("lease not transferred yet; found %v", lease)
+			}
+			return nil
+		})
+
+		// Stop enough nodes to prevent a quorum.
+		for _, s := range []int{2, 3, 4} {
+			mtc.stopStore(s)
+		}
+
+		// Determine the current raft log size.
+		initLogSize := leaderRepl.GetRaftLogSize()
+
+		// While a majority of nodes are down, write some data.
+		putRes := make(chan *roachpb.Error)
+		go func() {
+			putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
+			_, err := client.SendWrapped(context.Background(), propNode, putArgs)
+			putRes <- err
+		}()
+
+		// Wait for a bit and watch for Raft log growth.
+		wait := time.After(500 * time.Millisecond)
+		ticker := time.Tick(50 * time.Millisecond)
+	Loop:
+		for {
+			select {
+			case <-wait:
+				break Loop
+			case <-ticker:
+				// Verify that the leader is node 0.
+				status := leaderRepl.RaftStatus()
+				if status == nil || status.RaftState != raft.StateLeader {
+					t.Fatalf("raft leader should be node 0, but got status %+v", status)
+				}
+
+				// Check raft log size.
+				const logSizeLimit = 64 << 10 // 64 KB
+				curlogSize := leaderRepl.GetRaftLogSize()
+				logSize := curlogSize - initLogSize
+				logSizeStr := humanizeutil.IBytes(logSize)
+				if logSize > logSizeLimit {
+					t.Fatalf("raft log size grew to %s", logSizeStr)
+				}
+				t.Logf("raft log size grew to %s", logSizeStr)
+			case err := <-putRes:
+				t.Fatalf("write finished with quorum unavailable; err=%v", err)
+			}
+		}
+
+		// Start enough nodes to establish a quorum.
+		mtc.restartStore(2)
+
+		// The write should now succeed.
+		if err := <-putRes; err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
 // TestStoreRangeUpReplicate verifies that the replication queue will notice
 // under-replicated ranges and replicate them. Also tests that preemptive
 // snapshots which contain sideloaded proposals don't panic the receiving end.
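The replica.go changes that follow have two parts: a leader remembers the command IDs of proposals forwarded to it and drops exact duplicates, and it clears that set whenever leadership changes. The standalone Go sketch below illustrates only the deduplication idea in isolation; the type and function names here are hypothetical stand-ins, not the Replica API used in the diff.

package main

import "fmt"

// cmdID stands in for storagebase.CmdIDKey: a unique ID carried by each
// proposed command.
type cmdID string

// leaderState is a toy model of the leader-side bookkeeping. remote holds the
// IDs of forwarded proposals accepted during the current term.
type leaderState struct {
	remote map[cmdID]struct{}
	log    []cmdID // entries appended to the Raft log this term
}

// stepForwardedProposal records a forwarded proposal and reports whether it
// should be appended. A re-forwarded duplicate is dropped, so a follower that
// keeps re-proposing the same command cannot grow the leader's log.
func (l *leaderState) stepForwardedProposal(id cmdID) bool {
	if l.remote == nil {
		l.remote = map[cmdID]struct{}{}
	}
	if _, seen := l.remote[id]; seen {
		return false // duplicate; drop instead of appending again
	}
	l.remote[id] = struct{}{}
	l.log = append(l.log, id)
	return true
}

// finish removes the entry once the command applies (or leadership changes),
// so the tracking set cannot grow without bound either.
func (l *leaderState) finish(id cmdID) {
	delete(l.remote, id)
}

func main() {
	var l leaderState
	fmt.Println(l.stepForwardedProposal("cmd-1")) // true: appended
	fmt.Println(l.stepForwardedProposal("cmd-1")) // false: duplicate dropped
	fmt.Println(len(l.log))                       // 1: the log did not grow
	l.finish("cmd-1")
}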
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index ba63fc7ba7c3..3ecc5cc19e55 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -392,7 +392,12 @@ type Replica struct {
 		// map must only be referenced while Replica.mu is held, except if the
 		// element is removed from the map first. The notable exception is the
 		// contained RaftCommand, which we treat as immutable.
-		proposals map[storagebase.CmdIDKey]*ProposalData
+		proposals map[storagebase.CmdIDKey]*ProposalData
+		// remoteProposals is maintained by Raft leaders and stores in-flight
+		// commands that were forwarded to the leader during its current term.
+		// The set allows leaders to detect duplicate forwarded commands and
+		// avoid re-proposing the same forwarded command multiple times.
+		remoteProposals map[storagebase.CmdIDKey]struct{}
 		internalRaftGroup *raft.RawNode
 		// The ID of the replica within the Raft group. May be 0 if the replica has
 		// been created from a preemptive snapshot (i.e. before being added to the
@@ -852,6 +857,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
 		p.finishRaftApplication(resp)
 	}
 	r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
+	r.mu.remoteProposals = nil
 }
 
 // setTombstoneKey writes a tombstone to disk to ensure that replica IDs never
@@ -3333,6 +3339,39 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
 		// other replica is not quiesced, so we don't need to wake the leader.
 		r.unquiesceLocked()
 		r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
+		if req.Message.Type == raftpb.MsgProp {
+			// A proposal was forwarded to this replica.
+			if r.mu.replicaID == r.mu.leaderID {
+				// This replica is the leader. Record that the proposal
+				// was seen and drop the proposal if it was already seen.
+				// This prevents duplicate forwarded proposals from each
+				// being appended to a leader's raft log.
+				allSeen := true
+				for _, e := range req.Message.Entries {
+					switch e.Type {
+					case raftpb.EntryNormal:
+						cmdID, _ := DecodeRaftCommand(e.Data)
+						if r.mu.remoteProposals == nil {
+							r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
+						}
+						if _, ok := r.mu.remoteProposals[cmdID]; !ok {
+							r.mu.remoteProposals[cmdID] = struct{}{}
+							allSeen = false
+						}
+					case raftpb.EntryConfChange:
+						// We could peek into the EntryConfChange to find the
+						// command ID, but we don't expect follower-initiated
+						// conf changes.
+						allSeen = false
+					default:
+						log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
+					}
+				}
+				if allSeen {
+					return false /* unquiesceAndWakeLeader */, nil
+				}
+			}
+		}
 		err := raftGroup.Step(req.Message)
 		if err == raft.ErrProposalDropped {
 			// A proposal was forwarded to this replica but we couldn't propose it.
@@ -3441,6 +3480,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
 			refreshReason = reasonNewLeader
 		}
+		// Clear the remote proposal set. Would have been nil already if not
+		// previously the leader.
+		r.mu.remoteProposals = nil
 		leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
 	}
 
@@ -3831,17 +3873,28 @@ func (r *Replica) tick() (bool, error) {
 
 	r.mu.ticks++
 	r.mu.internalRaftGroup.Tick()
+
+	refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks
+	if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
+		refreshAtDelta = knob
+	}
 	if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
-		r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
+		r.mu.replicaID != r.mu.leaderID &&
+		r.mu.ticks%refreshAtDelta == 0 {
 		// RaftElectionTimeoutTicks is a reasonable approximation of how long we
 		// should wait before deciding that our previous proposal didn't go
 		// through. Note that the combination of the above condition and passing
 		// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
 		// will be refreshed when they have been pending for 1 to 2 election
 		// cycles.
-		r.refreshProposalsLocked(
-			r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
-		)
+		//
+		// However, we don't refresh proposals if we are the leader because
+		// doing so would be useless. The commands tracked by a leader replica
+		// were either all proposed when the replica was a leader or were
+		// re-proposed when the replica became a leader. Either way, they are
+		// guaranteed to be in the leader's Raft log so re-proposing won't do
+		// anything.
+		r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
 	}
 	return true, nil
 }
@@ -3948,6 +4001,9 @@ func (r *Replica) maybeTransferRaftLeader(
 	if !r.isLeaseValidRLocked(l, now) {
 		return
 	}
+	if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
+		return
+	}
 	if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
 		log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
 		r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
@@ -4609,6 +4665,9 @@ func (r *Replica) processRaftCommand(
 		delete(r.mu.proposals, idKey)
 	}
 
+	// Delete the entry for a forwarded proposal set.
+	delete(r.mu.remoteProposals, idKey)
+
 	leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
 
 	r.mu.Unlock()
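To summarize the tick() change above: the refresh cadence becomes configurable through the RefreshReasonTicksPeriod testing knob, and a replica that is currently the Raft leader skips refreshing entirely, since its tracked commands are already in its own log. A condensed standalone sketch of just that decision, using hypothetical names rather than the real Replica fields:

package main

import "fmt"

// refreshPeriod returns how many ticks to wait between proposal refreshes:
// the election timeout by default, or the testing-knob override when set.
func refreshPeriod(electionTimeoutTicks, knob int) int {
	if knob > 0 {
		return knob
	}
	return electionTimeoutTicks
}

// shouldRefreshOnTick reports whether pending proposals should be re-proposed
// on this tick. Leaders never refresh; followers refresh once per period.
func shouldRefreshOnTick(ticks, period int, isLeader bool) bool {
	if isLeader {
		return false
	}
	return ticks%period == 0
}

func main() {
	period := refreshPeriod(15, 1) // the knob forces a refresh every tick
	for ticks := 1; ticks <= 3; ticks++ {
		fmt.Printf("tick %d: follower=%t leader=%t\n",
			ticks, shouldRefreshOnTick(ticks, period, false), shouldRefreshOnTick(ticks, period, true))
	}
}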
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index 0ccea15d9321..7fd39f8d44bd 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -274,7 +274,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
 		r.txnWaitQueue.Clear(true /* disable */)
 	}
 
-	if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
+	if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
+		!r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
 		// If this replica is the raft leader but it is not the new lease holder,
 		// then try to transfer the raft leadership to match the lease. We like it
 		// when leases and raft leadership are collocated because that facilitates
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 785ca9ebe420..0f9a7fca444c 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -7835,8 +7835,14 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
+	// Only followers refresh pending commands during tick events. Change the
+	// replica that the range thinks is the leader so that the replica thinks
+	// it's a follower.
+	r.mu.Lock()
+	r.mu.leaderID = 2
+	r.mu.Unlock()
 
+	electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
 	{
 		// The verifications of the reproposal counts below rely on r.mu.ticks
 		// starting with a value of 0 (modulo electionTicks). Move the replica into
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 6f0d90ffeed2..35c30ebbdfab 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -733,6 +733,9 @@ type StoreTestingKnobs struct {
 	DisableScanner bool
 	// DisablePeriodicGossips disables periodic gossiping.
 	DisablePeriodicGossips bool
+	// DisableLeaderFollowsLeaseholder disables attempts to transfer raft
+	// leadership when it diverges from the range's leaseholder.
+	DisableLeaderFollowsLeaseholder bool
 	// DisableRefreshReasonNewLeader disables refreshing pending commands when a new
 	// leader is discovered.
 	DisableRefreshReasonNewLeader bool
@@ -742,6 +745,10 @@ type StoreTestingKnobs struct {
 	// DisableRefreshReasonTicks disables refreshing pending commands
 	// periodically.
 	DisableRefreshReasonTicks bool
+	// RefreshReasonTicksPeriod overrides the default period over which
+	// pending commands are refreshed. The period is specified as a multiple
+	// of Raft group ticks.
+	RefreshReasonTicksPeriod int
 	// DisableProcessRaft disables the process raft loop.
 	DisableProcessRaft bool
 	// DisableLastProcessedCheck disables checking on replica queue last processed times.