diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index a49aa8a5b14f..a598de002977 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -4444,6 +4444,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	const expl = "during advance"
 	if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.Advance(rd)
+
+		// If the Raft group still has more to process then we immediately
+		// re-enqueue it for another round of processing. This is possible if
+		// the group's committed entries were paginated due to size limitations
+		// and we didn't apply all of them in this pass.
+		if raftGroup.HasReady() {
+			r.store.enqueueRaftUpdateCheck(r.RangeID)
+		}
 		return true, nil
 	}); err != nil {
 		return stats, expl, errors.Wrap(err, expl)
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 3a7fb0a108ee..bbedf5c47904 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -9581,6 +9581,98 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	close(blockRaftApplication)
 }
 
+// TestApplyPaginatedCommittedEntries tests that a Raft group's committed
+// entries are quickly applied, even if their application is paginated due to
+// the RaftMaxSizePerMsg configuration. This is a regression test for #31330.
+func TestApplyPaginatedCommittedEntries(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	tc := testContext{}
+	tsc := TestStoreConfig(nil)
+
+	// Drop the RaftMaxSizePerMsg so that even small Raft entries have their
+	// application paginated.
+	// TODO(nvanbenschoten): Switch this to using the new MaxCommittedSizePerReady
+	// configuration once #31511 is addressed.
+	tsc.RaftMaxSizePerMsg = 128
+	// Slow down the tick interval dramatically so that Raft groups can't rely
+	// on ticks to trigger Raft ready iterations.
+	tsc.RaftTickInterval = 5 * time.Second
+
+	var filterActive int32
+	blockRaftApplication := make(chan struct{})
+	blockingRaftApplication := make(chan struct{}, 1)
+	tsc.TestingKnobs.TestingApplyFilter =
+		func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
+			if atomic.LoadInt32(&filterActive) == 1 {
+				select {
+				case blockingRaftApplication <- struct{}{}:
+				default:
+				}
+				<-blockRaftApplication
+			}
+			return nil
+		}
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.StartWithStoreConfig(t, stopper, tsc)
+	repl := tc.repl
+
+	// Block command application then propose a command to Raft.
+	var ba roachpb.BatchRequest
+	key := roachpb.Key("a")
+	put := putArgs(key, []byte("val"))
+	ba.Add(&put)
+	ba.Timestamp = tc.Clock().Now()
+
+	atomic.StoreInt32(&filterActive, 1)
+	exLease, _ := repl.GetLease()
+	_, _, _, pErr := repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+
+	// Once that command is stuck applying, propose a number of large commands.
+	// This will allow them to all build up without any being applied so that
+	// their application will require pagination.
+	<-blockingRaftApplication
+	var ch chan proposalResult
+	for i := 0; i < 50; i++ {
+		var ba2 roachpb.BatchRequest
+		key := roachpb.Key("a")
+		put := putArgs(key, make([]byte, 2*tsc.RaftMaxSizePerMsg))
+		ba2.Add(&put)
+		ba2.Timestamp = tc.Clock().Now()
+
+		var pErr *roachpb.Error
+		ch, _, _, pErr = repl.propose(ctx, exLease, ba2, nil /* endCmds */, &allSpans)
+		if pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	// Stop blocking Raft application. All of the proposals should quickly
+	// commit and apply, even if their application is paginated due to the
+	// small RaftMaxSizePerMsg.
+	close(blockRaftApplication)
+	const maxWait = 10 * time.Second
+	select {
+	case propRes := <-ch:
+		if propRes.Err != nil {
+			t.Fatalf("unexpected proposal result error: %v", propRes.Err)
+		}
+		if propRes.Reply == nil || len(propRes.Reply.Responses) != 1 {
+			t.Fatalf("expected proposal result with 1 response, found: %v", propRes.Reply)
+		}
+	case <-time.After(maxWait):
+		// If we don't re-enqueue Raft groups for another round of processing
+		// when their committed entries are paginated and not all immediately
+		// applied, this test will take more than three minutes to finish.
+		t.Fatalf("stall detected, proposal did not finish within %s", maxWait)
+	}
+}
+
 func TestSplitMsgApps(t *testing.T) {
 	defer leaktest.AfterTest(t)()
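
Illustration (not part of the patch): a minimal sketch of the scheduling pattern the replica.go hunk relies on. The group interface and enqueue callback below are simplified stand-ins for etcd/raft's RawNode and the store's Raft scheduler; they are assumptions for this sketch, not CockroachDB's actual types.

package raftsketch

// group mirrors the two RawNode operations the fix depends on: detecting
// pending work and processing one Ready's worth of it.
type group interface {
	HasReady() bool
	ProcessReady() // persist, send, and apply one (possibly size-limited) Ready
}

// processOnce is one scheduler pass for a single range. Before the fix, a
// pass that applied only a size-limited page of committed entries left the
// remainder waiting for the next Raft tick (5s away in the test above).
func processOnce(g group, rangeID int64, enqueue func(rangeID int64)) {
	if !g.HasReady() {
		return
	}
	g.ProcessReady() // committed entries may be paginated by RaftMaxSizePerMsg

	// The fix: if pagination left committed entries unapplied, schedule
	// another pass immediately instead of waiting for the next tick.
	if g.HasReady() {
		enqueue(rangeID)
	}
}

Re-enqueueing through the scheduler, rather than looping inside handleRaftReadyRaftMuLocked, presumably lets other ranges interleave their work between passes; the patch itself only states that a group with remaining work is immediately re-queued.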