From 65221c88759b3a2435994a0b77a43450481b2b73 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 4 Dec 2018 01:59:28 -0500 Subject: [PATCH 1/2] storage: don't clear lastTxnMeta on WriteIntentError to different key Fixes #32582. This change removes a faulty optimization in the `contentionQueue`. The optimization removed the txnMeta associated with a contended key in the queue when it found a `WriteIntentError` from a different key. It didn't take into account that this error could be from an earlier request within the same batch, meaning that we can't make any assumptions about the state of the previously contended intent simply because we see a different `WriteIntentError`. Release note (bug fix): Fix a bug where metadata about contended keys was inadvertently ignored, allowing for a failure in txn cycle detection and transaction deadlocks in rare cases. --- pkg/storage/intent_resolver.go | 86 +++++---- pkg/storage/intent_resolver_test.go | 269 +++++++++++++++++++++++++++- 2 files changed, 317 insertions(+), 38 deletions(-) diff --git a/pkg/storage/intent_resolver.go b/pkg/storage/intent_resolver.go index 06701466ff02..38eec313a52c 100644 --- a/pkg/storage/intent_resolver.go +++ b/pkg/storage/intent_resolver.go @@ -100,9 +100,6 @@ func newContendedKey() *contendedKey { // detection of dependency cycles. func (ck *contendedKey) setLastTxnMeta(txnMeta *enginepb.TxnMeta) { ck.lastTxnMeta = txnMeta - if txnMeta == nil { - return - } for e := ck.ll.Front(); e != nil; e = e.Next() { p := e.Value.(*pusher) if p.detectCh != nil { @@ -171,7 +168,8 @@ func txnID(txn *roachpb.Transaction) string { // newWIErr, non-nil in case the re-executed request experienced // another write intent error and could not complete; and // newIntentTxn, nil if the re-executed request left no intent, and -// non-nil if it did. +// non-nil if it did. At most one of these two arguments should be +// provided. func (cq *contentionQueue) add( ctx context.Context, wiErr *roachpb.WriteIntentError, h roachpb.Header, ) ( @@ -198,7 +196,7 @@ func (cq *contentionQueue) add( contended = newContendedKey() contended.setLastTxnMeta(&intent.Txn) cq.mu.keys[key] = contended - } else if contended.lastTxnMeta == nil || contended.lastTxnMeta.ID != intent.Txn.ID { + } else if contended.lastTxnMeta.ID != intent.Txn.ID { contended.setLastTxnMeta(&intent.Txn) } @@ -274,17 +272,11 @@ func (cq *contentionQueue) add( frontOfQueue := curElement == contended.ll.Front() pusheeTxn := contended.lastTxnMeta cq.mu.Unlock() - // If we're at the start of the queue, or there's no pushee - // transaction (the previous pusher didn't leave an intent), - // loop and wait for the wait channel to signal. + // If we're at the start of the queue loop and wait + // for the wait channel to signal. if frontOfQueue { log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn)) break Loop - } else if pusheeTxn == nil { - log.VEventf(ctx, 3, "%s cycle detection skipped because there is no txn to push", txnID(curPusher.txn)) - detectCh = nil - detectReady = time.After(dependencyCyclePushDelay) - continue } pushReq := &roachpb.PushTxnRequest{ RequestHeader: roachpb.RequestHeader{ @@ -313,12 +305,22 @@ func (cq *contentionQueue) add( } return func(newWIErr *roachpb.WriteIntentError, newIntentTxn *enginepb.TxnMeta) { + if newWIErr != nil && newIntentTxn != nil { + // The need for this implies that the function should be split + // into a more rigidly defined handle with multiple accessors. + // TODO(nvanbenschoten): clean this up and test better when we're + // not intending the change to be backported. + panic("newWIErr and newIntentTxn both provided") + } if newWIErr == nil { log.VEventf(ctx, 3, "%s finished, leaving intent? %t (owned by %s)", txnID(curPusher.txn), newIntentTxn != nil, newIntentTxn) } else { log.VEventf(ctx, 3, "%s encountered another write intent error %s", txnID(curPusher.txn), newWIErr) } cq.mu.Lock() + defer cq.mu.Unlock() + + // Remove the current element from its list of pushers. // If the current element isn't the front, it's being removed // because its context was canceled. Swap the wait channel with // the previous element. @@ -329,34 +331,44 @@ func (cq *contentionQueue) add( } prevPusher.waitCh, curPusher.waitCh = curPusher.waitCh, prevPusher.waitCh } - // If the pusher re-executed its request and encountered another - // write intent error, check if it's for the same intent; if so, - // we can set the newIntentTxn to match the new intent. - if newWIErr != nil && len(newWIErr.Intents) == 1 && - len(newWIErr.Intents[0].EndKey) == 0 && newWIErr.Intents[0].Key.Equal(intent.Key) { - newIntentTxn = &newWIErr.Intents[0].Txn - } contended.ll.Remove(curElement) + if contended.ll.Len() == 0 { + // If the contendedKey's list is now empty, remove it. We don't need + // to send on or close our waitCh because no one is or ever will wait + // on it. delete(cq.mu.keys, key) - } else if newIntentTxn != nil { - contended.setLastTxnMeta(newIntentTxn) - } else if newWIErr != nil { - // Note that we don't update last txn meta unless we know for - // sure the txn which has written the most recent intent to the - // contended key (i.e. newWIErr != nil). - contended.setLastTxnMeta(nil) - } - if newIntentTxn != nil { - // Shallow copy the TxnMeta. After this request returns (i.e. now), we might - // mutate it (DistSender and such), but the receiver of the channel will read - // it. - newIntentTxnCopy := *newIntentTxn - newIntentTxn = &newIntentTxnCopy + } else { + // If the pusher re-executed its request and encountered another + // write intent error, check if it's for the same intent; if so, + // we can set the newIntentTxn to match the new intent. If not, + // make sure that we don't pollute the old contendedKey with any + // new information. + if newWIErr != nil { + sameKey := len(newWIErr.Intents) == 1 && newWIErr.Intents[0].Span.Equal(intent.Span) + if sameKey { + newIntentTxn = &newWIErr.Intents[0].Txn + } else { + // If the pusher re-executed and found a different intent, make + // sure that we don't tell the old contendedKey anything about + // the new intent's transaction. This new intent could be from + // an earlier request in the batch than the one that previously + // hit the error, so we don't know anything about the state of + // the old intent. + newIntentTxn = nil + } + } + if newIntentTxn != nil { + // Shallow copy the TxnMeta. After this request returns (i.e. + // now), we might mutate it (DistSender and such), but the + // receiver of the channel will read it. + newIntentTxnCopy := *newIntentTxn + newIntentTxn = &newIntentTxnCopy + contended.setLastTxnMeta(newIntentTxn) + } + curPusher.waitCh <- newIntentTxn + close(curPusher.waitCh) } - curPusher.waitCh <- newIntentTxn - close(curPusher.waitCh) - cq.mu.Unlock() }, wiErr, done } diff --git a/pkg/storage/intent_resolver_test.go b/pkg/storage/intent_resolver_test.go index 248bcf0d3741..d33fb8883f29 100644 --- a/pkg/storage/intent_resolver_test.go +++ b/pkg/storage/intent_resolver_test.go @@ -17,17 +17,19 @@ package storage import ( "context" + "fmt" "sync" "testing" "time" + "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/pkg/errors" ) // TestPushTransactionsWithNonPendingIntent verifies that maybePushIntents @@ -274,3 +276,268 @@ func TestContendedIntentWithDependencyCycle(t *testing.T) { t.Fatal(err) } } + +// TestContendedIntentChangesOnRetry verifies that a batch which observes a +// WriteIntentError for one key and then a WriteIntentError for a different +// key doesn't pollute the old key's contentionQueue state. +// +// This also serves as a regression test for #32582. In that issue, we +// saw a transaction wait in the contentionQueue without pushing the +// transaction that it was deadlocked on. This was because of a bug in +// how the queue handled WriteIntentErrors for different intents on +// the re-evaluation of a batch. +// +// The scenario requires 5 unique transactions: +// 1. txn1 writes to keyA. +// 2. txn2 writes to keyB. +// 3. txn4 writes to keyC and keyB in the same batch. The batch initially +// fails with a WriteIntentError on keyA. It enters the contentionQueue +// and becomes the front of a contendedKey list. +// 4. txn5 writes to keyB. It enters the contentionQueue behind txn4. +// 5. txn3 writes to keyC. +// 6. txn2 is committed and the intent on keyB is resolved. +// 7. txn4 exits the contentionQueue and re-evaluates. This time, it hits +// a WriteIntentError on the first request in its batch: keyC. HOWEVER, +// before it updates the contentionQueue, steps 8-10 occur. +// 8. txn3 writes to keyB. It never enters the contentionQueue. txn3 then +// writes to keyA in a separate batch and gets stuck waiting on txn1. +// 9. txn2 writes to keyB. It observes txn3's intent and informs the +// contentionQueue of the new txn upon its entrance. +// 10. txn5, the new front of the contendedKey list, pushes txn2 and gets +// stuck in the txnWaitQueue. +// 11. txn4 finally updates the contentionQueue. A bug previously existed +// where it would set the contendedKey's lastTxnMeta to nil because it +// saw a WriteIntentError for a different key. +// 12. txn1 notices the nil lastTxnMeta and does not push txn2. This prevents +// cycle detection from succeeding and we observe a deadlock. +// +func TestContendedIntentChangesOnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + store, _ := createTestStore(t, stopper) + ctx := context.Background() + + keyA := roachpb.Key("a") + keyB := roachpb.Key("b") + keyC := roachpb.Key("c") + keyD := roachpb.Key("d") + spanA := roachpb.Span{Key: keyA} + spanB := roachpb.Span{Key: keyB} + spanC := roachpb.Span{Key: keyC} + + // Steps 1 and 2. + // + // Create the five transactions; at this point, none of them have + // conflicts. Txn1 has written "a", Txn2 has written "b". + txn1 := beginTransaction(t, store, -5, keyA, true /* putKey */) + txn2 := beginTransaction(t, store, -4, keyB, true /* putKey */) + txn3 := beginTransaction(t, store, -3, keyC, false /* putKey */) + txn4 := beginTransaction(t, store, -5, keyD, false /* putKey */) + txn5 := beginTransaction(t, store, -1, keyD, false /* putKey */) + + fmt.Println(txn1.ID, txn2.ID, txn3.ID, txn4.ID, txn5.ID) + + txnCh1 := make(chan error, 1) + txnCh3 := make(chan error, 1) + txnCh4 := make(chan error, 1) + txnCh5 := make(chan error, 1) + + // waitForContended waits until the provided key has the specified + // number of pushers in the contentionQueue. + waitForContended := func(key roachpb.Key, count int) { + testutils.SucceedsSoon(t, func() error { + ir := store.intentResolver + ir.contentionQ.mu.Lock() + defer ir.contentionQ.mu.Unlock() + contended, ok := ir.contentionQ.mu.keys[string(key)] + if !ok { + return errors.Errorf("key not contended") + } + if lc := contended.ll.Len(); lc != count { + return errors.Errorf("expected len %d; got %d", count, lc) + } + return nil + }) + } + + // Steps 3, 7, and 11. + // + // Send txn4's puts in a single batch, followed by an end transaction. + // This txn will hit a WriteIntentError on its second request during the + // first time that it evaluates the batch and will hit a WriteIntentError + // on its first request during the second time that it evaluates. This + // second WriteIntentError must not be entangled with the contentionQueue + // entry for the first key. + { + go func() { + putC := putArgs(keyC, []byte("value")) // will hit intent on 2nd iter + putB := putArgs(keyB, []byte("value")) // will hit intent on 1st iter + assignSeqNumsForReqs(txn4, &putC) + assignSeqNumsForReqs(txn4, &putB) + ba := roachpb.BatchRequest{} + ba.Header = roachpb.Header{Txn: txn4} + ba.Add(&putC, &putB) + br, pErr := store.TestSender().Send(ctx, ba) + if pErr != nil { + txnCh4 <- pErr.GoError() + return + } + txn4.Update(br.Txn) + + et, _ := endTxnArgs(txn4, true) + et.IntentSpans = []roachpb.Span{spanB, spanC} + et.NoRefreshSpans = true + assignSeqNumsForReqs(txn4, &et) + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn4}, &et) + txnCh4 <- pErr.GoError() + }() + + waitForContended(keyB, 1) + t.Log("txn4 in contentionQueue") + } + + // Steps 4 and 10. + // + // Send txn5's put, followed by an end transaction. This request will + // wait at the head of the contention queue once txn2 is committed. + { + go func() { + // Write keyB to create a cycle with txn3. + putB := putArgs(keyB, []byte("value")) + assignSeqNumsForReqs(txn5, &putB) + repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn5}, &putB) + if pErr != nil { + txnCh5 <- pErr.GoError() + return + } + txn5.Update(repl.Header().Txn) + + et, _ := endTxnArgs(txn5, true) + et.IntentSpans = []roachpb.Span{spanB} + et.NoRefreshSpans = true + assignSeqNumsForReqs(txn5, &et) + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn5}, &et) + txnCh5 <- pErr.GoError() + }() + + waitForContended(keyB, 2) + t.Log("txn5 in contentionQueue") + } + + // Step 5. + // + // Write to keyC, which will block txn4 on its re-evaluation. + { + putC := putArgs(keyC, []byte("value")) + assignSeqNumsForReqs(txn3, &putC) + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &putC); pErr != nil { + t.Fatal(pErr) + } + } + + // Step 6. + // + // Commit txn2, which should set off a chain reaction of movement. txn3 should + // write an intent at keyB and go on to write at keyA. In doing so, it should + // create a cycle with txn1. This cycle should be detected. + // + // In #32582 we saw a case where txn4 would hit a different WriteIntentError + // during its re-evaluation (keyC instead of keyB). This caused it to remove + // the lastTxnMeta from keyB's contendedKey record, which prevented txn1 from + // pushing txn3 and in turn prevented cycle detection from finding the deadlock. + { + // Sleeping for dependencyCyclePushDelay before committing txn2 makes the + // failure reproduce more easily. + time.Sleep(100 * time.Millisecond) + + et, _ := endTxnArgs(txn2, true) + et.IntentSpans = []roachpb.Span{spanB} + et.NoRefreshSpans = true + assignSeqNumsForReqs(txn2, &et) + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn2}, &et); pErr != nil { + t.Fatal(pErr) + } + } + + // Step 8. + // + // Send txn3's two other put requests, followed by an end transaction. This + // txn will first write to keyB before writing to keyA. This will create a + // cycle between txn1 and txn3. + { + // Write to keyB, which will hit an intent from txn2. + putB := putArgs(keyB, []byte("value")) + assignSeqNumsForReqs(txn3, &putB) + repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &putB) + if pErr != nil { + txnCh3 <- pErr.GoError() + return + } + txn3.Update(repl.Header().Txn) + + go func() { + // Write keyA, which will hit an intent from txn1 and create a cycle. + putA := putArgs(keyA, []byte("value")) + assignSeqNumsForReqs(txn3, &putA) + repl, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &putA) + if pErr != nil { + txnCh3 <- pErr.GoError() + return + } + txn3.Update(repl.Header().Txn) + + et, _ := endTxnArgs(txn3, true) + et.IntentSpans = []roachpb.Span{spanA, spanB} + et.NoRefreshSpans = true + assignSeqNumsForReqs(txn3, &et) + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &et) + txnCh3 <- pErr.GoError() + }() + } + + // Step 9. + // + // Send txn1's put request to keyB, which completes the cycle between txn1 + // and txn3. + { + go func() { + // Write keyB to create a cycle with txn3. + putB := putArgs(keyB, []byte("value")) + assignSeqNumsForReqs(txn1, &putB) + repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn1}, &putB) + if pErr != nil { + txnCh1 <- pErr.GoError() + return + } + txn1.Update(repl.Header().Txn) + + et, _ := endTxnArgs(txn1, true) + et.IntentSpans = []roachpb.Span{spanB} + et.NoRefreshSpans = true + assignSeqNumsForReqs(txn1, &et) + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn1}, &et) + txnCh1 <- pErr.GoError() + }() + } + + // The third transaction will always be aborted because it has + // a lower priority than the first. + err := <-txnCh3 + if _, ok := err.(*roachpb.UnhandledRetryableError); !ok { + t.Fatalf("expected transaction aborted error; got %T", err) + } + if err := <-txnCh1; err != nil { + t.Fatal(err) + } + if err := <-txnCh4; err != nil { + // txn4 can end up being aborted due to a perceived deadlock. This + // is rare and isn't important to the test, so we allow it. + if _, ok := err.(*roachpb.UnhandledRetryableError); !ok { + t.Fatal(err) + } + } + if err := <-txnCh5; err != nil { + t.Fatal(err) + } +} From 1146a03cc217cb57bdddd795e2d2fe2806c64985 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 5 Dec 2018 14:28:49 -0500 Subject: [PATCH 2/2] storage: fix flake in TestContendedIntentChangesOnRetry Saw in CI of #32853. We were ignoring deadlock errors on txn4, but not txn5. Release note: None --- pkg/storage/intent_resolver_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storage/intent_resolver_test.go b/pkg/storage/intent_resolver_test.go index d33fb8883f29..dc06d1630407 100644 --- a/pkg/storage/intent_resolver_test.go +++ b/pkg/storage/intent_resolver_test.go @@ -538,6 +538,10 @@ func TestContendedIntentChangesOnRetry(t *testing.T) { } } if err := <-txnCh5; err != nil { - t.Fatal(err) + // txn5 can end up being aborted due to a perceived deadlock. This + // is rare and isn't important to the test, so we allow it. + if _, ok := err.(*roachpb.UnhandledRetryableError); !ok { + t.Fatal(err) + } } }