Skip to content

Commit

Permalink
storage: improve testing of command cancellation with local spans
Browse files Browse the repository at this point in the history
  • Loading branch information
nvanbenschoten committed Aug 28, 2017
1 parent 3d0a813 commit 01d3d51
Showing 1 changed file with 102 additions and 16 deletions.
118 changes: 102 additions & 16 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2425,37 +2425,47 @@ func TestReplicaCommandQueueCancellationRandom(t *testing.T) {
}
}

func TestReplicaCommandQueueCancellation16266(t *testing.T) {
// TestReplicaCommandQueueCancellationLocal tests various cancellation scenerios
// surrounding EndTxnReqs, PushTxnReqs, and HeartbeatTxnReqs. One of these
// scenerios was the cause of #16266.
func TestReplicaCommandQueueCancellationLocal(t *testing.T) {
defer leaktest.AfterTest(t)()

var instrs []cancelInstr
now := hlc.Timestamp{WallTime: cmdQCancelTestTimestamp}
txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, enginepb.SERIALIZABLE, now, 0)
intentKey := roachpb.Key("baz")

newBa := func(withTxn bool) *roachpb.BatchRequest {
ba := roachpb.BatchRequest{}
ba.Timestamp = now
if withTxn {
ba.Txn = &txn
}
return &ba
}

// Used only to block the other requests. Not a necessary part of the
// scenerio. See the comment on RunWithoutInitialSpan.
heartbeatBa := roachpb.BatchRequest{}
heartbeatBa.Timestamp = now
heartbeatBa.Txn = &txn
heartbeatBa := newBa(true /* withTxn */)
heartbeatBa.Add(&roachpb.HeartbeatTxnRequest{
Span: roachpb.Span{
Key: txn.Key,
},
Now: now,
})

endTxnBa := roachpb.BatchRequest{}
endTxnBa.Timestamp = now
endTxnBa.Txn = &txn
endTxnBa := newBa(true /* withTxn */)
endTxnBa.Add(&roachpb.EndTransactionRequest{
Span: roachpb.Span{
Key: txn.Key,
},
Commit: true,
IntentSpans: []roachpb.Span{
{Key: intentKey},
},
})

pushBa := roachpb.BatchRequest{}
pushBa.Timestamp = now
pushBa := newBa(false /* withTxn */)
pushBa.Add(&roachpb.PushTxnRequest{
Span: roachpb.Span{
Key: txn.Key,
Expand All @@ -2466,12 +2476,88 @@ func TestReplicaCommandQueueCancellation16266(t *testing.T) {
Force: true,
})

ct := newCmdQCancelTest(t)
instrs = append(instrs, cancelInstr{reqOverride: &heartbeatBa, expErr: "record not present"})
instrs = append(instrs, cancelInstr{reqOverride: &endTxnBa, expErr: "does not exist"})
instrs = append(instrs, cancelInstr{reqOverride: &pushBa})
instrs = append(instrs, cancelInstr{reqOverride: &pushBa})
ct.RunWithoutInitialSpan(instrs, []int{2})
resolveIntentBa := newBa(false /* withTxn */)
resolveIntentBa.Add(&roachpb.ResolveIntentRequest{
Span: roachpb.Span{
Key: intentKey,
},
IntentTxn: txn.TxnMeta,
Status: roachpb.PENDING,
})

getKeyBa := newBa(true /* withTxn */)
getKeyBa.Add(&roachpb.GetRequest{
Span: roachpb.Span{
Key: intentKey,
},
})

putKeyBa := newBa(true /* withTxn */)
putKeyBa.Add(&roachpb.PutRequest{
Span: roachpb.Span{
Key: intentKey,
},
Value: roachpb.MakeValueFromBytes([]byte("val")),
})

t.Run("16266", func(t *testing.T) {
instrs := []cancelInstr{
{reqOverride: heartbeatBa, expErr: "record not present"},
{reqOverride: endTxnBa, expErr: "does not exist"},
{reqOverride: pushBa},
{reqOverride: pushBa},
}

ct := newCmdQCancelTest(t)
ct.RunWithoutInitialSpan(instrs, []int{2})
})
t.Run("CancelEndTxn", func(t *testing.T) {
instrs := []cancelInstr{
{reqOverride: heartbeatBa, expErr: "record not present"},
{reqOverride: endTxnBa, expErr: "does not exist"},
{reqOverride: pushBa},
{reqOverride: heartbeatBa, expErr: "record not present"},
{reqOverride: resolveIntentBa},
{reqOverride: pushBa},
}

for _, cancelOrder := range [][]int{
{1, 2},
{2, 1},
} {
t.Run(fmt.Sprint(cancelOrder), func(t *testing.T) {
ct := newCmdQCancelTest(t)
ct.RunWithoutInitialSpan(instrs, cancelOrder)
})
}
})
t.Run("CancelResolveIntent", func(t *testing.T) {
instrs := []cancelInstr{
{reqOverride: resolveIntentBa},
{reqOverride: getKeyBa},
{reqOverride: putKeyBa},
{reqOverride: resolveIntentBa},
{reqOverride: putKeyBa, expErr: "retry txn"},
{reqOverride: getKeyBa},
{reqOverride: putKeyBa, expErr: "retry txn"},
{reqOverride: resolveIntentBa},
{reqOverride: pushBa},
}

for _, cancelOrder := range [][]int{
{3, 4},
{4, 3},
{6, 3},
{6, 5, 4, 3, 2},
{2, 3, 4, 5, 6},
{2, 6, 3, 5, 4},
} {
t.Run(fmt.Sprint(cancelOrder), func(t *testing.T) {
ct := newCmdQCancelTest(t)
ct.RunWithoutInitialSpan(instrs, cancelOrder)
})
}
})
}

type cancelInstr struct {
Expand Down

0 comments on commit 01d3d51

Please sign in to comment.