Skip to content

Commit

Permalink
Merge #34733
Browse files Browse the repository at this point in the history
34733: storage: regression test leaked intents on bounced proposal r=petermattis a=tbg

This adds the test promised in the PR below. When a transaction
committed but the commit applied at an invalid lease applied index,
we'd formerly (due to a recent change) leak the intents as committed,
which would cause dirty writes. Adapt an existing test to do roughly
the following to prevent a regression.

The test (now) sets up two ranges and lets a transaction (anchored on
the left) write to both of them. It then starts readers for both keys
written by the txn and waits for them to enter the txn wait queue. Next,
it lets the txn attempt to commit but injects a forced error below Raft.
The bugs would formerly notify the txn wait queue that the transaction
had committed (not true) and that its external intent (i.e. the one on
the right range) could be resolved (not true). Verify that neither
occurs.

See #34659.

Release note: None

Co-authored-by: Tobias Schottdorf <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Feb 11, 2019
2 parents d4e7883 + 6fd1e11 commit 8ad594e
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 51 deletions.
93 changes: 63 additions & 30 deletions pkg/kv/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,19 @@ func TestDelayedBeginRetryable(t *testing.T) {
// Test that waiters on transactions whose commit command is rejected see the
// transaction as Aborted. This test is a regression test for #30792 which was
// causing pushers in the txn wait queue to consider such a transaction
// committed.
// committed. It is also a regression test for a similar bug [1] in which
// it was not the notification to the txn wait queue that was leaked, but the
// intents.
//
// The test sets up two ranges and lets a transaction (anchored on the left)
// write to both of them. It then starts readers for both keys written by the
// txn and waits for them to enter the txn wait queue. Next, it lets the txn
// attempt to commit but injects a forced error below Raft. The bugs would
// formerly notify the txn wait queue that the transaction had committed (not
// true) and that its external intent (i.e. the one on the right range) could
// be resolved (not true). Verify that neither occurs.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/34025#issuecomment-460934278
func TestWaiterOnRejectedCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand All @@ -203,16 +215,19 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
var txnID atomic.Value
// The EndTransaction proposal that we want to reject. A string.
var commitCmdID atomic.Value
readerBlocked := make(chan struct{})
readerBlocked := make(chan struct{}, 2)
// txnUpdate is signaled once the txn wait queue is updated for our
// transaction. Normally it only needs a buffer length of 1, but bugs that
// cause it to be pinged several times (e.g. #30792) might need a bigger
// buffer to avoid the test timing out.
txnUpdate := make(chan roachpb.TransactionStatus, 10)
txnUpdate := make(chan roachpb.TransactionStatus, 20)

illegalLeaseIndex := true
s, _, db := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
// We'll recognize the attempt to commit our transaction and store the
// respective command id.
Expand All @@ -234,18 +249,24 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
commitCmdID.Store(args.CmdID)
return nil
},
TestingApplyFilter: func(args storagebase.ApplyFilterArgs) *roachpb.Error {
TestingApplyFilter: func(args storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
// We'll trap the processing of the commit command and return an error
// for it.
v := commitCmdID.Load()
if v == nil {
return nil
return 0, nil
}
cmdID := v.(storagebase.CmdIDKey)
if args.CmdID == cmdID {
return roachpb.NewErrorf("test injected err")
if illegalLeaseIndex {
illegalLeaseIndex = false
// NB: 1 is proposalIllegalLeaseIndex.
return 1, roachpb.NewErrorf("test injected err (illegal lease index)")
}
// NB: 0 is proposalNoReevaluation.
return 0, roachpb.NewErrorf("test injected err")
}
return nil
return 0, nil
},
TxnWait: txnwait.TestingKnobs{
OnPusherBlocked: func(ctx context.Context, push *roachpb.PushTxnRequest) {
Expand All @@ -255,7 +276,7 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
return
}
if push.PusheeTxn.ID.Equal(v.(uuid.UUID)) {
close(readerBlocked)
readerBlocked <- struct{}{}
}
},
OnTxnUpdate: func(ctx context.Context, txn *roachpb.Transaction) {
Expand All @@ -274,43 +295,55 @@ func TestWaiterOnRejectedCommit(t *testing.T) {
})
defer s.Stopper().Stop(ctx)

// We'll start a transaction, write an intent, then separately do a read on a
// different goroutine and wait for that read to block on the intent, then
// we'll attempt to commit the transaction but we'll intercept the processing
// of the commit command and reject it.
// Then we'll assert that the txn wait queue is told that the transaction
if _, _, err := s.SplitRange(roachpb.Key("b")); err != nil {
t.Fatal(err)
}

// We'll start a transaction, write an intent on both sides of the split,
// then separately read each key on its own goroutine and wait for both reads
// to block on the intents, then we'll attempt to commit the transaction but
// we'll intercept the processing of the commit command and reject it. Then
// we'll assert that the txn wait queue is told that the transaction
// aborted, and we also check that both readers got a nil value.

txn := client.NewTxn(ctx, db, s.NodeID(), client.RootTxn)
key := "key"
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
keyLeft, keyRight := "a", "c"
for _, key := range []string{keyLeft, keyRight} {
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
}
}
txnID.Store(txn.ID())

readerDone := make(chan error, 1)
readerDone := make(chan error, 2)

go func() {
val, err := db.Get(ctx, key)
if err != nil {
readerDone <- err
}
if val.Exists() {
readerDone <- fmt.Errorf("expected value to not exist, got: %s", val)
}
close(readerDone)
}()
for _, key := range []string{keyLeft, keyRight} {
go func(key string) {
val, err := db.Get(ctx, key)
if err != nil {
readerDone <- err
} else if val.Exists() {
readerDone <- fmt.Errorf("%s: expected value to not exist, got: %s", key, val)
} else {
readerDone <- nil
}
}(key)
}

// Wait for the reader to enter the txn wait queue.
// Wait for both readers to enter the txn wait queue.
<-readerBlocked
<-readerBlocked

if err := txn.CommitOrCleanup(ctx); !testutils.IsError(err, "test injected err") {
t.Fatalf("expected injected err, got: %v", err)
}
// Wait for the txn wait queue to be pinged and check the status.
if status := <-txnUpdate; status != roachpb.ABORTED {
t.Fatalf("expected the wait queue to be updated with an Aborted txn, instead got: %s", status)
}
if err := <-readerDone; err != nil {
t.Fatal(err)
for i := 0; i < 2; i++ {
if err := <-readerDone; err != nil {
t.Fatal(err)
}
}
}
4 changes: 2 additions & 2 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2992,7 +2992,7 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) {
rhsDesc *roachpb.RangeDescriptor
stop, stopping bool
}
storeCfg.TestingKnobs.TestingPostApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
storeCfg.TestingKnobs.TestingPostApplyFilter = func(args storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
state.Lock()
if state.stop && !state.stopping && args.RangeID == state.rhsDesc.RangeID && args.IsLeaseRequest {
// Shut down the store. The lease acquisition will notice that a merge is
Expand All @@ -3010,7 +3010,7 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) {
} else {
state.Unlock()
}
return nil
return 0, nil
}

mtc = &multiTestContext{
Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,15 +1463,15 @@ func TestStoreRangeCorruptionChangeReplicas(t *testing.T) {
// TODO(bdarnell): I think this should be a TestingApplyFilter
// instead of a TestingPostApplyFilter, but making that change
// causes this test to fail.
sc.TestingKnobs.TestingPostApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
sc.TestingKnobs.TestingPostApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
corrupt.Lock()
defer corrupt.Unlock()

if corrupt.store == nil || filterArgs.StoreID != corrupt.store.StoreID() {
return nil
return 0, nil
}

return roachpb.NewError(
return 0, roachpb.NewError(
roachpb.NewReplicaCorruptionError(errors.New("boom")),
)
}
Expand Down Expand Up @@ -4113,11 +4113,11 @@ func TestFailedConfChange(t *testing.T) {
// followers.
var filterActive int32
sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
sc.TestingKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 && filterArgs.ChangeReplicas != nil {
return roachpb.NewErrorf("boom")
return 0, roachpb.NewErrorf("boom")
}
return nil
return 0, nil
}
mtc := &multiTestContext{
storeConfig: &sc,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ func TestCheckConsistencyReplay(t *testing.T) {

// Arrange to count the number of times each checksum command applies to each
// store.
storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) *roachpb.Error {
storeCfg.TestingKnobs.TestingApplyFilter = func(args storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
state.Lock()
defer state.Unlock()
if ccr := args.ComputeChecksum; ccr != nil {
state.applies[applyKey{ccr.ChecksumID, args.StoreID}]++
}
return nil
return 0, nil
}

// Arrange to trigger a retry when a ComputeChecksum request arrives.
Expand Down
13 changes: 11 additions & 2 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1822,12 +1822,16 @@ func (r *Replica) processRaftCommand(
var writeBatch *storagepb.WriteBatch
{
if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil {
forcedErr = filter(storagebase.ApplyFilterArgs{
var newPropRetry int
newPropRetry, forcedErr = filter(storagebase.ApplyFilterArgs{
CmdID: idKey,
ReplicatedEvalResult: raftCmd.ReplicatedEvalResult,
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
})
if proposalRetry == 0 {
proposalRetry = proposalReevaluationReason(newPropRetry)
}
}

if forcedErr != nil {
Expand Down Expand Up @@ -1943,12 +1947,17 @@ func (r *Replica) processRaftCommand(
}

if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil {
pErr = filter(storagebase.ApplyFilterArgs{
var newPropRetry int
newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{
CmdID: idKey,
ReplicatedEvalResult: raftCmd.ReplicatedEvalResult,
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
})
if proposalRetry == 0 {
proposalRetry = proposalReevaluationReason(newPropRetry)
}

}

// calling maybeSetCorrupt here is mostly for tests and looks. The
Expand Down
14 changes: 7 additions & 7 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8689,11 +8689,11 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
if err != nil {
t.Fatal(err)
}
storeKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
storeKnobs.TestingApplyFilter = func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 {
return roachpb.NewErrorf("boom")
return 0, roachpb.NewErrorf("boom")
}
return nil
return 0, nil
}
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{Store: &storeKnobs}})
Expand Down Expand Up @@ -8769,11 +8769,11 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
var filterActive int32
blockRaftApplication := make(chan struct{})
tsc.TestingKnobs.TestingApplyFilter =
func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 {
<-blockRaftApplication
}
return nil
return 0, nil
}

stopper := stop.NewStopper()
Expand Down Expand Up @@ -8830,15 +8830,15 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
blockRaftApplication := make(chan struct{})
blockingRaftApplication := make(chan struct{}, 1)
tsc.TestingKnobs.TestingApplyFilter =
func(filterArgs storagebase.ApplyFilterArgs) *roachpb.Error {
func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
if atomic.LoadInt32(&filterActive) == 1 {
select {
case blockingRaftApplication <- struct{}{}:
default:
}
<-blockRaftApplication
}
return nil
return 0, nil
}

stopper := stop.NewStopper()
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/storagebase/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ type ReplicaCommandFilter func(args FilterArgs) *roachpb.Error
type ReplicaProposalFilter func(args ProposalFilterArgs) *roachpb.Error

// A ReplicaApplyFilter can be used in testing to influence the error returned
// from proposals after they apply.
type ReplicaApplyFilter func(args ApplyFilterArgs) *roachpb.Error
// from proposals after they apply. The returned int is treated as a
// storage.proposalReevaluationReason and will only take effect when it is
// nonzero and the existing reason is zero. Similarly, the error is only applied
// if there's no error so far.
type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error)

// ReplicaResponseFilter is used in unittests to modify the outbound
// response returned to a waiting client after a replica command has
Expand Down

0 comments on commit 8ad594e

Please sign in to comment.