Skip to content

Commit

Permalink
Merge pull request #32853 from nvanbenschoten/backport2.1-32773
Browse files Browse the repository at this point in the history
release-2.1: storage: don't clear lastTxnMeta on WriteIntentError to different key
  • Loading branch information
nvanbenschoten authored Dec 5, 2018
2 parents c00af9f + 1146a03 commit 8d807ce
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 38 deletions.
86 changes: 49 additions & 37 deletions pkg/storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ func newContendedKey() *contendedKey {
// detection of dependency cycles.
func (ck *contendedKey) setLastTxnMeta(txnMeta *enginepb.TxnMeta) {
ck.lastTxnMeta = txnMeta
if txnMeta == nil {
return
}
for e := ck.ll.Front(); e != nil; e = e.Next() {
p := e.Value.(*pusher)
if p.detectCh != nil {
Expand Down Expand Up @@ -171,7 +168,8 @@ func txnID(txn *roachpb.Transaction) string {
// newWIErr, non-nil in case the re-executed request experienced
// another write intent error and could not complete; and
// newIntentTxn, nil if the re-executed request left no intent, and
// non-nil if it did.
// non-nil if it did. At most one of these two arguments should be
// provided.
func (cq *contentionQueue) add(
ctx context.Context, wiErr *roachpb.WriteIntentError, h roachpb.Header,
) (
Expand All @@ -198,7 +196,7 @@ func (cq *contentionQueue) add(
contended = newContendedKey()
contended.setLastTxnMeta(&intent.Txn)
cq.mu.keys[key] = contended
} else if contended.lastTxnMeta == nil || contended.lastTxnMeta.ID != intent.Txn.ID {
} else if contended.lastTxnMeta.ID != intent.Txn.ID {
contended.setLastTxnMeta(&intent.Txn)
}

Expand Down Expand Up @@ -274,17 +272,11 @@ func (cq *contentionQueue) add(
frontOfQueue := curElement == contended.ll.Front()
pusheeTxn := contended.lastTxnMeta
cq.mu.Unlock()
// If we're at the start of the queue, or there's no pushee
// transaction (the previous pusher didn't leave an intent),
// loop and wait for the wait channel to signal.
// If we're at the start of the queue loop and wait
// for the wait channel to signal.
if frontOfQueue {
log.VEventf(ctx, 3, "%s at front of queue; breaking from loop", txnID(curPusher.txn))
break Loop
} else if pusheeTxn == nil {
log.VEventf(ctx, 3, "%s cycle detection skipped because there is no txn to push", txnID(curPusher.txn))
detectCh = nil
detectReady = time.After(dependencyCyclePushDelay)
continue
}
pushReq := &roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down Expand Up @@ -313,12 +305,22 @@ func (cq *contentionQueue) add(
}

return func(newWIErr *roachpb.WriteIntentError, newIntentTxn *enginepb.TxnMeta) {
if newWIErr != nil && newIntentTxn != nil {
// The need for this implies that the function should be split
// into a more rigidly defined handle with multiple accessors.
// TODO(nvanbenschoten): clean this up and test better when we're
// not intending the change to be backported.
panic("newWIErr and newIntentTxn both provided")
}
if newWIErr == nil {
log.VEventf(ctx, 3, "%s finished, leaving intent? %t (owned by %s)", txnID(curPusher.txn), newIntentTxn != nil, newIntentTxn)
} else {
log.VEventf(ctx, 3, "%s encountered another write intent error %s", txnID(curPusher.txn), newWIErr)
}
cq.mu.Lock()
defer cq.mu.Unlock()

// Remove the current element from its list of pushers.
// If the current element isn't the front, it's being removed
// because its context was canceled. Swap the wait channel with
// the previous element.
Expand All @@ -329,34 +331,44 @@ func (cq *contentionQueue) add(
}
prevPusher.waitCh, curPusher.waitCh = curPusher.waitCh, prevPusher.waitCh
}
// If the pusher re-executed its request and encountered another
// write intent error, check if it's for the same intent; if so,
// we can set the newIntentTxn to match the new intent.
if newWIErr != nil && len(newWIErr.Intents) == 1 &&
len(newWIErr.Intents[0].EndKey) == 0 && newWIErr.Intents[0].Key.Equal(intent.Key) {
newIntentTxn = &newWIErr.Intents[0].Txn
}
contended.ll.Remove(curElement)

if contended.ll.Len() == 0 {
// If the contendedKey's list is now empty, remove it. We don't need
// to send on or close our waitCh because no one is or ever will wait
// on it.
delete(cq.mu.keys, key)
} else if newIntentTxn != nil {
contended.setLastTxnMeta(newIntentTxn)
} else if newWIErr != nil {
// Note that we don't update last txn meta unless we know for
// sure the txn which has written the most recent intent to the
// contended key (i.e. newWIErr != nil).
contended.setLastTxnMeta(nil)
}
if newIntentTxn != nil {
// Shallow copy the TxnMeta. After this request returns (i.e. now), we might
// mutate it (DistSender and such), but the receiver of the channel will read
// it.
newIntentTxnCopy := *newIntentTxn
newIntentTxn = &newIntentTxnCopy
} else {
// If the pusher re-executed its request and encountered another
// write intent error, check if it's for the same intent; if so,
// we can set the newIntentTxn to match the new intent. If not,
// make sure that we don't pollute the old contendedKey with any
// new information.
if newWIErr != nil {
sameKey := len(newWIErr.Intents) == 1 && newWIErr.Intents[0].Span.Equal(intent.Span)
if sameKey {
newIntentTxn = &newWIErr.Intents[0].Txn
} else {
// If the pusher re-executed and found a different intent, make
// sure that we don't tell the old contendedKey anything about
// the new intent's transaction. This new intent could be from
// an earlier request in the batch than the one that previously
// hit the error, so we don't know anything about the state of
// the old intent.
newIntentTxn = nil
}
}
if newIntentTxn != nil {
// Shallow copy the TxnMeta. After this request returns (i.e.
// now), we might mutate it (DistSender and such), but the
// receiver of the channel will read it.
newIntentTxnCopy := *newIntentTxn
newIntentTxn = &newIntentTxnCopy
contended.setLastTxnMeta(newIntentTxn)
}
curPusher.waitCh <- newIntentTxn
close(curPusher.waitCh)
}
curPusher.waitCh <- newIntentTxn
close(curPusher.waitCh)
cq.mu.Unlock()
}, wiErr, done
}

Expand Down
Loading

0 comments on commit 8d807ce

Please sign in to comment.