Skip to content

Commit

Permalink
kv: stop limiting AckCommittedEntriesBeforeApplication to Raft log's …
Browse files Browse the repository at this point in the history
…previous LastIndex

This commit removes logic in `apply.Task.AckCommittedEntriesBeforeApplication`
which was ensuring that we don't acknowledge committed Raft log entries before
they were durable in a node's local Raft log. This is now ensured inside of the
etcd/raft library when AsyncStorageWrites is enabled, as the comment added here
describes.
  • Loading branch information
nvanbenschoten committed Feb 2, 2023
1 parent 702ff6f commit c181563
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/apply/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Setting up a batch of seven log entries:
}

fmt.Println("\nAckCommittedEntriesBeforeApplication:")
if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
if err := t.AckCommittedEntriesBeforeApplication(ctx); err != nil {
panic(err)
}
fmt.Print(`
Expand Down
13 changes: 2 additions & 11 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,7 @@ func (t *Task) assertDecoded() {
// it is applied. Because of this, the client can be informed of the success of
// a write at this point, but we cannot release that write's latches until the
// write has applied. See ProposalData.signalProposalResult/finishApplication.
//
// 4. Note that when catching up a follower that is behind, the (etcd/raft)
// leader will emit an MsgApp with a commit index that encompasses the entries
// in the MsgApp, and Ready() will expose these as present in both the Entries
// and CommittedEntries slices (i.e. append and apply). We don't ack these
// early - the caller will pass the "old" last index in.
func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context) error {
t.assertDecoded()
if !t.anyLocal {
return nil // fast-path
Expand All @@ -218,11 +212,8 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde
defer iter.Close()

// Collect a batch of trivial commands from the applier. Stop at the first
// non-trivial command or at the first command with an index above maxIndex.
// non-trivial command.
batchIter := takeWhileCmdIter(iter, func(cmd Command) bool {
if cmd.Index() > maxIndex {
return false
}
return cmd.IsTrivial()
})

Expand Down
36 changes: 1 addition & 35 deletions pkg/kv/kvserver/apply/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
appT := apply.MakeTask(sm, dec)
defer appT.Close()
require.NoError(t, appT.Decode(ctx, ents))
require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */))
require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx))

// Assert that the state machine was not updated.
require.Equal(t, testStateMachine{}, *sm)
Expand All @@ -338,40 +338,6 @@ func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
require.Equal(t, exp, cmd.acked)
require.False(t, cmd.finished)
}

// Try again with a lower maximum log index.
appT.Close()
ents = makeEntries(5)

dec = newTestDecoder()
dec.nonLocal[2] = true
dec.shouldReject[3] = true

appT = apply.MakeTask(sm, dec)
require.NoError(t, appT.Decode(ctx, ents))
require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */))

// Assert that the state machine was not updated.
require.Equal(t, testStateMachine{}, *sm)

// Assert that some commands were acknowledged early and that none were finished.
for _, cmd := range dec.cmds {
var exp bool
switch cmd.index {
case 1, 4:
exp = true // local and successful
case 2:
exp = false // remote
case 3:
exp = false // local and rejected
case 5:
exp = false // index too high
default:
t.Fatalf("unexpected index %d", cmd.index)
}
require.Equal(t, exp, cmd.acked)
require.False(t, cmd.finished)
}
}

func TestApplyCommittedEntriesWithErr(t *testing.T) {
Expand Down
22 changes: 14 additions & 8 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,13 +808,19 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// entries and acknowledge as many as we can trivially prove will not be
// rejected beneath raft.
//
// Note that the CommittedEntries slice may contain entries that are also in
// the Entries slice (to be appended in this ready pass). This can happen when
// a follower is being caught up on committed commands. We could acknowledge
// these commands early even though they aren't durably in the local raft log
// yet (since they're committed via a quorum elsewhere), but we chose to be
// conservative and avoid it by passing the last Ready cycle's `lastIndex` for
// the maxIndex argument to AckCommittedEntriesBeforeApplication.
// Note that the Entries slice in the MsgStorageApply cannot refer to entries
// that are also in the Entries slice in the MsgStorageAppend. Raft will not
// allow unstable entries to be applied when AsyncStorageWrites is enabled.
//
// If we disable AsyncStorageWrites in the future, this property will no
// longer be true, and the two slices could overlap. For example, this can
// happen when a follower is being caught up on committed commands. We could
// acknowledge these commands early even though they aren't durably in the
// local raft log yet (since they're committed via a quorum elsewhere), but
// we'd likely want to revert to an earlier version of this code that chose to
// be conservative and avoid this behavior by passing the last Ready cycle's
// `lastIndex` for a maxIndex argument to
// AckCommittedEntriesBeforeApplication.
//
// TODO(nvanbenschoten): this is less important with async storage writes.
// Consider getting rid of it.
Expand All @@ -829,7 +835,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, err
}
if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication {
if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.LastIndex); err != nil {
if err := appTask.AckCommittedEntriesBeforeApplication(ctx); err != nil {
return stats, err
}
}
Expand Down

0 comments on commit c181563

Please sign in to comment.