Skip to content

Commit

Permalink
storage: regression test leaked intents on bounced proposal
Browse files Browse the repository at this point in the history
This adds the test promised in the PR below. When a transaction
committed but the commit applied at an invalid lease applied index,
we'd formerly (due to a recent change) leak the intents as committed,
which would cause dirty writes. Adapt an existing test to roughly
do the following to prevent regression.

The test (now) sets up two ranges and lets a transaction (anchored on
the left) write to both of them. It then starts readers for both keys
written by the txn and waits for them to enter the txn wait queue. Next,
it lets the txn attempt to commit but injects a forced error below Raft.
The bugs would formerly notify the txn wait queue that the transaction
had committed (not true) and that its external intent (i.e. the one on
the right range) could be resolved (not true). Verify that neither
occurs.

See cockroachdb#34659.

Release note: None
  • Loading branch information
tbg committed Feb 8, 2019
1 parent 70cd045 commit 72a1ed3
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 49 deletions.
87 changes: 59 additions & 28 deletions pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,19 @@ func TestDelayedBeginRetryable(t *testing.T) {
// Test that waiters on transactions whose commit command is rejected see the
// transaction as Aborted. This test is a regression test for #30792 which was
// causing pushers in the txn wait queue to consider such a transaction
// committed.
// committed. It is also a regression test for a similar bug [1] in which
// it was not the notification to the txn wait queue that was leaked, but the
// intents.
//
// The test sets up two ranges and lets a transaction (anchored on the left)
// write to both of them. It then starts readers for both keys written by the
// txn and waits for them to enter the txn wait queue. Next, it lets the txn
// attempt to commit but injects a forced error below Raft. The bugs would
// formerly notify the txn wait queue that the transaction had committed (not
// true) and that its external intent (i.e. the one on the right range) could
// be resolved (not true). Verify that neither occurs.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/34025#issuecomment-460934278
func TestWaiterOnRejectedCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -203,16 +215,19 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
var txnID atomic.Value
// The EndTransaction proposal that we want to reject. A string.
var commitCmdID atomic.Value
readerBlocked := make(chan struct{})
readerBlocked := make(chan struct{}, 2)
// txnUpdate is signaled once the txn wait queue is updated for our
// transaction. Normally it only needs a buffer length of 1, but bugs that
// cause it to be pinged several times (e.g. #30792) might need a bigger
// buffer to avoid the test timing out.
txnUpdate := make(chan roachpb.TransactionStatus, 10)
txnUpdate := make(chan roachpb.TransactionStatus, 20)

illegalLeaseIndex := true
s, _, db := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
// We'll recognize the attempt to commit our transaction and store the
// respective command id.
Expand All @@ -234,18 +249,22 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
commitCmdID.Store(args.CmdID)
return nil
},
TestingApplyFilter: func(args storagebase.ApplyFilterArgs) *roachpb.Error {
TestingApplyFilter: func(args storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
// We'll trap the processing of the commit command and return an error
// for it.
v := commitCmdID.Load()
if v == nil {
return nil
return 0, nil
}
cmdID := v.(storagebase.CmdIDKey)
if args.CmdID == cmdID {
return roachpb.NewErrorf("test injected err")
if illegalLeaseIndex {
illegalLeaseIndex = false
return 1, roachpb.NewErrorf("test injected err (illegal lease index)")
}
return 0, roachpb.NewErrorf("test injected err")
}
return nil
return 0, nil
},
TxnWait: txnwait.TestingKnobs{
OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) {
Expand All @@ -255,7 +274,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
return
}
if push.PusheeTxn.ID.Equal(v.(uuid.UUID)) {
close(readerBlocked)
readerBlocked <- struct{}{}
}
},
OnTxnUpdate: func(ctx context.Context, txn *roachpb.Transaction) {
Expand All @@ -274,43 +293,55 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
})
defer s.Stopper().Stop(ctx)

// We'll start a transaction, write an intent, then separately do a read on a
// different goroutine and wait for that read to block on the intent, then
// we'll attempt to commit the transaction but we'll intercept the processing
// of the commit command and reject it.
// Then we'll assert that the txn wait queue is told that the transaction
if _, _, err := s.SplitRange(roachpb.Key("b")); err != nil {
t.Fatal(err)
}

// We'll start a transaction, write an intent on both sides of the split,
// then separately do a read on a different goroutine and wait for that read
// to block on the intent, then we'll attempt to commit the transaction but
// we'll intercept the processing of the commit command and reject it. Then
// we'll assert that the txn wait queue is told that the transaction
// aborted, and we also check that the reader got a nil value.

txn := client.NewTxn(ctx, db, s.NodeID(), client.RootTxn)
key := "key"
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
keyLeft, keyRight := "a", "c"
for _, key := range []string{keyLeft, keyRight} {
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
}
}
txnID.Store(txn.ID())

readerDone := make(chan error, 1)

go func() {
val, err := db.Get(ctx, key)
if err != nil {
readerDone <- err
}
if val.Exists() {
readerDone <- fmt.Errorf("expected value to not exist, got: %s", val)
}
close(readerDone)
}()
for _, key := range []string{keyLeft, keyRight} {
go func(key string) {
val, err := db.Get(ctx, key)
if err != nil {
readerDone <- err
}
if val.Exists() {
readerDone <- fmt.Errorf("%s: expected value to not exist, got: %s", key, val)
}
readerDone <- nil
}(key)
}

// Wait for both readers to enter the txn wait queue.
<-readerBlocked
<-readerBlocked

if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") {
t.Fatalf("expected injected err, got: %v", err)
}
// Wait for the txn wait queue to be pinged and check the status.
if status := <-txnUpdate; status != roachpb.ABORTED {
t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status)
}
if err := <-readerDone; err != nil {
t.Fatal(err)
for i := 0; i < 2; i++ {
if err := <-readerDone; err != nil {
t.Fatal(err)
}
}
}
4 changes: 2 additions & 2 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2992,7 +2992,7 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) {
rhsDesc *roachpb.RangeDescriptor
stop, stopping bool
}
storeCfg.TestingKnobs.TestingPostApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
storeCfg.TestingKnobs.TestingPostApplyFilter = func(args storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
state.Lock()
if state.stop && !state.stopping && args.RangeID == state.rhsDesc.RangeID && args.IsLeaseRequest {
// Shut down the store. The lease acquisition will notice that a merge is
Expand All @@ -3010,7 +3010,7 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) {
} else {
state.Unlock()
}
return nil
return 0, nil
}

mtc = &multiTestContext{
Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,15 +1463,15 @@ func TestStoreRangeCorruptionChangeReplicas(t *testing.T) {
// TODO(bdarnell): I think this should be a TestingApplyFilter
// instead of a TestingPostApplyFilter, but making that change
// causes this test to fail.
sc.TestingKnobs.TestingPostApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
sc.TestingKnobs.TestingPostApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
corrupt.Lock()
defer corrupt.Unlock()

if corrupt.store == nil || filterArgs.StoreID != corrupt.store.StoreID() {
return nil
return 0, nil
}

return roachpb.NewError(
return 0, roachpb.NewError(
roachpb.NewReplicaCorruptionError(errors.New("boom")),
)
}
Expand Down Expand Up @@ -4113,11 +4113,11 @@ func TestFailedConfChange(t *testing.T) {
// followers.
var filterActive int32
sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
sc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 && filterArgs.ChangeReplicas != nil {
return roachpb.NewErrorf("boom")
return 0, roachpb.NewErrorf("boom")
}
return nil
return 0, nil
}
mtc := &multiTestContext{
storeConfig: &sc,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ func TestCheckConsistencyReplay(t *testing.T) {

// Arrange to count the number of times each checksum command applies to each
// store.
storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
state.Lock()
defer state.Unlock()
if ccr := args.ComputeChecksum; ccr != nil {
state.applies[applyKey{ccr.ChecksumID, args.StoreID}]++
}
return nil
return 0, nil
}

// Arrange to trigger a retry when a ComputeChecksum request arrives.
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1792,12 +1792,16 @@ func (r *Replica) processRaftCommand(
var writeBatch *storagepb.WriteBatch
{
if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil {
forcedErr = filter(storagebase.ApplyFilterArgs{
var newPropRetry int
newPropRetry, forcedErr = filter(storagebase.ApplyFilterArgs{
CmdID: idKey,
ReplicatedEvalResult: raftCmd.ReplicatedEvalResult,
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
})
if proposalRetry == 0 {
proposalRetry = proposalReevaluationReason(newPropRetry)
}
}

if forcedErr != nil {
Expand Down Expand Up @@ -1913,12 +1917,17 @@ func (r *Replica) processRaftCommand(
}

if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil {
pErr = filter(storagebase.ApplyFilterArgs{
var newPropRetry int
newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{
CmdID: idKey,
ReplicatedEvalResult: raftCmd.ReplicatedEvalResult,
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
})
if proposalRetry == 0 {
proposalRetry = proposalReevaluationReason(newPropRetry)
}

}

// calling maybeSetCorrupt here is mostly for tests and looks. The
Expand Down
14 changes: 7 additions & 7 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8689,11 +8689,11 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
if err != nil {
t.Fatal(err)
}
storeKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
storeKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 {
return roachpb.NewErrorf("boom")
return 0, roachpb.NewErrorf("boom")
}
return nil
return 0, nil
}
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{Store: &storeKnobs}})
Expand Down Expand Up @@ -8769,11 +8769,11 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
var filterActive int32
blockRaftApplication := make(chan struct{})
tsc.TestingKnobs.TestingApplyFilter =
func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 {
<-blockRaftApplication
}
return nil
return 0, nil
}

stopper := stop.NewStopper()
Expand Down Expand Up @@ -8830,15 +8830,15 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
blockRaftApplication := make(chan struct{})
blockingRaftApplication := make(chan struct{}, 1)
tsc.TestingKnobs.TestingApplyFilter =
func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 {
select {
case blockingRaftApplication <- struct{}{}:
default:
}
<-blockRaftApplication
}
return nil
return 0, nil
}

stopper := stop.NewStopper()
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/storagebase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ type ReplicaCommandFilter func(args FilterArgs) *roachpb.Error
type ReplicaProposalFilter func(args ProposalFilterArgs) *roachpb.Error

// A ReplicaApplyFilter can be used in testing to influence the error returned
// from proposals after they apply.
type ReplicaApplyFilter func(args ApplyFilterArgs) *roachpb.Error
// from proposals after they apply. The returned int is treated as a
// storage.proposalReevaluationReason and will only take effect when it is
// nonzero and the existing reason is zero. Similarly, the error is only applied
// if there's no error so far.
type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error)

// ReplicaResponseFilter is used in unittests to modify the outbound
// response returned to a waiting client after a replica command has
Expand Down

0 comments on commit 72a1ed3

Please sign in to comment.