Skip to content

Commit

Permalink
storage: deflake TestReplicaBurstPendingCommandsAndRepropose
Browse files Browse the repository at this point in the history
Fixes #38615.

The test was guessing at the min lease applied index that
a proposal would be assigned instead of using the index
returned from evalAndPropose.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 16, 2019
1 parent 5dfa38c commit 9558144
Showing 1 changed file with 47 additions and 46 deletions.
93 changes: 47 additions & 46 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7478,17 +7478,22 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
defer leaktest.AfterTest(t)()

var tc testContext
cfg := TestStoreConfig(nil)
// Disable reasonNewLeader and reasonNewLeaderOrConfigChange proposal
// refreshes so that our proposals don't risk being reproposed due to
// Raft leadership instability.
cfg.TestingKnobs.DisableRefreshReasonNewLeader = true
cfg.TestingKnobs.DisableRefreshReasonNewLeaderOrConfigChange = true
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)
tc.StartWithStoreConfig(t, stopper, cfg)

type magicKey struct{}
ctx := context.WithValue(context.Background(), magicKey{}, "foo")

var seenCmds []int
dropAll := int32(1)
tc.repl.mu.Lock()
lease := *tc.repl.mu.state.Lease
tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
if atomic.LoadInt32(&dropAll) == 1 {
return true, nil
Expand All @@ -7498,59 +7503,55 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
}
return false, nil
}
lease := *tc.repl.mu.state.Lease
tc.repl.mu.Unlock()

const num = 10
expIndexes := make([]int, 0, num)
chs := func() []chan proposalResult {
chs := make([]chan proposalResult, 0, num)

for i := 0; i < num; i++ {
expIndexes = append(expIndexes, i+1)
var ba roachpb.BatchRequest
ba.Timestamp = tc.Clock().Now()
ba.Add(&roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, _, err := tc.repl.evalAndPropose(ctx, lease, &ba, &allSpans, endCmds{})
if err != nil {
t.Fatal(err)
}
chs = append(chs, ch)
tc.repl.mu.Lock()
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.mu.Unlock()
chs := make([]chan proposalResult, 0, num)
for i := 0; i < num; i++ {
var ba roachpb.BatchRequest
ba.Timestamp = tc.Clock().Now()
ba.Add(&roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, idx, err := tc.repl.evalAndPropose(ctx, lease, &ba, &allSpans, endCmds{})
if err != nil {
t.Fatal(err)
}
chs = append(chs, ch)
expIndexes = append(expIndexes, int(idx))
}

tc.repl.mu.Lock()
origIndexes := make([]int, 0, num)
for _, p := range tc.repl.mu.proposals {
if v := p.ctx.Value(magicKey{}); v != nil {
origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
}
tc.repl.mu.Lock()
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
origIndexes := make([]int, 0, num)
for _, p := range tc.repl.mu.proposals {
if v := p.ctx.Value(magicKey{}); v != nil {
origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex))
}
sort.Ints(origIndexes)
tc.repl.mu.Unlock()
}
sort.Ints(origIndexes)
tc.repl.mu.Unlock()

if !reflect.DeepEqual(expIndexes, origIndexes) {
t.Fatalf("wanted required indexes %v, got %v", expIndexes, origIndexes)
}
if !reflect.DeepEqual(expIndexes, origIndexes) {
t.Fatalf("wanted required indexes %v, got %v", expIndexes, origIndexes)
}

tc.repl.raftMu.Lock()
tc.repl.mu.Lock()
atomic.StoreInt32(&dropAll, 0)
tc.repl.refreshProposalsLocked(0, reasonTicks)
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.mu.Unlock()
tc.repl.raftMu.Unlock()

tc.repl.raftMu.Lock()
tc.repl.mu.Lock()
atomic.StoreInt32(&dropAll, 0)
tc.repl.refreshProposalsLocked(0, reasonTicks)
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.mu.Unlock()
tc.repl.raftMu.Unlock()
return chs
}()
for _, ch := range chs {
if pErr := (<-ch).Err; pErr != nil {
t.Fatal(pErr)
Expand Down

0 comments on commit 9558144

Please sign in to comment.