storage: Avoid multiple reproposals for lease index errors
Moving lease index reproposals below raft introduced a bug in which
the same proposal could be applied more than once at different lease
indexes. (This was originally caught via an assertion in the handling
of TruncateLog below raft). This commit adds a test and fixes the bug.

Release note: None
bdarnell committed Mar 18, 2019
1 parent a854c8d commit f7dc847
Showing 3 changed files with 168 additions and 3 deletions.
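Before the diffs, a minimal sketch of the invariant the fix enforces. This is a simplified stand-in written for illustration, not the real pkg/storage code: only MaxLeaseIndex, LeaseAppliedIndex, the pending-proposals map, and the tryRepropose guard correspond to names in the diff below; proposal, entry, and replica are toy types, and lease indexes here come from a bare counter.

package main

import "fmt"

// proposal is a stand-in for the real ProposalData: the single locally
// tracked record of an in-flight command.
type proposal struct {
	id            string
	maxLeaseIndex uint64
}

// entry is a copy of a command as it sits in the raft log; a refresh
// can leave several entries for one proposal.
type entry struct {
	id            string
	maxLeaseIndex uint64
}

type replica struct {
	leaseAppliedIndex uint64               // highest lease index applied so far
	lastAssigned      uint64               // highest lease index handed out
	pending           map[string]*proposal // local in-flight proposals
	log               []entry              // simplified raft log
}

// propose assigns the next lease index and appends a copy to the log.
func (r *replica) propose(p *proposal) {
	r.lastAssigned++
	p.maxLeaseIndex = r.lastAssigned
	r.pending[p.id] = p
	r.log = append(r.log, entry{p.id, p.maxLeaseIndex})
}

// apply models the below-raft check in checkForcedErrLocked: an entry
// whose lease index is not ahead of the applied index is rejected, and
// a locally pending command is handed to tryRepropose. A legal entry
// applies whether or not it is still in the pending map.
func (r *replica) apply(e entry) {
	if e.maxLeaseIndex <= r.leaseAppliedIndex {
		if p, ok := r.pending[e.id]; ok {
			// The caller removes the proposal before reproposing, as in
			// processRaftCommand; the guard below may re-add it.
			delete(r.pending, e.id)
			r.tryRepropose(p)
		}
		return
	}
	r.leaseAppliedIndex = e.maxLeaseIndex
	delete(r.pending, e.id)
	fmt.Printf("applied %s at lease index %d\n", e.id, e.maxLeaseIndex)
}

// tryRepropose mirrors the fixed tryReproposeWithNewLeaseIndex: if the
// local proposal's MaxLeaseIndex is already ahead of the applied index,
// a reproposed copy is still in flight, so re-add it to the pending map
// instead of creating another log copy at yet another lease index.
func (r *replica) tryRepropose(p *proposal) {
	if p.maxLeaseIndex > r.leaseAppliedIndex {
		r.pending[p.id] = p // already reproposed; just keep tracking it
		return
	}
	r.propose(p)
}

func main() {
	r := &replica{leaseAppliedIndex: 3, lastAssigned: 3, pending: map[string]*proposal{}}
	// One local proposal whose lease index is already stale, with two
	// copies of it in the log: the state the double refresh in
	// TestReplicaRefreshMultiple (below) constructs.
	p := &proposal{id: "inc", maxLeaseIndex: 3}
	r.pending[p.id] = p
	r.log = append(r.log, entry{p.id, 3}, entry{p.id, 3})
	for i := 0; i < len(r.log); i++ { // reproposals extend the log
		r.apply(r.log[i])
	}
}

Running it prints "applied inc at lease index 4" exactly once: the second stale copy finds the local proposal's MaxLeaseIndex already ahead of the applied index and is re-added to the pending map instead of being assigned yet another index.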
3 changes: 2 additions & 1 deletion pkg/storage/batcheval/cmd_lease_test.go
@@ -44,7 +44,8 @@ func TestLeaseTransferWithPipelinedWrite(t *testing.T) {
 
 	db := tc.ServerConn(0)
 
-	for iter := 0; iter < 100; iter++ {
+	// More than 30 iterations is flaky under stressrace on teamcity.
+	for iter := 0; iter < 30; iter++ {
 		log.Infof(ctx, "iter %d", iter)
 		if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil {
 			t.Fatal(err)
25 changes: 23 additions & 2 deletions pkg/storage/replica_raft.go
@@ -1426,7 +1426,7 @@ func (r *Replica) checkForcedErrLocked(
 	if proposedLocally {
 		log.VEventf(
 			ctx, 1,
-			"retry proposal %x: applied at lease index %d, required <= %d",
+			"retry proposal %x: applied at lease index %d, required < %d",
 			proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex,
 		)
 		retry = proposalIllegalLeaseIndex
@@ -2070,10 +2070,31 @@ func (r *Replica) processRaftCommand(
 //
 // It is not intended for use elsewhere and is only a top-level
 // function so that it can avoid the below_raft_protos check. Returns
-// true if the command was successfully reproposed.
+// true if the command has been successfully reproposed (not
+// necessarily by this method! But if this method returns true, the
+// command will be in the local proposals map).
 func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
 	r.mu.Lock()
 	defer r.mu.Unlock()
+	// Note that we don't need to validate anything about the proposal's
+	// lease here - if we got this far, we know that everything but the
+	// index is valid at this point in the log.
+	if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex {
+		// If the command's MaxLeaseIndex is greater than the
+		// LeaseAppliedIndex, it must have already been reproposed (this
+		// can happen if there are multiple copies of the command in the
+		// logs; see TestReplicaRefreshMultiple). We must not create
+		// multiple copies with multiple lease indexes, so don't repropose
+		// it again.
+		//
+		// Note that the caller has already removed the current version of
+		// the proposal from the pending proposals map. We must re-add it
+		// since it's still pending.
+		log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d",
+			proposal.command.MaxLeaseIndex)
+		r.mu.proposals[proposal.idKey] = proposal
+		return true
+	}
 	// Some tests check for this log message in the trace.
 	log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex")
 	if _, pErr := r.proposeLocked(proposal.ctx, proposal); pErr != nil {
143 changes: 143 additions & 0 deletions pkg/storage/replica_test.go
@@ -7602,6 +7602,149 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 	}
 }
 
+// TestReplicaRefreshMultiple tests an interaction between refreshing
+// proposals after a new leader or ticks (which results in multiple
+// copies in the log with the same lease index) and refreshing after
+// an illegal lease index error (with a new lease index assigned).
+//
+// The setup here is rather artificial, but it represents something
+// that can happen (very rarely) in the real world with multiple raft
+// leadership transfers.
+func TestReplicaRefreshMultiple(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+
+	var filterActive int32
+	var incCmdID storagebase.CmdIDKey
+	var incApplyCount int64
+	tsc := TestStoreConfig(nil)
+	tsc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
+		if atomic.LoadInt32(&filterActive) != 0 && filterArgs.CmdID == incCmdID {
+			atomic.AddInt64(&incApplyCount, 1)
+		}
+		return 0, nil
+	}
+	var tc testContext
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.StartWithStoreConfig(t, stopper, tsc)
+	repl := tc.repl
+
+	repDesc, err := repl.GetReplicaDescriptor()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	key := roachpb.Key("a")
+
+	// Run a few commands first: This advances the lease index, which is
+	// necessary for the tricks we're going to play to induce failures
+	// (we need to be able to subtract from the current lease index
+	// without going below 0).
+	for i := 0; i < 3; i++ {
+		inc := incrementArgs(key, 1)
+		if _, pErr := client.SendWrapped(ctx, tc.Sender(), &inc); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+	// Sanity check the resulting value.
+	get := getArgs(key)
+	if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil {
+		t.Fatal(pErr)
+	} else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil {
+		t.Fatalf("returned non-int: %s", err)
+	} else if x != 3 {
+		t.Fatalf("expected 3, got %d", x)
+	}
+
+	// Manually propose another increment. This is the one we'll
+	// manipulate into failing. (the use of increment here is not
+	// significant. I originally wrote it this way because I thought the
+	// non-idempotence of increment would make it easier to test, but
+	// since the reproposals we're concerned with don't result in
+	// reevaluation it doesn't matter)
+	inc := incrementArgs(key, 1)
+	var ba roachpb.BatchRequest
+	ba.Add(&inc)
+	ba.Timestamp = tc.Clock().Now()
+
+	incCmdID = makeIDKey()
+	atomic.StoreInt32(&filterActive, 1)
+	proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	// Save this channel; it may get reset to nil before we read from it.
+	proposalDoneCh := proposal.doneCh
+
+	// Propose the command manually with errors induced.
+	func() {
+		repl.mu.Lock()
+		defer repl.mu.Unlock()
+		ai := repl.mu.state.LeaseAppliedIndex
+		if ai <= 1 {
+			// Lease index zero is special in this test because we subtract
+			// from it below, so we need enough previous proposals in the
+			// log to ensure it doesn't go negative.
+			t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
+		}
+		// Manually run parts of the proposal process. Insert it, then
+		// tweak the lease index to ensure it will generate a retry when
+		// it fails. Don't call submitProposal (to simulate a dropped
+		// message). Then call refreshProposals twice to repropose it and
+		// put it in the logs twice. (submit + refresh should be
+		// equivalent to the double refresh used here)
+		//
+		// Note that all of this is under the same lock so there's no
+		// chance of it applying and exiting the pending state before we
+		// refresh.
+		proposal.command.ProposerReplica = repDesc
+		proposal.command.ProposerLeaseSequence = repl.mu.state.Lease.Sequence
+		repl.insertProposalLocked(proposal)
+		proposal.command.MaxLeaseIndex = ai - 1
+		repl.refreshProposalsLocked(0, reasonNewLeader)
+		repl.refreshProposalsLocked(0, reasonNewLeader)
+	}()
+
+	// Wait for our proposal to apply. The two refreshed proposals above
+	// will fail due to their illegal lease index. Then they'll generate
+	// a reproposal (in the bug that we're testing against, they'd
+	// *each* generate a reproposal). When this reproposal succeeds, the
+	// doneCh is signaled.
+	select {
+	case resp := <-proposalDoneCh:
+		if resp.Err != nil {
+			t.Fatal(resp.Err)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out")
+	}
+	// In the buggy case, there's a second reproposal that we don't have
+	// a good way to observe, so just sleep to let it apply if it's in
+	// the system.
+	time.Sleep(10 * time.Millisecond)
+
+	// The command applied exactly once. Note that this check would pass
+	// even in the buggy case, since illegal lease index proposals do
+	// not generate reevaluations (and increment is handled upstream of
+	// raft).
+	if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil {
+		t.Fatal(pErr)
+	} else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil {
+		t.Fatalf("returned non-int: %s", err)
+	} else if x != 4 {
+		t.Fatalf("expected 4, got %d", x)
+	}
+
+	// The real test: our apply filter can tell us whether there was a
+	// duplicate reproposal. (A reproposed increment isn't harmful, but
+	// some other commands could be)
+	if x := atomic.LoadInt64(&incApplyCount); x != 1 {
+		t.Fatalf("expected 1, got %d", x)
+	}
+}
+
 // TestGCWithoutThreshold validates that GCRequest only declares the threshold
 // keys which are subject to change, and that it does not access these keys if
 // it does not declare them.
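For contrast, a hypothetical pre-fix version of the repropose step, substituted into the sketch above the diffs, shows exactly the double application this test's incApplyCount filter is hunting for. This is illustrative only; it is not code from any version of the repository.

// Hypothetical pre-fix behavior (the bug, not real code): every
// illegal-lease-index error gets a brand-new lease index, even when a
// reproposed copy of the same command is already in flight.
func (r *replica) tryReproposeBuggy(p *proposal) {
	r.propose(p)
}

// With the same starting state (two stale copies of "inc" in the log),
// the sketch's apply loop now does:
//   entry{inc, 3} -> stale, reproposed at lease index 4
//   entry{inc, 3} -> stale, reproposed again at lease index 5
//   entry{inc, 4} -> applies at lease index 4
//   entry{inc, 5} -> applies at lease index 5: duplicate side effects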
