Skip to content

Commit

Permalink
Merge pull request cockroachdb#94930 from tbg/fix-early-ack
Browse files Browse the repository at this point in the history
kvserver: fix bug in ephemeral app batch
  • Loading branch information
nvanbenschoten authored Jan 12, 2023
2 parents 5968c05 + d3d262b commit bd3e876
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,9 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
b.state.RaftAppliedIndex = cmd.Index()
b.state.RaftAppliedIndexTerm = cmd.Term

// NB: since the command is "trivial" we know the LeaseIndex field is set to
// something meaningful if it's nonzero (e.g. cmd is not a lease request). For
// a rejected command, cmd.LeaseIndex was zeroed out earlier.
if leaseAppliedIndex := cmd.LeaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
}
Expand Down Expand Up @@ -765,6 +768,9 @@ func (mb *ephemeralReplicaAppBatch) Stage(
)
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr)
cmd.ForcedErrResult = fr
if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex {
mb.state.LeaseAppliedIndex = cmd.LeaseIndex
}

return cmd, nil
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -395,3 +397,84 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) {
})
})
}

// TestReplicaStateMachineEphemeralAppBatchRejection is a regression test for
// #94409. It verifies that if two commands are in an ephemeral batch but the
// first command's MaxLeaseIndex prevents the second command from succeeding, we
// don't accidentally ack the second command.
func TestReplicaStateMachineEphemeralAppBatchRejection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

// Lock the replica for the entire test.
r := tc.repl
r.raftMu.Lock()
defer r.raftMu.Unlock()
// Avoid additional raft processing after we're done with this replica because
// we've applied entries that aren't in the log.
defer r.mu.destroyStatus.Set(errors.New("boom"), destroyReasonRemoved)

sm := r.getStateMachine()

r.mu.Lock()
raftAppliedIndex := r.mu.state.RaftAppliedIndex
r.mu.Unlock()

descWriteRepr := func(v string) (roachpb.Request, []byte) {
b := tc.store.Engine().NewBatch()
defer b.Close()
key := keys.LocalMax
val := roachpb.MakeValueFromString("hello")
require.NoError(t, b.PutMVCC(storage.MVCCKey{
Timestamp: tc.Clock().Now(),
Key: key,
}, storage.MVCCValue{
Value: val,
}))
return roachpb.NewPut(key, val), b.Repr()
}

// Make two commands that have the same MaxLeaseIndex. They'll go
// into the same ephemeral batch and we expect that batch to accept
// the first command and reject the second.
var cmds []*replicatedCmd
for _, s := range []string{"earlier", "later"} {
req, repr := descWriteRepr(s)
ent := &raftlog.Entry{
Entry: raftpb.Entry{
Index: raftAppliedIndex + 1,
Type: raftpb.EntryNormal,
},
ID: makeIDKey(),
Cmd: kvserverpb.RaftCommand{
ProposerLeaseSequence: r.mu.state.Lease.Sequence,
MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1,
WriteBatch: &kvserverpb.WriteBatch{Data: repr},
},
}
var ba roachpb.BatchRequest
ba.Add(req)
cmd := &replicatedCmd{
ctx: ctx,
ReplicatedCmd: raftlog.ReplicatedCmd{Entry: ent},
proposal: &ProposalData{Request: &ba},
}
require.True(t, cmd.CanAckBeforeApplication())
cmds = append(cmds, cmd)
}

var rejs []bool
b := sm.NewEphemeralBatch()
for _, cmd := range cmds {
checkedCmd, err := b.Stage(cmd.ctx, cmd)
require.NoError(t, err)
rejs = append(rejs, checkedCmd.Rejected())
}
b.Close()
require.Equal(t, []bool{false, true}, rejs)
}

0 comments on commit bd3e876

Please sign in to comment.