Skip to content

Commit

Permalink
storage: queue requests to push txn / resolve intents on single keys
Browse files Browse the repository at this point in the history
Previously, high contention on a single key would cause every thread to
push the same conflicting transaction and then resolve the same intent in
parallel. This is inefficient: only one pusher needs to succeed and only
one resolver needs to resolve the intent. After that, only one writer
should proceed, while the other readers/writers should in turn wait on
the previous writer by pushing its transaction. This effectively
serializes the conflicting readers/writers.

One complication is that every pusher which has a valid, writing
transaction (`Transaction.Writing = true`) must push either the
conflicting transaction or another transaction already pushing that
transaction. This allows dependency cycles to be discovered.
  • Loading branch information
spencerkimball committed May 4, 2018
1 parent 9a54f77 commit d066ef3
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 17 deletions.
227 changes: 214 additions & 13 deletions pkg/storage/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package storage

import (
"container/list"
"context"
"sort"
"time"
Expand Down Expand Up @@ -58,12 +59,197 @@ const (
intentResolverBatchSize = 100
)

// pusher is a single entry in a contentionQueue: one request which
// ran into a conflicting intent on a key and is waiting its turn.
type pusher struct {
	// txn is the transaction of the queued request. It may be nil;
	// see writingTxn, which treats a nil txn as non-writing.
	txn *roachpb.Transaction
	// waitCh is the channel on which a transaction (possibly nil) is
	// delivered to a pusher waiting behind this one in the queue.
	// contentionQueue.add may swap this channel with that of another
	// pusher when it inserts a pusher into the middle of a queue.
	waitCh chan *roachpb.Transaction
}

// newPusher returns a pusher for the supplied transaction (which may
// be nil). The pusher's wait channel is buffered with room for one
// value, so a single send on it never blocks the sender.
func newPusher(txn *roachpb.Transaction) *pusher {
	p := new(pusher)
	p.txn = txn
	p.waitCh = make(chan *roachpb.Transaction, 1)
	return p
}

// writingTxn reports whether the pusher has a transaction and that
// transaction is marked as writing. A pusher without a transaction is
// never considered writing.
func (p *pusher) writingTxn() bool {
	if p.txn == nil {
		return false
	}
	return p.txn.Writing
}

// contentionQueue handles contention on keys with conflicting intents
// by forming queues of "pushers" which are requests that experienced
// a WriteIntentError. There is a queue for each key with one or more
// pushers. The physical implementation of queues is complicated by
// the difference between pushers with writing transactions (i.e. they
// have a transaction record which can be pushed) and non-writing
// transactions (e.g., non-transactional requests, read-only
// transactions, and transactions trying to start but which experienced
// a WriteIntentError on the batch containing the BeginTxn).
//
// Queues are linked lists, with each element containing a transaction
// (can be nil), and a wait channel. When a request is dequeued (i.e.
// the cleanup function returned by add is invoked), a single value is
// sent on its wait channel: the transaction supplied to the cleanup
// function, which may be nil. The channel is buffered and is not
// closed. Pushers without a writing transaction wait on the most
// recent pusher in the queue to complete. However, pushers with a
// writing transaction must actually send a PushTxn RPC. This is
// necessary in order to properly detect dependency cycles. However,
// instead of all pushers with writing transactions pushing the same
// transaction which owns the original intent, they instead push the
// most recent pusher in the queue with a writing transaction. In this
// fashion, all writing transactions are daisy chained in such a way
// that they run to completion in a serialized fashion, and also still
// allow the system to detect dependency cycles and abort on deadlock.
type contentionQueue struct {
	// mu protects keys.
	//
	// keys is a map from key to a linked list of pusher instances,
	// ordered [loosely, see above] as a FIFO queue. An entry is added
	// when the first pusher arrives for a key and removed again once
	// the key's list becomes empty.
	mu struct {
		syncutil.Mutex
		keys map[string]*list.List
	}
}

// newContentionQueue returns an empty contentionQueue whose per-key
// map is initialized and ready for use.
func newContentionQueue() *contentionQueue {
	var cq contentionQueue
	cq.mu.keys = make(map[string]*list.List)
	return &cq
}

// add adds the intent specified in the supplied wiErr to the
// contention queue. Only the first intent (wiErr.Intents[0]) is
// consulted, so wiErr.Intents must be non-empty; its span's Key
// selects the queue. This may block the current goroutine if the
// pusher has no transaction or the transaction is not yet writing
// (i.e. read-only or hasn't successfully executed BeginTxn) and
// another pusher is already queued on the key; in that case add waits
// until the prior pusher's cleanup function has run or ctx is done.
// Pushers with a writing transaction never block here.
//
// Returns three values:
//   - a cleanup function to be invoked by the caller after the
//     original request completes. Its argument is forwarded on this
//     pusher's wait channel to whichever pusher is waiting on it; a
//     waiter that receives nil skips pushing (see below), a waiter
//     that receives a transaction pushes that transaction.
//   - a possibly updated WriteIntentError: either the wiErr passed in
//     or a copy whose single intent points at the transaction this
//     pusher should push instead of the original intent's owner.
//   - a bool indicating whether the intent resolver should regard the
//     original push / resolve as no longer applicable and skip those
//     steps to retry the original request that generated the
//     WriteIntentError.
func (cq *contentionQueue) add(
	ctx context.Context,
	wiErr *roachpb.WriteIntentError,
	h roachpb.Header,
) (func(*roachpb.Transaction), *roachpb.WriteIntentError, bool) {
	// The mutex is held until the current pusher has been linked into
	// the queue; it is released before any blocking wait below.
	cq.mu.Lock()
	intent := wiErr.Intents[0]
	key := string(intent.Span.Key)
	curPusher := newPusher(h.Txn)

	// updateErr creates a copy of the write intent error with the
	// intent updated to point to a new transaction. The copy keeps the
	// original intent's span and takes its TxnMeta and Status from txn.
	updateErr := func(txn *roachpb.Transaction) *roachpb.WriteIntentError {
		wiErrCopy := *wiErr
		wiErrCopy.Intents = []roachpb.Intent{
			roachpb.Intent{
				Span:   intent.Span,
				Txn:    txn.TxnMeta,
				Status: txn.Status,
			},
		}
		return &wiErrCopy
	}

	// Consider prior pushers in reverse arrival order to build queue
	// by waiting on the most recent overlapping pusher.
	var waitCh chan *roachpb.Transaction
	var alreadyInserted bool
	var curElement *list.Element
	// Look up the queue for this key, creating it on first contention.
	contended, ok := cq.mu.keys[key]
	if !ok {
		contended = list.New()
		cq.mu.keys[key] = contended
	}
	for e := contended.Back(); e != nil; e = e.Prev() {
		p := e.Value.(*pusher)
		// If both the current and prior pusher have writing transactions,
		// redirect current to await prior txn's completion.
		//
		// However, if e is the front of the queue and its pusher is not
		// writing, the current pusher is the first pusher with a
		// writing transaction. In this case, the current pusher must
		// still push the transaction of the original pushee (i.e. the
		// owner of the intent). This is true because the non-writing
		// pushers ahead in the queue do not have transaction records on
		// which to daisy chain PushTxn requests. Because the current pusher
		// is a writing transaction, it may be involved in a dependency cycle
		// and must push a transaction in order to detect and break deadlocks.
		//
		// While both the first non-writing pusher and the current pusher
		// are both pushing the original pushee, and will finish their
		// pushes at the same time, we want to divert other non-writing
		// pushers to wait on the current pusher instead. This is more
		// likely to create an orderly serialization because this pusher
		// will leave an intent.
		if curPusher.writingTxn() && (p.writingTxn() || e == contended.Front()) {
			// The redirect case.
			if p.writingTxn() {
				wiErr = updateErr(p.txn)
			}
			// If applicable, insert this pusher earlier into the queue of
			// waiting pushers, so that pushers with non-writing txns will
			// wait on the outcome of this pusher. Note that this reordering
			// to put writing txns first and daisy chain their pushers
			// results in 20% better performance on YCSB workload A.
			if e != contended.Back() {
				if p.writingTxn() {
					curElement = contended.InsertAfter(curPusher, e)
				} else {
					curElement = contended.InsertBefore(curPusher, e)
				}
				// Swap wait channels so that a pusher which already captured
				// p's channel is signaled when the current pusher is cleaned
				// up rather than when p is.
				curPusher.waitCh, p.waitCh = p.waitCh, curPusher.waitCh
				alreadyInserted = true
			}
			break
		} else if !curPusher.writingTxn() {
			// Otherwise, if the pusher is not writing, it always waits on
			// the most recent overlapping prior pusher.
			waitCh = p.waitCh
			break
		}
		// Remaining case: the current pusher is writing, p is not, and p
		// is not at the front. Keep scanning toward the front.
	}
	// If not already inserted, append the current pusher to the queue.
	if !alreadyInserted {
		curElement = contended.PushBack(curPusher)
	}
	cq.mu.Unlock()

	// Wait synchronously on a prior pusher if the current pusher does
	// not have a writing txn. Writing txns must push some txn
	// immediately in order to detect dependency cycles.
	var done bool
	if waitCh != nil {
		select {
		case txn := <-waitCh:
			// If the prior pusher wrote an intent, push it instead.
			if txn != nil {
				wiErr = updateErr(txn)
			} else {
				// No intent was left by the prior pusher; don't push, go
				// immediately to retrying the conflicted request.
				done = true
			}
		case <-ctx.Done():
			// The pusher's context is done (canceled or timed out).
			// Return without pushing.
			done = true
		}
	}

	// The cleanup function removes the current pusher from the queue,
	// drops the key's queue once it is empty, and hands txn to the
	// pusher waiting on this one (if any).
	return func(txn *roachpb.Transaction) {
		cq.mu.Lock()
		contended.Remove(curElement)
		if contended.Len() == 0 {
			delete(cq.mu.keys, key)
		}
		// NOTE(review): this send happens with cq.mu held and relies on
		// the wait channel's buffer of one to not block; it looks like
		// the cleanup function is meant to be invoked at most once per
		// add call -- confirm with callers.
		curPusher.waitCh <- txn
		cq.mu.Unlock()
	}, wiErr, done
}

// intentResolver manages the process of pushing transactions and
// resolving intents.
type intentResolver struct {
store *Store

sem chan struct{} // Semaphore to limit async goroutines.
sem chan struct{} // Semaphore to limit async goroutines.
contentionQ *contentionQueue // manages contention on individual keys

mu struct {
syncutil.Mutex
Expand All @@ -78,8 +264,9 @@ type intentResolver struct {

func newIntentResolver(store *Store, taskLimit int) *intentResolver {
ir := &intentResolver{
store: store,
sem: make(chan struct{}, taskLimit),
store: store,
sem: make(chan struct{}, taskLimit),
contentionQ: newContentionQueue(),
}
ir.mu.inFlightPushes = map[uuid.UUID]int{}
ir.mu.inFlightTxnCleanups = map[uuid.UUID]struct{}{}
Expand All @@ -88,29 +275,43 @@ func newIntentResolver(store *Store, taskLimit int) *intentResolver {

// processWriteIntentError tries to push the conflicting
// transaction(s) responsible for the given WriteIntentError, and to
// resolve those intents if possible. Returns a new error to be used
// in place of the original.
// resolve those intents if possible. Returns a cleanup function and
// potentially a new error to be used in place of the original. The
// cleanup function should be invoked by the caller after the request
// which experienced the conflict has completed with a parameter
// specifying a transaction in the event that the request left its own
// intent.
func (ir *intentResolver) processWriteIntentError(
ctx context.Context,
wiPErr *roachpb.Error,
args roachpb.Request,
h roachpb.Header,
pushType roachpb.PushTxnType,
) *roachpb.Error {
) (func(*roachpb.Transaction), *roachpb.Error) {
wiErr, ok := wiPErr.GetDetail().(*roachpb.WriteIntentError)
if !ok {
return roachpb.NewErrorf("not a WriteIntentError: %v", wiPErr)
return nil, roachpb.NewErrorf("not a WriteIntentError: %v", wiPErr)
}

if log.V(6) {
log.Infof(ctx, "resolving write intent %s", wiErr)
}

// Possibly queue this processing if the write intent error is for a
// single intent affecting a unitary key.
var cleanup func(*roachpb.Transaction)
if len(wiErr.Intents) == 1 && len(wiErr.Intents[0].Span.EndKey) == 0 {
var done bool
if cleanup, wiErr, done = ir.contentionQ.add(ctx, wiErr, h); done {
return cleanup, nil
}
}

resolveIntents, pErr := ir.maybePushTransactions(
ctx, wiErr.Intents, h, pushType, false, /* skipIfInFlight */
)
if pErr != nil {
return pErr
return cleanup, pErr
}

// We always poison due to limitations of the API: not poisoning equals
Expand All @@ -125,10 +326,10 @@ func (ir *intentResolver) processWriteIntentError(
// poison.
if err := ir.resolveIntents(ctx, resolveIntents,
ResolveOptions{Wait: false, Poison: true}); err != nil {
return roachpb.NewError(err)
return cleanup, roachpb.NewError(err)
}

return nil
return cleanup, nil
}

// maybePushTransactions tries to push the conflicting transaction(s)
Expand Down Expand Up @@ -180,7 +381,7 @@ func (ir *intentResolver) maybePushTransactions(
ir.mu.Lock()
// TODO(tschottdorf): can optimize this and use same underlying slice.
var pushIntents []roachpb.Intent
cleanupPushIntentsLocked := func() {
cleanupInFlightPushesLocked := func() {
for _, intent := range pushIntents {
ir.mu.inFlightPushes[intent.Txn.ID]--
if ir.mu.inFlightPushes[intent.Txn.ID] == 0 {
Expand All @@ -195,7 +396,7 @@ func (ir *intentResolver) maybePushTransactions(
// because the transaction is already finalized.
// This shouldn't happen as all intents created are in
// the PENDING status.
cleanupPushIntentsLocked()
cleanupInFlightPushesLocked()
ir.mu.Unlock()
return nil, roachpb.NewErrorf("unexpected %s intent: %+v", intent.Status, intent)
}
Expand Down Expand Up @@ -247,7 +448,7 @@ func (ir *intentResolver) maybePushTransactions(
pErr = b.MustPErr()
}
ir.mu.Lock()
cleanupPushIntentsLocked()
cleanupInFlightPushesLocked()
ir.mu.Unlock()
if pErr != nil {
return nil, pErr
Expand Down
Loading

0 comments on commit d066ef3

Please sign in to comment.