Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: re-enqueue Raft groups on paginated application #31568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -4444,6 +4444,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
const expl = "during advance"
if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.Advance(rd)

// If the Raft group still has more to process then we immediately
// re-enqueue it for another round of processing. This is possible if
// the group's committed entries were paginated due to size limitations
// and we didn't apply all of them in this pass.
if raftGroup.HasReady() {
r.store.enqueueRaftUpdateCheck(r.RangeID)
}
return true, nil
}); err != nil {
return stats, expl, errors.Wrap(err, expl)
Expand Down
92 changes: 92 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9581,6 +9581,98 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
close(blockRaftApplication)
}

// TestApplyPaginatedCommittedEntries tests that a Raft group's committed
// entries are quickly applied, even if their application is paginated due to
// the RaftMaxSizePerMsg configuration. This is a regression test for #31330.
//
// The test blocks application of a first command, proposes a batch of large
// commands behind it so that they accumulate unapplied, then unblocks
// application and requires the last proposal to complete within a bounded
// amount of time.
func TestApplyPaginatedCommittedEntries(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	tc := testContext{}
	tsc := TestStoreConfig(nil)

	// Drop the RaftMaxSizePerMsg so that even small Raft entries have their
	// application paginated.
	// TODO(nvanbenschoten): Switch this to using the new MaxCommittedSizePerReady
	// configuration once #31511 is addressed.
	tsc.RaftMaxSizePerMsg = 128
	// Slow down the tick interval dramatically so that Raft groups can't rely
	// on ticks to trigger Raft ready iterations.
	tsc.RaftTickInterval = 5 * time.Second

	// While filterActive is set, the apply filter signals (without blocking)
	// on blockingRaftApplication that a command has reached application and
	// then waits until blockRaftApplication is closed.
	var filterActive int32
	blockRaftApplication := make(chan struct{})
	blockingRaftApplication := make(chan struct{}, 1)
	tsc.TestingKnobs.TestingApplyFilter =
		func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
			if atomic.LoadInt32(&filterActive) == 1 {
				select {
				case blockingRaftApplication <- struct{}{}:
				default:
				}
				<-blockRaftApplication
			}
			return nil
		}

	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	tc.StartWithStoreConfig(t, stopper, tsc)
	repl := tc.repl

	// Block command application then propose a command to Raft.
	var ba roachpb.BatchRequest
	key := roachpb.Key("a")
	put := putArgs(key, []byte("val"))
	ba.Add(&put)
	ba.Timestamp = tc.Clock().Now()

	atomic.StoreInt32(&filterActive, 1)
	exLease, _ := repl.GetLease()
	_, _, _, pErr := repl.propose(ctx, exLease, ba, nil /* endCmds */, &allSpans)
	if pErr != nil {
		t.Fatal(pErr)
	}

	// Once that command is stuck applying, propose a number of large commands.
	// This will allow them to all build up without any being applied so that
	// their application will require pagination.
	<-blockingRaftApplication
	var ch chan proposalResult
	for i := 0; i < 50; i++ {
		var ba2 roachpb.BatchRequest
		key := roachpb.Key("a")
		put := putArgs(key, make([]byte, 2*tsc.RaftMaxSizePerMsg))
		ba2.Add(&put)
		ba2.Timestamp = tc.Clock().Now()

		// Propose the large batch built above (ba2), not the small initial
		// batch (ba); only the large payloads exceed RaftMaxSizePerMsg.
		var pErr *roachpb.Error
		ch, _, _, pErr = repl.propose(ctx, exLease, ba2, nil /* endCmds */, &allSpans)
		if pErr != nil {
			t.Fatal(pErr)
		}
	}

	// Stop blocking Raft application. All of the proposals should quickly
	// commit and apply, even if their application is paginated due to the
	// small RaftMaxSizePerMsg. Only the result channel of the final proposal
	// is awaited.
	close(blockRaftApplication)
	const maxWait = 10 * time.Second
	select {
	case propRes := <-ch:
		if propRes.Err != nil {
			t.Fatalf("unexpected proposal result error: %v", propRes.Err)
		}
		if propRes.Reply == nil || len(propRes.Reply.Responses) != 1 {
			t.Fatalf("expected proposal result with 1 response, found: %v", propRes.Reply)
		}
	case <-time.After(maxWait):
		// If we don't re-enqueue Raft groups for another round of processing
		// when their committed entries are paginated and not all immediately
		// applied, this test will take more than three minutes to finish.
		t.Fatalf("stall detected, proposal did not finish within %s", maxWait)
	}
}

func TestSplitMsgApps(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down