diff --git a/storage/intent_resolver.go b/storage/intent_resolver.go index 0f372062fb52..5edcbdf0af32 100644 --- a/storage/intent_resolver.go +++ b/storage/intent_resolver.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/util/log" "github.com/cockroachdb/cockroach/util/tracing" + "github.com/cockroachdb/cockroach/util/uuid" "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -35,10 +36,20 @@ import ( // resolving intents. type intentResolver struct { store *Store + + mu struct { + sync.Mutex + // Maps transaction ids to a refcount. + inFlight map[*uuid.UUID]int + } } func newIntentResolver(store *Store) *intentResolver { - return &intentResolver{store} + ir := &intentResolver{ + store: store, + } + ir.mu.inFlight = map[*uuid.UUID]int{} + return ir } // processWriteIntentError tries to push the conflicting @@ -59,7 +70,7 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa method := args.Method() readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param - resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType) + resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false) if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil { // When resolving without waiting, errors should not // usually be returned here, although there are some cases @@ -113,8 +124,17 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa // c) resolving intents upon EndTransaction which are not local to the given // range. This is the only path in which the transaction is going to be // in non-pending state and doesn't require a push. -func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) { +func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ([]roachpb.Intent, *roachpb.Error) { + now := ir.store.Clock().Now() + pusherTxn := h.Txn + // If there's no pusher, we communicate a priority by sending an empty + // txn with only the priority set. + if pusherTxn == nil { + pusherTxn = &roachpb.Transaction{ + Priority: roachpb.MakePriority(h.UserPriority), + } + } sp, cleanupSp := tracing.SpanFromContext(opStore, ir.store.Tracer(), ctx) defer cleanupSp() @@ -122,29 +142,27 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r // Split intents into those we need to push and those which are good to // resolve. + ir.mu.Lock() // TODO(tschottdorf): can optimize this and use same underlying slice. var pushIntents, resolveIntents []roachpb.Intent for _, intent := range intents { - // The current intent does not need conflict resolution. if intent.Status != roachpb.PENDING { + // The current intent does not need conflict resolution + // because the transaction is already finalized. + // TODO(bdarnell): can this happen any more? resolveIntents = append(resolveIntents, intent) + } else if _, ok := ir.mu.inFlight[intent.Txn.ID]; ok && skipInFlight { + // Another goroutine is working on this transaction so we can + // skip it. + continue } else { pushIntents = append(pushIntents, intent) + ir.mu.inFlight[intent.Txn.ID]++ } } + ir.mu.Unlock() // Attempt to push the transaction(s) which created the conflicting intent(s). - now := ir.store.Clock().Now() - - // TODO(tschottdorf): need deduplication here (many pushes for the same - // txn are awkward but even worse, could ratchet up the priority). - // If there's no pusher, we communicate a priority by sending an empty - // txn with only the priority set. - if pusherTxn == nil { - pusherTxn = &roachpb.Transaction{ - Priority: roachpb.MakePriority(h.UserPriority), - } - } var pushReqs []roachpb.Request for _, intent := range pushIntents { pushReqs = append(pushReqs, &roachpb.PushTxnRequest{ @@ -168,6 +186,14 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r b := &client.Batch{} b.InternalAddRequest(pushReqs...) br, err := ir.store.db.RunWithResponse(b) + ir.mu.Lock() + for _, intent := range pushIntents { + ir.mu.inFlight[intent.Txn.ID]-- + if ir.mu.inFlight[intent.Txn.ID] == 0 { + delete(ir.mu.inFlight, intent.Txn.ID) + } + } + ir.mu.Unlock() if err != nil { // TODO(bdarnell): return resolveIntents even on error. return nil, err @@ -208,7 +234,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA defer cancel() h := roachpb.Header{Timestamp: now} resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout, - item.intents, h, roachpb.PUSH_TOUCH) + item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */) if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil { log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr) return