From c18156317ddefe552f88db6f3c9f19e4d6bddcc1 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 30 Jan 2023 22:56:05 -0500 Subject: [PATCH] kv: stop limiting AckCommittedEntriesBeforeApplication to Raft log's previous LastIndex This commit removes logic in `apply.Task.AckCommittedEntriesBeforeApplication` which was ensuring that we don't acknowledge committed Raft log entries before they were durable in a node's local Raft log. This is now ensured inside of the etcd/raft library when AsyncStorageWrites is enabled, as the comment added here describes. --- pkg/kv/kvserver/apply/doc_test.go | 2 +- pkg/kv/kvserver/apply/task.go | 13 ++--------- pkg/kv/kvserver/apply/task_test.go | 36 +----------------------------- pkg/kv/kvserver/replica_raft.go | 22 +++++++++++------- 4 files changed, 18 insertions(+), 55 deletions(-) diff --git a/pkg/kv/kvserver/apply/doc_test.go b/pkg/kv/kvserver/apply/doc_test.go index 6f34af877499..7258b1fd3a25 100644 --- a/pkg/kv/kvserver/apply/doc_test.go +++ b/pkg/kv/kvserver/apply/doc_test.go @@ -45,7 +45,7 @@ Setting up a batch of seven log entries: } fmt.Println("\nAckCommittedEntriesBeforeApplication:") - if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil { + if err := t.AckCommittedEntriesBeforeApplication(ctx); err != nil { panic(err) } fmt.Print(` diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go index b6063c0de096..989258346681 100644 --- a/pkg/kv/kvserver/apply/task.go +++ b/pkg/kv/kvserver/apply/task.go @@ -197,13 +197,7 @@ func (t *Task) assertDecoded() { // it is applied. Because of this, the client can be informed of the success of // a write at this point, but we cannot release that write's latches until the // write has applied. See ProposalData.signalProposalResult/finishApplication. -// -// 4. Note that when catching up a follower that is behind, the (etcd/raft) -// leader will emit an MsgApp with a commit index that encompasses the entries -// in the MsgApp, and Ready() will expose these as present in both the Entries -// and CommittedEntries slices (i.e. append and apply). We don't ack these -// early - the caller will pass the "old" last index in. -func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error { +func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context) error { t.assertDecoded() if !t.anyLocal { return nil // fast-path @@ -218,11 +212,8 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde defer iter.Close() // Collect a batch of trivial commands from the applier. Stop at the first - // non-trivial command or at the first command with an index above maxIndex. + // non-trivial command. batchIter := takeWhileCmdIter(iter, func(cmd Command) bool { - if cmd.Index() > maxIndex { - return false - } return cmd.IsTrivial() }) diff --git a/pkg/kv/kvserver/apply/task_test.go b/pkg/kv/kvserver/apply/task_test.go index a970173e524f..7927bdca5492 100644 --- a/pkg/kv/kvserver/apply/task_test.go +++ b/pkg/kv/kvserver/apply/task_test.go @@ -317,7 +317,7 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) { appT := apply.MakeTask(sm, dec) defer appT.Close() require.NoError(t, appT.Decode(ctx, ents)) - require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */)) + require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx)) // Assert that the state machine was not updated. require.Equal(t, testStateMachine{}, *sm) @@ -338,40 +338,6 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) { require.Equal(t, exp, cmd.acked) require.False(t, cmd.finished) } - - // Try again with a lower maximum log index. - appT.Close() - ents = makeEntries(5) - - dec = newTestDecoder() - dec.nonLocal[2] = true - dec.shouldReject[3] = true - - appT = apply.MakeTask(sm, dec) - require.NoError(t, appT.Decode(ctx, ents)) - require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */)) - - // Assert that the state machine was not updated. - require.Equal(t, testStateMachine{}, *sm) - - // Assert that some commands were acknowledged early and that none were finished. - for _, cmd := range dec.cmds { - var exp bool - switch cmd.index { - case 1, 4: - exp = true // local and successful - case 2: - exp = false // remote - case 3: - exp = false // local and rejected - case 5: - exp = false // index too high - default: - t.Fatalf("unexpected index %d", cmd.index) - } - require.Equal(t, exp, cmd.acked) - require.False(t, cmd.finished) - } } func TestApplyCommittedEntriesWithErr(t *testing.T) { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 0a0b9b3b248c..6d306b3efb38 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -808,13 +808,19 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // entries and acknowledge as many as we can trivially prove will not be // rejected beneath raft. // - // Note that the CommittedEntries slice may contain entries that are also in - // the Entries slice (to be appended in this ready pass). This can happen when - // a follower is being caught up on committed commands. We could acknowledge - // these commands early even though they aren't durably in the local raft log - // yet (since they're committed via a quorum elsewhere), but we chose to be - // conservative and avoid it by passing the last Ready cycle's `lastIndex` for - // the maxIndex argument to AckCommittedEntriesBeforeApplication. + // Note that the Entries slice in the MsgStorageApply cannot refer to entries + // that are also in the Entries slice in the MsgStorageAppend. Raft will not + // allow unstable entries to be applied when AsyncStorageWrites is enabled. + // + // If we disable AsyncStorageWrites in the future, this property will no + // longer be true, and the two slices could overlap. For example, this can + // happen when a follower is being caught up on committed commands. We could + // acknowledge these commands early even though they aren't durably in the + // local raft log yet (since they're committed via a quorum elsewhere), but + // we'd likely want to revert to an earlier version of this code that chose to + // be conservative and avoid this behavior by passing the last Ready cycle's + // `lastIndex` for a maxIndex argument to + // AckCommittedEntriesBeforeApplication. // // TODO(nvanbenschoten): this is less important with async storage writes. // Consider getting rid of it. @@ -829,7 +835,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, err } if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication { - if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.LastIndex); err != nil { + if err := appTask.AckCommittedEntriesBeforeApplication(ctx); err != nil { return stats, err } }