diff --git a/pkg/storage/intent_resolver.go b/pkg/storage/intent_resolver.go index 68bd3bd11353..b456efaf6dcd 100644 --- a/pkg/storage/intent_resolver.go +++ b/pkg/storage/intent_resolver.go @@ -16,6 +16,7 @@ package storage import ( + "container/list" "context" "sort" "time" @@ -58,12 +59,201 @@ const ( intentResolverBatchSize = 100 ) +type pusher struct { + txn *roachpb.Transaction + waitCh chan *roachpb.Transaction +} + +func newPusher(txn *roachpb.Transaction) *pusher { + return &pusher{ + txn: txn, + waitCh: make(chan *roachpb.Transaction, 1), + } +} + +func (p *pusher) writingTxn() bool { + return p.txn != nil && p.txn.Writing +} + +// contentionQueue handles contention on keys with conflicting intents +// by forming queues of "pushers" which are requests that experienced +// a WriteIntentError. There is a queue for each key with one or more +// pushers. The physical implementation of queues is complicated by +// the difference between pushers with writing transactions (i.e. they +// have a transaction record which can be pushed) and non-writing +// transactions (e.g., non-transactional requests, read-only +// transactions, and transactions trying to start but which experienced +// a WriteIntentError on the batch containing the BeginTxn). +// +// Queues are linked lists, with each element containing a transaction +// (can be nil), and a wait channel. The wait channel is closed when +// the request is dequeued and run to completion, whether to success +// or failure. Pushers without a writing transaction wait on the most +// recent pusher in the queue to complete. However, pushers with a +// writing transaction must actually send a PushTxn RPC. This is +// necessary in order to properly detect dependency cycles. However, +// instead of all pushers with writing transactions pushing the same +// transaction which owns the original intent, they instead push the +// most recent pusher in the queue with a writing transaction. 
In this +// fashion, all writing transactions are daisy chained in such a way +// that they run to completion in a serialized fashion, and also still +// allow the system to detect dependency cycles and abort on deadlock. +type contentionQueue struct { + // keys is a map from key to a linked list of pusher instances, + // ordered [loosely, see above] as a FIFO queue. + mu struct { + syncutil.Mutex + keys map[string]*list.List + } +} + +func newContentionQueue() *contentionQueue { + cq := &contentionQueue{} + cq.mu.keys = map[string]*list.List{} + return cq +} + +// add adds the intent specified in the supplied wiErr to the +// contention queue. This may block the current goroutine if the +// pusher has no transaction or the transaction is not yet writing +// (i.e. read-only or hasn't successfully executed BeginTxn). +// +// Returns a cleanup function to be invoked by the caller after the +// original request completes, a possibly updated WriteIntentError and +// a bool indicating whether the intent resolver should regard the +// original push / resolve as no longer applicable and skip those +// steps to retry the original request that generated the +// WriteIntentError. +func (cq *contentionQueue) add( + ctx context.Context, + wiErr *roachpb.WriteIntentError, + txn *roachpb.Transaction, +) (func(newIntentTxn *roachpb.Transaction), *roachpb.WriteIntentError, bool) { + cq.mu.Lock() + intent := wiErr.Intents[0] + key := string(intent.Span.Key) + curPusher := newPusher(txn) + + // updateErr creates a copy of the write intent error with the + // intent updated to point to a new transaction. + updateErr := func(txn *roachpb.Transaction) *roachpb.WriteIntentError { + wiErrCopy := *wiErr + wiErrCopy.Intents = []roachpb.Intent{ + roachpb.Intent{ + Span: intent.Span, + Txn: txn.TxnMeta, + Status: txn.Status, + }, + } + return &wiErrCopy + } + + // Consider prior pushers in reverse arrival order to build queue + // by waiting on the most recent overlapping pusher. 
+ var waitCh chan *roachpb.Transaction + var alreadyInserted bool + var curElement *list.Element + contended, ok := cq.mu.keys[key] + if !ok { + contended = list.New() + cq.mu.keys[key] = contended + } + for e := contended.Back(); e != nil; e = e.Prev() { + p := e.Value.(*pusher) + // If both the current and prior pusher have writing transactions, + // redirect current to await prior txn's completion. + // + // However, if the current pusher is the first pusher with a + // writing transaction, it must still push the transaction of the + // original pushee (i.e. the owner of the intent). This is true + // because the non-writing pushers ahead in the queue do not have + // transaction records on which to daisy chain PushTxn + // requests. Because the current pusher is a writing transaction, + // it may be involved in a dependency cycle and must push a + // transaction in order to detect and break deadlocks. + // + // While the first non-writing pusher and the current pusher + // are both pushing the original pushee, and will finish their + // pushes at the same time, we want to divert other non-writing + // pushers to wait on the current pusher instead. This is more + // likely to create an orderly serialization because this pusher + // will leave an intent. + if curPusher.writingTxn() && (p.writingTxn() || e == contended.Front()) { + // The redirect case. + if p.writingTxn() { + wiErr = updateErr(p.txn) + } + // If applicable, insert this pusher earlier into the queue of + // waiting pushers, so that pushers with non-writing txns will + // wait on the outcome of this pusher. Note that this reordering + // to put writing txns first and daisy chain their pushers + // results in 20% better performance on YCSB workload A.
+ if e != contended.Back() { + if p.writingTxn() { + curElement = contended.InsertAfter(curPusher, e) + } else { + curElement = contended.InsertBefore(curPusher, e) + curPusher.waitCh, p.waitCh = p.waitCh, curPusher.waitCh + } + alreadyInserted = true + } + break + } else if !curPusher.writingTxn() { + // Otherwise, if the pusher is not writing, it always waits on + // the most recent overlapping prior pusher. + waitCh = p.waitCh + break + } + } + // If not already inserted, append the current pusher to the queue. + if !alreadyInserted { + curElement = contended.PushBack(curPusher) + } + cq.mu.Unlock() + + // Wait synchronously on a prior pusher if the current pusher does + // not have a writing txn. Writing txns must push some txn + // immediately in order to detect dependency cycles. + var done bool + if waitCh != nil { + select { + case txn, ok := <-waitCh: + if !ok { + log.Fatalf(ctx, "the wait channel of a prior pusher was used twice (pusher=%s)", txn) + } + // If the prior pusher wrote an intent, push it instead. + if txn != nil { + wiErr = updateErr(txn) + } else { + // No intent was left by the prior pusher; don't push, go + // immediately to retrying the conflicted request. + done = true + } + case <-ctx.Done(): + // The pusher's context timed out. Return without pushing. + done = true + } + } + + return func(newIntentTxn *roachpb.Transaction) { + cq.mu.Lock() + contended.Remove(curElement) + if contended.Len() == 0 { + delete(cq.mu.keys, key) + } + curPusher.waitCh <- newIntentTxn + close(curPusher.waitCh) + cq.mu.Unlock() + }, wiErr, done +} + // intentResolver manages the process of pushing transactions and // resolving intents. type intentResolver struct { store *Store - sem chan struct{} // Semaphore to limit async goroutines. + sem chan struct{} // Semaphore to limit async goroutines. 
+ contentionQ *contentionQueue // manages contention on individual keys mu struct { syncutil.Mutex @@ -78,8 +268,9 @@ type intentResolver struct { func newIntentResolver(store *Store, taskLimit int) *intentResolver { ir := &intentResolver{ - store: store, - sem: make(chan struct{}, taskLimit), + store: store, + sem: make(chan struct{}, taskLimit), + contentionQ: newContentionQueue(), } ir.mu.inFlightPushes = map[uuid.UUID]int{} ir.mu.inFlightTxnCleanups = map[uuid.UUID]struct{}{} @@ -88,29 +279,46 @@ func newIntentResolver(store *Store, taskLimit int) *intentResolver { // processWriteIntentError tries to push the conflicting // transaction(s) responsible for the given WriteIntentError, and to -// resolve those intents if possible. Returns a new error to be used -// in place of the original. +// resolve those intents if possible. Returns a cleanup function and +// potentially a new error to be used in place of the original. The +// cleanup function should be invoked by the caller after the request +// which experienced the conflict has completed with a parameter +// specifying a transaction in the event that the request left its own +// intent. func (ir *intentResolver) processWriteIntentError( ctx context.Context, wiPErr *roachpb.Error, args roachpb.Request, h roachpb.Header, pushType roachpb.PushTxnType, -) *roachpb.Error { +) (func(newIntentTxn *roachpb.Transaction), *roachpb.Error) { wiErr, ok := wiPErr.GetDetail().(*roachpb.WriteIntentError) if !ok { - return roachpb.NewErrorf("not a WriteIntentError: %v", wiPErr) + return nil, roachpb.NewErrorf("not a WriteIntentError: %v", wiPErr) } if log.V(6) { log.Infof(ctx, "resolving write intent %s", wiErr) } + // Possibly queue this processing if the write intent error is for a + // single intent affecting a unitary key. 
+ var cleanup func(*roachpb.Transaction) + if len(wiErr.Intents) == 1 && len(wiErr.Intents[0].Span.EndKey) == 0 { + var done bool + // Note that the write intent error may be mutated here in the event + // that this pusher is queued to wait for a different transaction + // instead. + if cleanup, wiErr, done = ir.contentionQ.add(ctx, wiErr, h.Txn); done { + return cleanup, nil + } + } + resolveIntents, pErr := ir.maybePushTransactions( ctx, wiErr.Intents, h, pushType, false, /* skipIfInFlight */ ) if pErr != nil { - return pErr + return cleanup, pErr } // We always poison due to limitations of the API: not poisoning equals @@ -125,10 +333,10 @@ func (ir *intentResolver) processWriteIntentError( // poison. if err := ir.resolveIntents(ctx, resolveIntents, ResolveOptions{Wait: false, Poison: true}); err != nil { - return roachpb.NewError(err) + return cleanup, roachpb.NewError(err) } - return nil + return cleanup, nil } // maybePushTransactions tries to push the conflicting transaction(s) @@ -180,7 +388,7 @@ func (ir *intentResolver) maybePushTransactions( ir.mu.Lock() // TODO(tschottdorf): can optimize this and use same underlying slice. var pushIntents []roachpb.Intent - cleanupPushIntentsLocked := func() { + cleanupInFlightPushesLocked := func() { for _, intent := range pushIntents { ir.mu.inFlightPushes[intent.Txn.ID]-- if ir.mu.inFlightPushes[intent.Txn.ID] == 0 { @@ -195,7 +403,7 @@ func (ir *intentResolver) maybePushTransactions( // because the transaction is already finalized. // This shouldn't happen as all intents created are in // the PENDING status. 
- cleanupPushIntentsLocked() + cleanupInFlightPushesLocked() ir.mu.Unlock() return nil, roachpb.NewErrorf("unexpected %s intent: %+v", intent.Status, intent) } @@ -247,7 +455,7 @@ func (ir *intentResolver) maybePushTransactions( pErr = b.MustPErr() } ir.mu.Lock() - cleanupPushIntentsLocked() + cleanupInFlightPushesLocked() ir.mu.Unlock() if pErr != nil { return nil, pErr diff --git a/pkg/storage/intent_resolver_test.go b/pkg/storage/intent_resolver_test.go index c6f4360f81ab..6caa1c04a8a0 100644 --- a/pkg/storage/intent_resolver_test.go +++ b/pkg/storage/intent_resolver_test.go @@ -17,12 +17,17 @@ package storage import ( "context" + "sync" "testing" + "time" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/pkg/errors" ) // TestPushTransactionsWithNonPendingIntent verifies that maybePushTransactions @@ -48,7 +53,216 @@ func TestPushTransactionsWithNonPendingIntent(t *testing.T) { t.Errorf("expected error on aborted/resolved intent, but got %s", pErr) } if cnt := len(tc.store.intentResolver.mu.inFlightPushes); cnt != 0 { - t.Errorf("expected no inflight pushe refcount map entries, found %d", cnt) + t.Errorf("expected no inflight pushes refcount map entries, found %d", cnt) } } } + +func beginTransaction( + t *testing.T, store *Store, pri roachpb.UserPriority, key roachpb.Key, putKey bool, +) *roachpb.Transaction { + txn := newTransaction("test", key, pri, enginepb.SERIALIZABLE, store.Clock()) + + var ba roachpb.BatchRequest + bt, header := beginTxnArgs(key, txn) + ba.Header = header + ba.Add(&bt) + if putKey { + put := putArgs(key, []byte("value")) + ba.Add(&put) + } + br, pErr := store.TestSender().Send(context.Background(), ba) + if pErr != nil { + t.Fatal(pErr) + } + txn = br.Txn + + 
return txn +} + +// TestContendedIntent verifies that multiple transactions, some actively +// writing and others read-only, are queued if processing write intent +// errors on a contended key. The test verifies the expected ordering in +// the queue. +func TestContendedIntent(t *testing.T) { + defer leaktest.AfterTest(t)() + + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + store, _ := createTestStore(t, stopper) + ctx, cancel := context.WithCancel(context.Background()) + + key := roachpb.Key("a") + span := roachpb.Span{Key: key} + origTxn := beginTransaction(t, store, 1, key, true /* putKey */) + + roTxn1 := newTransaction("test", key, 1, enginepb.SERIALIZABLE, store.Clock()) + roTxn2 := newTransaction("test", key, 1, enginepb.SERIALIZABLE, store.Clock()) + roTxn3 := newTransaction("test", key, 1, enginepb.SERIALIZABLE, store.Clock()) + rwTxn1 := beginTransaction(t, store, 1, roachpb.Key("b"), true /* putKey */) + rwTxn2 := beginTransaction(t, store, 1, roachpb.Key("c"), true /* putKey */) + + testCases := []struct { + pusher *roachpb.Transaction + expTxns []*roachpb.Transaction + }{ + // First establish a chain of three read-only txns. + {pusher: roTxn1, expTxns: []*roachpb.Transaction{roTxn1}}, + {pusher: roTxn2, expTxns: []*roachpb.Transaction{roTxn1, roTxn2}}, + {pusher: roTxn3, expTxns: []*roachpb.Transaction{roTxn1, roTxn2, roTxn3}}, + // Now, verify that a writing txn is inserted at the start of the queue. + {pusher: rwTxn1, expTxns: []*roachpb.Transaction{rwTxn1, roTxn1, roTxn2, roTxn3}}, + // And a second writing txn is inserted after it. 
+ {pusher: rwTxn2, expTxns: []*roachpb.Transaction{rwTxn1, rwTxn2, roTxn1, roTxn2, roTxn3}}, + } + + var wg sync.WaitGroup + ir := store.intentResolver + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + wiErr := &roachpb.WriteIntentError{Intents: []roachpb.Intent{{Txn: origTxn.TxnMeta, Span: span}}} + h := roachpb.Header{Txn: tc.pusher} + go func() { + wg.Add(1) + _, _ = ir.processWriteIntentError(ctx, roachpb.NewError(wiErr), nil, h, roachpb.PUSH_ABORT) + wg.Done() + }() + testutils.SucceedsSoon(t, func() error { + ir.contentionQ.mu.Lock() + defer ir.contentionQ.mu.Unlock() + contended, ok := ir.contentionQ.mu.keys[string(key)] + if !ok { + return errors.Errorf("key not contended") + } + if lc, let := contended.Len(), len(tc.expTxns); lc != let { + return errors.Errorf("expected len %d; got %d", let, lc) + } + var idx int + for e := contended.Front(); e != nil; e = e.Next() { + p := e.Value.(*pusher) + if p.txn != tc.expTxns[idx] { + return errors.Errorf("expected txn %s at index %d; got %s", tc.expTxns[idx], idx, p.txn) + } + idx++ + } + return nil + }) + }) + } + + // Free up all waiters to complete the test. + cancel() + wg.Wait() +} + +// TestContendedIntentWithDependencyCycle verifies that a queue of +// writers on a contended key, each pushing the prior writer, will +// still notice a dependency cycle. In this case, txn3 writes "a", +// then txn1 writes "b" and "a", then txn2 writes "b", then txn3 +// writes "b". The deadlock is broken by an aborted transaction. +// +// Additional non-transactional reads on the same contended key are +// inserted to verify they do not interfere with writing transactions +// always pushing to ensure the dependency cycle can be detected. 
+func TestContendedIntentWithDependencyCycle(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + store, _ := createTestStore(t, stopper) + ctx := context.Background() + + keyA := roachpb.Key("a") + keyB := roachpb.Key("b") + spanA := roachpb.Span{Key: keyA} + spanB := roachpb.Span{Key: keyB} + + // Create the three transactions; at this point, none of them have + // conflicts. Txn1 has written "b", Txn3 has written "a". + txn1 := beginTransaction(t, store, -3, keyB, true /* putKey */) + txn2 := beginTransaction(t, store, -2, keyB, false /* putKey */) + txn3 := beginTransaction(t, store, -1, keyA, true /* putKey */) + + // Send txn1 put, followed by an end transaction. + txnCh1 := make(chan error, 1) + go func() { + put := putArgs(keyA, []byte("value")) + txn1.Sequence++ + if _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn1}, &put); pErr != nil { + txnCh1 <- pErr.GoError() + } + et, _ := endTxnArgs(txn1, true) + et.IntentSpans = []roachpb.Span{spanA, spanB} + et.NoRefreshSpans = true + txn1.Sequence++ + _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn1}, &et) + txnCh1 <- pErr.GoError() + }() + + // Send a non-transactional read to keyB. This adds an early waiter + // to the intent resolver on keyB which txn2 must skip in order to + // properly register itself as a dependency by pushing txn1. + readCh1 := make(chan error, 1) + go func() { + get := getArgs(keyB) + _, pErr := client.SendWrapped(ctx, store.TestSender(), &get) + readCh1 <- pErr.GoError() + }() + + // Send txn2 put, followed by an end transaction. 
+ txnCh2 := make(chan error, 1) + go func() { + put := putArgs(keyB, []byte("value")) + txn2.Sequence++ + repl, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn2}, &put) + if pErr != nil { + txnCh2 <- pErr.GoError() + } + txn2 = repl.Header().Txn + et, _ := endTxnArgs(txn2, true) + et.IntentSpans = []roachpb.Span{spanB} + et.NoRefreshSpans = true + txn2.Sequence++ + _, pErr = client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn2}, &et) + txnCh2 <- pErr.GoError() + }() + + // Send another non-transactional read to keyB to add a waiter in + // between txn2 and txn3. Txn3 must wait on txn2, instead of getting + // queued behind this reader, in order to establish the dependency cycle. + readCh2 := make(chan error, 1) + go func() { + get := getArgs(keyB) + _, pErr := client.SendWrapped(ctx, store.TestSender(), &get) + readCh2 <- pErr.GoError() + }() + + // Send txn3. Pause for 10ms to make it more likely that we have a + // dependency cycle of length 3, although we don't mind testing + // either way. + time.Sleep(10 * time.Millisecond) + txnCh3 := make(chan error, 1) + go func() { + put := putArgs(keyB, []byte("value")) + txn3.Sequence++ + _, pErr := client.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn3}, &put) + txnCh3 <- pErr.GoError() + }() + + // The third transaction will always be aborted. 
+ err := <-txnCh3 + if _, ok := err.(*roachpb.UnhandledRetryableError); !ok { + t.Fatalf("expected transaction aborted error; got %T", err) + } + if err := <-txnCh1; err != nil { + t.Fatal(err) + } + if err := <-txnCh2; err != nil { + t.Fatal(err) + } + if err := <-readCh1; err != nil { + t.Fatal(err) + } + if err := <-readCh2; err != nil { + t.Fatal(err) + } +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 53a0641e83af..9a9674fcab89 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2703,6 +2703,20 @@ func (s *Store) Send( log.Eventf(ctx, "executing %d requests", len(ba.Requests)) } + var cleanupAfterWriteIntentError func(newIntentTxn *roachpb.Transaction) + defer func() { + if cleanupAfterWriteIntentError != nil { + // This request wrote an intent only if there was no error, the request + // is transactional, the transaction is still pending, and the request + // wasn't read-only. + if pErr == nil && ba.Txn != nil && br.Txn.Status == roachpb.PENDING && !ba.IsReadOnly() { + cleanupAfterWriteIntentError(br.Txn) + } else { + cleanupAfterWriteIntentError(nil) + } + } + }() + // Add the command to the range for execution; exit retry loop on success. for { // Exit loop if context has been canceled or timed out. @@ -2810,7 +2824,14 @@ func (s *Store) Send( clonedTxn := h.Txn.Clone() h.Txn = &clonedTxn } - if pErr = s.intentResolver.processWriteIntentError(ctx, pErr, args, h, pushType); pErr != nil { + // Handle the case where we get more than one write intent error; + // we need to clean up the previous attempt to handle it to allow + // any other pusher queued up behind this RPC to proceed. + if cleanupAfterWriteIntentError != nil { + cleanupAfterWriteIntentError(nil) + } + if cleanupAfterWriteIntentError, pErr = + s.intentResolver.processWriteIntentError(ctx, pErr, args, h, pushType); pErr != nil { // Do not propagate ambiguous results; assume success and retry original op.
if _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError); !ok { // Preserve the error index. diff --git a/pkg/storage/txnwait/queue.go b/pkg/storage/txnwait/queue.go index 9934d5458f88..c68baa9c1719 100644 --- a/pkg/storage/txnwait/queue.go +++ b/pkg/storage/txnwait/queue.go @@ -278,6 +278,7 @@ func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction) { q.mu.Unlock() return } + pending, ok := q.mu.txns[txn.ID] if !ok { q.mu.Unlock() @@ -433,7 +434,9 @@ func (q *Queue) MaybeWaitForPush( var queryPusherCh <-chan *roachpb.Transaction // accepts updates to the pusher txn var queryPusherErrCh <-chan *roachpb.Error // accepts errors querying the pusher txn var readyCh chan struct{} // signaled when pusher txn should be queried - if req.PusherTxn.ID != (uuid.UUID{}) { + + // Query the pusher if it's a valid transaction which already has a transaction record. + if req.PusherTxn.ID != uuid.Nil && req.PusherTxn.Key != nil && req.PusherTxn.Writing { // Create a context which will be canceled once this call completes. // This ensures that the goroutine created to query the pusher txn // is properly cleaned up. diff --git a/pkg/util/interval/interval.go b/pkg/util/interval/interval.go index 4453d3476b1e..73116eb1d6f3 100644 --- a/pkg/util/interval/interval.go +++ b/pkg/util/interval/interval.go @@ -140,7 +140,7 @@ func Compare(a, b Interface) int { } } -// Equal returns a boolean indicating whethter the given Interfaces are equal to each other. If +// Equal returns a boolean indicating whether the given Interfaces are equal to each other. If // "Equal(a, b) == true", "a.Range().End == b.Range().End" must hold. Otherwise, the interval tree // behavior is undefined. "Equal(a, b) == true" is equivalent to "Compare(a, b) == 0". But the // former has measurably better performance than the latter. So Equal should be used when only