diff --git a/pkg/storage/batcheval/cmd_lease_test.go b/pkg/storage/batcheval/cmd_lease_test.go
index 30caf6e405de..40bba5227319 100644
--- a/pkg/storage/batcheval/cmd_lease_test.go
+++ b/pkg/storage/batcheval/cmd_lease_test.go
@@ -44,7 +44,8 @@ func TestLeaseTransferWithPipelinedWrite(t *testing.T) {
 
 	db := tc.ServerConn(0)
 
-	for iter := 0; iter < 100; iter++ {
+	// More than 30 iterations is flaky under stressrace on teamcity.
+	for iter := 0; iter < 30; iter++ {
 		log.Infof(ctx, "iter %d", iter)
 		if _, err := db.ExecContext(ctx, "drop table if exists test"); err != nil {
 			t.Fatal(err)
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 6b2b77fed765..92d72203112e 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -1426,7 +1426,7 @@ func (r *Replica) checkForcedErrLocked(
 		if proposedLocally {
 			log.VEventf(
 				ctx, 1,
-				"retry proposal %x: applied at lease index %d, required <= %d",
+				"retry proposal %x: applied at lease index %d, required < %d",
 				proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex,
 			)
 			retry = proposalIllegalLeaseIndex
@@ -2070,10 +2070,28 @@ func (r *Replica) processRaftCommand(
 //
 // It is not intended for use elsewhere and is only a top-level
 // function so that it can avoid the below_raft_protos check. Returns
-// true if the command was successfully reproposed.
+// true if the command has been successfully reproposed (not
+// necessarily by this method! But if this method returns true, the
+// command will be in the local proposals map).
 func (r *Replica) tryReproposeWithNewLeaseIndex(proposal *ProposalData) bool {
 	r.mu.Lock()
 	defer r.mu.Unlock()
+	if proposal.command.MaxLeaseIndex > r.mu.state.LeaseAppliedIndex {
+		// If the command's MaxLeaseIndex is greater than the
+		// LeaseAppliedIndex, it must have already been reproposed (this
+		// can happen if there are multiple copies of the command in the
+		// logs; see TestReplicaRefreshMultiple). We must not create
+		// multiple copies with multiple lease indexes, so don't repropose
+		// it again.
+		//
+		// Note that the caller has already removed the current version of
+		// the proposal from the pending proposals map. We must re-add it
+		// since it's still pending.
+		log.VEventf(proposal.ctx, 2, "skipping reproposal, already reproposed at index %d",
+			proposal.command.MaxLeaseIndex)
+		r.mu.proposals[proposal.idKey] = proposal
+		return true
+	}
 	// Some tests check for this log message in the trace.
 	log.VEventf(proposal.ctx, 2, "retry: proposalIllegalLeaseIndex")
 	if _, pErr := r.proposeLocked(proposal.ctx, proposal); pErr != nil {
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index c91080980535..bb4cedaafff2 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -7602,6 +7602,149 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
 	}
 }
 
+// TestReplicaRefreshMultiple tests an interaction between refreshing
+// proposals after a new leader or ticks (which results in multiple
+// copies in the log with the same lease index) and refreshing after
+// an illegal lease index error (with a new lease index assigned).
+//
+// The setup here is rather artificial, but it represents something
+// that can happen (very rarely) in the real world with multiple raft
+// leadership transfers.
+func TestReplicaRefreshMultiple(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+
+	var filterActive int32
+	var incCmdID storagebase.CmdIDKey
+	var incApplyCount int64
+	tsc := TestStoreConfig(nil)
+	tsc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
+		if atomic.LoadInt32(&filterActive) != 0 && filterArgs.CmdID == incCmdID {
+			atomic.AddInt64(&incApplyCount, 1)
+		}
+		return 0, nil
+	}
+	var tc testContext
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	tc.StartWithStoreConfig(t, stopper, tsc)
+	repl := tc.repl
+
+	repDesc, err := repl.GetReplicaDescriptor()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	key := roachpb.Key("a")
+
+	// Run a few commands first: This advances the lease index, which is
+	// necessary for the tricks we're going to play to induce failures
+	// (we need to be able to subtract from the current lease index
+	// without going below 0).
+	for i := 0; i < 3; i++ {
+		inc := incrementArgs(key, 1)
+		if _, pErr := client.SendWrapped(ctx, tc.Sender(), &inc); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+	// Sanity check the resulting value.
+	get := getArgs(key)
+	if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil {
+		t.Fatal(pErr)
+	} else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil {
+		t.Fatalf("returned non-int: %s", err)
+	} else if x != 3 {
+		t.Fatalf("expected 3, got %d", x)
+	}
+
+	// Manually propose another increment. This is the one we'll
+	// manipulate into failing. (the use of increment here is not
+	// significant. I originally wrote it this way because I thought the
+	// non-idempotence of increment would make it easier to test, but
+	// since the reproposals we're concerned with don't result in
+	// reevaluation it doesn't matter)
+	inc := incrementArgs(key, 1)
+	var ba roachpb.BatchRequest
+	ba.Add(&inc)
+	ba.Timestamp = tc.Clock().Now()
+
+	incCmdID = makeIDKey()
+	atomic.StoreInt32(&filterActive, 1)
+	proposal, pErr := repl.requestToProposal(ctx, incCmdID, ba, nil, &allSpans)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	// Save this channel; it may get reset to nil before we read from it.
+	proposalDoneCh := proposal.doneCh
+
+	// Propose the command manually with errors induced.
+	func() {
+		repl.mu.Lock()
+		defer repl.mu.Unlock()
+		ai := repl.mu.state.LeaseAppliedIndex
+		if ai <= 1 {
+			// Lease index zero is special in this test because we subtract
+			// from it below, so we need enough previous proposals in the
+			// log to ensure it doesn't go negative.
+			t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai)
+		}
+		// Manually run parts of the proposal process. Insert it, then
+		// tweak the lease index to ensure it will generate a retry when
+		// it fails. Don't call submitProposal (to simulate a dropped
+		// message). Then call refreshProposals twice to repropose it and
+		// put it in the logs twice. (submit + refresh should be
+		// equivalent to the double refresh used here)
+		//
+		// Note that all of this is under the same lock so there's no
+		// chance of it applying and exiting the pending state before we
+		// refresh.
+		proposal.command.ProposerReplica = repDesc
+		proposal.command.ProposerLeaseSequence = repl.mu.state.Lease.Sequence
+		repl.insertProposalLocked(proposal)
+		proposal.command.MaxLeaseIndex = ai - 1
+		repl.refreshProposalsLocked(0, reasonNewLeader)
+		repl.refreshProposalsLocked(0, reasonNewLeader)
+	}()
+
+	// Wait for our proposal to apply. The two refreshed proposals above
+	// will fail due to their illegal lease index. Then they'll generate
+	// a reproposal (in the bug that we're testing against, they'd
+	// *each* generate a reproposal). When this reproposal succeeds, the
+	// doneCh is signaled.
+	select {
+	case resp := <-proposalDoneCh:
+		if resp.Err != nil {
+			t.Fatal(resp.Err)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out")
+	}
+	// In the buggy case, there's a second reproposal that we don't have
+	// a good way to observe, so just sleep to let it apply if it's in
+	// the system.
+	time.Sleep(10 * time.Millisecond)
+
+	// The command applied exactly once. Note that this check would pass
+	// even in the buggy case, since illegal lease index proposals do
+	// not generate reevaluations (and increment is handled upstream of
+	// raft).
+	if resp, pErr := client.SendWrapped(ctx, tc.Sender(), &get); pErr != nil {
+		t.Fatal(pErr)
+	} else if x, err := resp.(*roachpb.GetResponse).Value.GetInt(); err != nil {
+		t.Fatalf("returned non-int: %s", err)
+	} else if x != 4 {
+		t.Fatalf("expected 4, got %d", x)
+	}
+
+	// The real test: our apply filter can tell us whether there was a
+	// duplicate reproposal. (A reproposed increment isn't harmful, but
+	// some other commands could be)
+	if x := atomic.LoadInt64(&incApplyCount); x != 1 {
+		t.Fatalf("expected 1, got %d", x)
+	}
+}
+
 // TestGCWithoutThreshold validates that GCRequest only declares the threshold
 // keys which are subject to change, and that it does not access these keys if
 // it does not declare them.
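A minimal standalone sketch of the reproposal guard added above, using simplified stand-in types (the real code works with ProposalData, the replica's pending proposals map, and r.mu.state.LeaseAppliedIndex in pkg/storage); it only illustrates why a proposal whose MaxLeaseIndex is still ahead of the applied lease index is re-added to the pending map instead of being reproposed with yet another lease index:

package main

import "fmt"

// proposal is a simplified stand-in for ProposalData: it carries only
// the MaxLeaseIndex assigned when the command was last (re)proposed.
type proposal struct {
	id            string
	maxLeaseIndex uint64
}

// replica is a simplified stand-in for the replica state consulted by
// tryReproposeWithNewLeaseIndex.
type replica struct {
	leaseAppliedIndex uint64
	proposals         map[string]*proposal
}

// tryRepropose mirrors the guard in the diff: if the proposal's
// MaxLeaseIndex is still greater than the applied lease index, another
// copy of the command may still be in the log, so we only re-add it to
// the pending map; otherwise we assign a new lease index, after which a
// real implementation would resubmit the command to raft.
func (r *replica) tryRepropose(p *proposal) bool {
	if p.maxLeaseIndex > r.leaseAppliedIndex {
		r.proposals[p.id] = p // caller already removed it; still pending
		return true
	}
	p.maxLeaseIndex = r.leaseAppliedIndex + 1
	r.proposals[p.id] = p
	return true
}

func main() {
	r := &replica{leaseAppliedIndex: 5, proposals: map[string]*proposal{}}
	p := &proposal{id: "cmd1", maxLeaseIndex: 7}
	r.tryRepropose(p)
	fmt.Println(p.maxLeaseIndex) // 7: no second copy with a new lease index is created
}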