From 26a87f500131883227731e76d7c4a1d7d98ebcf1 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sat, 16 Mar 2019 22:42:26 -0400 Subject: [PATCH] storage: Avoid multiple reproposals for lease index errors Moving lease index reproposals below raft introduced a bug in which the same proposal could be applied more than once at different lease indexes. (This was originally caught via an assertion in the handling of TruncateLog below raft). This commit adds a test and fixes the bug. Release note: None --- pkg/storage/batcheval/cmd_lease_test.go | 3 +- pkg/storage/replica_raft.go | 22 +++- pkg/storage/replica_test.go | 143 ++++++++++++++++++++++++ 3 files changed, 165 insertions(+), 3 deletions(-) diff --git a/pkg/storage/batcheval/cmd_lease_test.go b/pkg/storage/batcheval/cmd_lease_test.go index 30caf6e405de..40bba5227319 100644 --- a/pkg/storage/batcheval/cmd_lease_test.go +++ b/pkg/storage/batcheval/cmd_lease_test.go @@ -44,7 +44,8 @@ func TestLeaseTransferWithPipelinedWrite(t *testing.T) { db := tc.ServerConn(0) - for iter := 0; iter < 100; iter++ { + // More than 30 iterations is flaky under stressrace on teamcity. + for iter := 0; iter < 30; iter++ { log.Infof(ctx, "iter %d", iter) if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil { t.Fatal(err) diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 6b2b77fed765..92d72203112e 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1426,7 +1426,7 @@ func (r *Replica) checkForcedErrLocked( if proposedLocally { log.VEventf( ctx, 1, - "retry proposal %x: applied at lease index %d, required <= %d", + "retry proposal %x: applied at lease index %d, required < %d", proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, ) retry = proposalIllegalLeaseIndex @@ -2070,10 +2070,28 @@ func (r *Replica) processRaftCommand( // // It is not intended for use elsewhere and is only a top-level // function so that it can avoid the below_raft_protos check. Returns -// true if the command was successfully reproposed. +// true if the command has been successfully reproposed (not +// necessarily by this method! But if this method returns true, the +// command will be in the local proposals map). func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool { r.mu.Lock() defer r.mu.Unlock() + if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex { + // If the command's MaxLeaseIndex is greater than the + // LeaseAppliedIndex, it must have already been reproposed (this + // can happen if there are multiple copies of the command in the + // logs; see TestReplicaRefreshMultiple). We must not create + // multiple copies with multiple lease indexes, so don't repropose + // it again. + // + // Note that the caller has already removed the current version of + // the proposal from the pending proposals map. We must re-add it + // since it's still pending. + log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d", + proposal.command.MaxLeaseIndex) + r.mu.proposals[proposal.idKey] = proposal + return true + } // Some tests check for this log message in the trace. log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex") if _, pErr := r.proposeLocked(proposal.ctx, proposal); pErr != nil { diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index c91080980535..bb4cedaafff2 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7602,6 +7602,149 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } } +// TestReplicaRefreshMultiple tests an interaction between refreshing +// proposals after a new leader or ticks (which results in multiple +// copies in the log with the same lease index) and refreshing after +// an illegal lease index error (with a new lease index assigned). +// +// The setup here is rather artificial, but it represents something +// that can happen (very rarely) in the real world with multiple raft +// leadership transfers. +func TestReplicaRefreshMultiple(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + var filterActive int32 + var incCmdID storagebase.CmdIDKey + var incApplyCount int64 + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) { + if atomic.LoadInt32(&filterActive) != 0 && filterArgs.CmdID == incCmdID { + atomic.AddInt64(&incApplyCount, 1) + } + return 0, nil + } + var tc testContext + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.StartWithStoreConfig(t, stopper, tsc) + repl := tc.repl + + repDesc, err := repl.GetReplicaDescriptor() + if err != nil { + t.Fatal(err) + } + + key := roachpb.Key("a") + + // Run a few commands first: This advances the lease index, which is + // necessary for the tricks we're going to play to induce failures + // (we need to be able to subtract from the current lease index + // without going below 0). + for i := 0; i < 3; i++ { + inc := incrementArgs(key, 1) + if _, pErr := client.SendWrapped(ctx, tc.Sender(), &inc); pErr != nil { + t.Fatal(pErr) + } + } + // Sanity check the resulting value. + get := getArgs(key) + if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { + t.Fatal(pErr) + } else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatalf("returned non-int: %s", err) + } else if x != 3 { + t.Fatalf("expected 3, got %d", x) + } + + // Manually propose another increment. This is the one we'll + // manipulate into failing. (the use of increment here is not + // significant. I originally wrote it this way because I thought the + // non-idempotence of increment would make it easier to test, but + // since the reproposals we're concerned with don't result in + // reevaluation it doesn't matter) + inc := incrementArgs(key, 1) + var ba roachpb.BatchRequest + ba.Add(&inc) + ba.Timestamp = tc.Clock().Now() + + incCmdID = makeIDKey() + atomic.StoreInt32(&filterActive, 1) + proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans) + if pErr != nil { + t.Fatal(pErr) + } + // Save this channel; it may get reset to nil before we read from it. + proposalDoneCh := proposal.doneCh + + // Propose the command manually with errors induced. + func() { + repl.mu.Lock() + defer repl.mu.Unlock() + ai := repl.mu.state.LeaseAppliedIndex + if ai <= 1 { + // Lease index zero is special in this test because we subtract + // from it below, so we need enough previous proposals in the + // log to ensure it doesn't go negative. + t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai) + } + // Manually run parts of the proposal process. Insert it, then + // tweak the lease index to ensure it will generate a retry when + // it fails. Don't call submitProposal (to simulate a dropped + // message). Then call refreshProposals twice to repropose it and + // put it in the logs twice. (submit + refresh should be + // equivalent to the double refresh used here) + // + // Note that all of this is under the same lock so there's no + // chance of it applying and exiting the pending state before we + // refresh. + proposal.command.ProposerReplica = repDesc + proposal.command.ProposerLeaseSequence = repl.mu.state.Lease.Sequence + repl.insertProposalLocked(proposal) + proposal.command.MaxLeaseIndex = ai - 1 + repl.refreshProposalsLocked(0, reasonNewLeader) + repl.refreshProposalsLocked(0, reasonNewLeader) + }() + + // Wait for our proposal to apply. The two refreshed proposals above + // will fail due to their illegal lease index. Then they'll generate + // a reproposal (in the bug that we're testing against, they'd + // *each* generate a reproposal). When this reproposal succeeds, the + // doneCh is signaled. + select { + case resp := <-proposalDoneCh: + if resp.Err != nil { + t.Fatal(resp.Err) + } + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } + // In the buggy case, there's a second reproposal that we don't have + // a good way to observe, so just sleep to let it apply if it's in + // the system. + time.Sleep(10 * time.Millisecond) + + // The command applied exactly once. Note that this check would pass + // even in the buggy case, since illegal lease index proposals do + // not generate reevaluations (and increment is handled upstream of + // raft). + if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil { + t.Fatal(pErr) + } else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil { + t.Fatalf("returned non-int: %s", err) + } else if x != 4 { + t.Fatalf("expected 4, got %d", x) + } + + // The real test: our apply filter can tell us whether there was a + // duplicate reproposal. (A reproposed increment isn't harmful, but + // some other commands could be) + if x := atomic.LoadInt64(&incApplyCount); x != 1 { + t.Fatalf("expected 1, got %d", x) + } +} + // TestGCWithoutThreshold validates that GCRequest only declares the threshold // keys which are subject to change, and that it does not access these keys if // it does not declare them.