Skip to content

Commit

Permalink
storage: Dedupe PushTxn calls from async intent resolution
Browse files Browse the repository at this point in the history
An explosion of these calls for the same transaction (or a small number
of transactions) was responsible for cockroachdb#4925

Fixes cockroachdb#4925
  • Loading branch information
bdarnell committed Mar 14, 2016
1 parent 76f0dd6 commit d1655b0
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/tracing"
"github.com/cockroachdb/cockroach/util/uuid"
"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
)
Expand All @@ -35,10 +36,20 @@ import (
// resolving intents.
type intentResolver struct {
store *Store

mu struct {
sync.Mutex
// Maps transaction ids to a refcount.
inFlight map[*uuid.UUID]int
}
}

func newIntentResolver(store *Store) *intentResolver {
return &intentResolver{store}
ir := &intentResolver{
store: store,
}
ir.mu.inFlight = map[*uuid.UUID]int{}
return ir
}

// processWriteIntentError tries to push the conflicting
Expand All @@ -59,7 +70,7 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa
method := args.Method()
readOnly := roachpb.IsReadOnly(args) // TODO(tschottdorf): pass as param

resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType)
resolveIntents, pushErr := ir.maybePushTransactions(ctx, wiErr.Intents, h, pushType, false)
if resErr := ir.resolveIntents(ctx, r, resolveIntents, false /* !wait */, true /* poison */); resErr != nil {
// When resolving without waiting, errors should not
// usually be returned here, although there are some cases
Expand Down Expand Up @@ -113,38 +124,45 @@ func (ir *intentResolver) processWriteIntentError(ctx context.Context, wiErr roa
// c) resolving intents upon EndTransaction which are not local to the given
// range. This is the only path in which the transaction is going to be
// in non-pending state and doesn't require a push.
func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType) ([]roachpb.Intent, *roachpb.Error) {
func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []roachpb.Intent, h roachpb.Header, pushType roachpb.PushTxnType, skipInFlight bool) ([]roachpb.Intent, *roachpb.Error) {
now := ir.store.Clock().Now()

pusherTxn := h.Txn
// If there's no pusher, we communicate a priority by sending an empty
// txn with only the priority set.
if pusherTxn == nil {
pusherTxn = &roachpb.Transaction{
Priority: roachpb.MakePriority(h.UserPriority),
}
}

sp, cleanupSp := tracing.SpanFromContext(opStore, ir.store.Tracer(), ctx)
defer cleanupSp()
sp.LogEvent("intent resolution")

// Split intents into those we need to push and those which are good to
// resolve.
ir.mu.Lock()
// TODO(tschottdorf): can optimize this and use same underlying slice.
var pushIntents, resolveIntents []roachpb.Intent
for _, intent := range intents {
// The current intent does not need conflict resolution.
if intent.Status != roachpb.PENDING {
// The current intent does not need conflict resolution
// because the transaction is already finalized.
// TODO(bdarnell): can this happen any more?
resolveIntents = append(resolveIntents, intent)
} else if _, ok := ir.mu.inFlight[intent.Txn.ID]; ok && skipInFlight {
// Another goroutine is working on this transaction so we can
// skip it.
continue
} else {
pushIntents = append(pushIntents, intent)
ir.mu.inFlight[intent.Txn.ID]++
}
}
ir.mu.Unlock()

// Attempt to push the transaction(s) which created the conflicting intent(s).
now := ir.store.Clock().Now()

// TODO(tschottdorf): need deduplication here (many pushes for the same
// txn are awkward but even worse, could ratchet up the priority).
// If there's no pusher, we communicate a priority by sending an empty
// txn with only the priority set.
if pusherTxn == nil {
pusherTxn = &roachpb.Transaction{
Priority: roachpb.MakePriority(h.UserPriority),
}
}
var pushReqs []roachpb.Request
for _, intent := range pushIntents {
pushReqs = append(pushReqs, &roachpb.PushTxnRequest{
Expand All @@ -168,6 +186,14 @@ func (ir *intentResolver) maybePushTransactions(ctx context.Context, intents []r
b := &client.Batch{}
b.InternalAddRequest(pushReqs...)
br, err := ir.store.db.RunWithResponse(b)
ir.mu.Lock()
for _, intent := range pushIntents {
ir.mu.inFlight[intent.Txn.ID]--
if ir.mu.inFlight[intent.Txn.ID] == 0 {
delete(ir.mu.inFlight, intent.Txn.ID)
}
}
ir.mu.Unlock()
if err != nil {
// TODO(bdarnell): return resolveIntents even on error.
return nil, err
Expand Down Expand Up @@ -208,7 +234,7 @@ func (ir *intentResolver) processIntentsAsync(r *Replica, intents []intentsWithA
defer cancel()
h := roachpb.Header{Timestamp: now}
resolveIntents, pushErr := ir.maybePushTransactions(ctxWithTimeout,
item.intents, h, roachpb.PUSH_TOUCH)
item.intents, h, roachpb.PUSH_TOUCH, true /* skipInFlight */)
if pErr := ir.resolveIntents(ctxWithTimeout, r, resolveIntents, true /* wait */, false /* TODO(tschottdorf): #5088 */); pErr != nil {
log.Warningc(ctxWithTimeout, "failed to resolve intents: %s", pErr)
return
Expand Down

0 comments on commit d1655b0

Please sign in to comment.