Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
32773: storage: don't clear lastTxnMeta on WriteIntentError to different key r=nvanbenschoten a=nvanbenschoten

Fixes cockroachdb#32582.

This change removes a faulty optimization in the `contentionQueue`.
The optimization removed the txnMeta associated with a contended key
in the queue when it found a `WriteIntentError` from a different key.
It didn't take into account that this error could be from an earlier
request within the same batch, meaning that we can't make any assumptions
about the state of the previously contended intent simply because we
see a different `WriteIntentError`.

Release note (bug fix): Fix a bug where metadata about contended keys
was inadvertently ignored, allowing for a failure in txn cycle detection
and transaction deadlocks in rare cases.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Dec 5, 2018
2 parents 9c46723 + 1252ac7 commit f5c1c7e
Show file tree
Hide file tree
Showing 2 changed files with 317 additions and 38 deletions.
86 changes: 49 additions & 37 deletions pkg/storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ func newContendedKey() *contendedKey {
// detection of dependency cycles.
func (ck *contendedKey) setLastTxnMeta(txnMeta *enginepb.TxnMeta) {
ck.lastTxnMeta = txnMeta
if txnMeta == nil {
return
}
for e := ck.ll.Front(); e != nil; e = e.Next() {
p := e.Value.(*pusher)
if p.detectCh != nil {
Expand Down Expand Up @@ -171,7 +168,8 @@ func txnID(txn *roachpb.Transaction) string {
// newWIErr, non-nil in case the re-executed request experienced
// another write intent error and could not complete; and
// newIntentTxn, nil if the re-executed request left no intent, and
// non-nil if it did.
// non-nil if it did. At most one of these two arguments should be
// provided.
func (cq *contentionQueue) add(
ctx context.Context, wiErr *roachpb.WriteIntentError, h roachpb.Header,
) (
Expand All @@ -198,7 +196,7 @@ func (cq *contentionQueue) add(
contended = newContendedKey()
contended.setLastTxnMeta(&intent.Txn)
cq.mu.keys[key] = contended
} else if contended.lastTxnMeta == nil || contended.lastTxnMeta.ID != intent.Txn.ID {
} else if contended.lastTxnMeta.ID != intent.Txn.ID {
contended.setLastTxnMeta(&intent.Txn)
}

Expand Down Expand Up @@ -274,17 +272,11 @@ func (cq *contentionQueue) add(
frontOfQueue := curElement == contended.ll.Front()
pusheeTxn := contended.lastTxnMeta
cq.mu.Unlock()
// If we're at the start of the queue, or there's no pushee
// transaction (the previous pusher didn't leave an intent),
// loop and wait for the wait channel to signal.
// If we're at the start of the queue loop and wait
// for the wait channel to signal.
if frontOfQueue {
log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn))
break Loop
} else if pusheeTxn == nil {
log.VEventf(ctx, 3, "%s cycle detection skipped because there is no txn to push", txnID(curPusher.txn))
detectCh = nil
detectReady = time.After(dependencyCyclePushDelay)
continue
}
pushReq := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down Expand Up @@ -313,12 +305,22 @@ func (cq *contentionQueue) add(
}

return func(newWIErr *roachpb.WriteIntentError, newIntentTxn *enginepb.TxnMeta) {
if newWIErr != nil && newIntentTxn != nil {
// The need for this implies that the function should be split
// into a more rigidly defined handle with multiple accessors.
// TODO(nvanbenschoten): clean this up and test better when we're
// not intending the change to be backported.
panic("newWIErr and newIntentTxn both provided")
}
if newWIErr == nil {
log.VEventf(ctx, 3, "%s finished, leaving intent? %t (owned by %s)", txnID(curPusher.txn), newIntentTxn != nil, newIntentTxn)
} else {
log.VEventf(ctx, 3, "%s encountered another write intent error %s", txnID(curPusher.txn), newWIErr)
}
cq.mu.Lock()
defer cq.mu.Unlock()

// Remove the current element from its list of pushers.
// If the current element isn't the front, it's being removed
// because its context was canceled. Swap the wait channel with
// the previous element.
Expand All @@ -329,34 +331,44 @@ func (cq *contentionQueue) add(
}
prevPusher.waitCh, curPusher.waitCh = curPusher.waitCh, prevPusher.waitCh
}
// If the pusher re-executed its request and encountered another
// write intent error, check if it's for the same intent; if so,
// we can set the newIntentTxn to match the new intent.
if newWIErr != nil && len(newWIErr.Intents) == 1 &&
len(newWIErr.Intents[0].EndKey) == 0 && newWIErr.Intents[0].Key.Equal(intent.Key) {
newIntentTxn = &newWIErr.Intents[0].Txn
}
contended.ll.Remove(curElement)

if contended.ll.Len() == 0 {
// If the contendedKey's list is now empty, remove it. We don't need
// to send on or close our waitCh because no one is or ever will wait
// on it.
delete(cq.mu.keys, key)
} else if newIntentTxn != nil {
contended.setLastTxnMeta(newIntentTxn)
} else if newWIErr != nil {
// Note that we don't update last txn meta unless we know for
// sure the txn which has written the most recent intent to the
// contended key (i.e. newWIErr != nil).
contended.setLastTxnMeta(nil)
}
if newIntentTxn != nil {
// Shallow copy the TxnMeta. After this request returns (i.e. now), we might
// mutate it (DistSender and such), but the receiver of the channel will read
// it.
newIntentTxnCopy := *newIntentTxn
newIntentTxn = &newIntentTxnCopy
} else {
// If the pusher re-executed its request and encountered another
// write intent error, check if it's for the same intent; if so,
// we can set the newIntentTxn to match the new intent. If not,
// make sure that we don't pollute the old contendedKey with any
// new information.
if newWIErr != nil {
sameKey := len(newWIErr.Intents) == 1 && newWIErr.Intents[0].Span.Equal(intent.Span)
if sameKey {
newIntentTxn = &newWIErr.Intents[0].Txn
} else {
// If the pusher re-executed and found a different intent, make
// sure that we don't tell the old contendedKey anything about
// the new intent's transaction. This new intent could be from
// an earlier request in the batch than the one that previously
// hit the error, so we don't know anything about the state of
// the old intent.
newIntentTxn = nil
}
}
if newIntentTxn != nil {
// Shallow copy the TxnMeta. After this request returns (i.e.
// now), we might mutate it (DistSender and such), but the
// receiver of the channel will read it.
newIntentTxnCopy := *newIntentTxn
newIntentTxn = &newIntentTxnCopy
contended.setLastTxnMeta(newIntentTxn)
}
curPusher.waitCh <- newIntentTxn
close(curPusher.waitCh)
}
curPusher.waitCh <- newIntentTxn
close(curPusher.waitCh)
cq.mu.Unlock()
}, wiErr, done
}

Expand Down
Loading

0 comments on commit f5c1c7e

Please sign in to comment.